FrameStreamInput.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. /*
  2. * Copyright (c) 2013-2019 by Farsight Security, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package dnstap
  17. import (
  18. "io"
  19. "os"
  20. "time"
  21. )
  22. // MaxPayloadSize sets the upper limit on input Dnstap payload sizes. If an Input
  23. // receives a Dnstap payload over this size limit, ReadInto will log an error and
  24. // return.
  25. //
  26. // EDNS0 and DNS over TCP use 2 octets for DNS message size, imposing a maximum
  27. // size of 65535 octets for the DNS message, which is the bulk of the data carried
  28. // in a Dnstap message. Protobuf encoding overhead and metadata with some size
  29. // guidance (e.g., identity and version being DNS strings, which have a maximum
  30. // length of 255) add up to less than 1KB. The default 96KiB size of the buffer
  31. // allows a bit over 30KB space for "extra" metadata.
  32. //
  33. var MaxPayloadSize uint32 = 96 * 1024
  34. // A FrameStreamInput reads dnstap data from an io.ReadWriter.
  35. type FrameStreamInput struct {
  36. wait chan bool
  37. reader Reader
  38. log Logger
  39. }
  40. // NewFrameStreamInput creates a FrameStreamInput reading data from the given
  41. // io.ReadWriter. If bi is true, the input will use the bidirectional
  42. // framestream protocol suitable for TCP and unix domain socket connections.
  43. func NewFrameStreamInput(r io.ReadWriter, bi bool) (input *FrameStreamInput, err error) {
  44. return NewFrameStreamInputTimeout(r, bi, 0)
  45. }
  46. // NewFrameStreamInputTimeout creates a FramestreamInput reading data from the
  47. // given io.ReadWriter with a timeout applied to reading and (for bidirectional
  48. // inputs) writing control messages.
  49. func NewFrameStreamInputTimeout(r io.ReadWriter, bi bool, timeout time.Duration) (input *FrameStreamInput, err error) {
  50. reader, err := NewReader(r, &ReaderOptions{
  51. Bidirectional: bi,
  52. Timeout: timeout,
  53. })
  54. if err != nil {
  55. return nil, err
  56. }
  57. return &FrameStreamInput{
  58. wait: make(chan bool),
  59. reader: reader,
  60. log: nullLogger{},
  61. }, nil
  62. }
  63. // NewFrameStreamInputFromFilename creates a FrameStreamInput reading from
  64. // the named file.
  65. func NewFrameStreamInputFromFilename(fname string) (input *FrameStreamInput, err error) {
  66. file, err := os.Open(fname)
  67. if err != nil {
  68. return nil, err
  69. }
  70. return NewFrameStreamInput(file, false)
  71. }
  72. // SetLogger configures a logger for FrameStreamInput read error reporting.
  73. func (input *FrameStreamInput) SetLogger(logger Logger) {
  74. input.log = logger
  75. }
  76. // ReadInto reads data from the FrameStreamInput into the output channel.
  77. //
  78. // ReadInto satisfies the dnstap Input interface.
  79. func (input *FrameStreamInput) ReadInto(output chan []byte) {
  80. buf := make([]byte, MaxPayloadSize)
  81. for {
  82. n, err := input.reader.ReadFrame(buf)
  83. if err == nil {
  84. newbuf := make([]byte, n)
  85. copy(newbuf, buf)
  86. output <- newbuf
  87. continue
  88. }
  89. if err != io.EOF {
  90. input.log.Printf("FrameStreamInput: Read error: %v", err)
  91. }
  92. break
  93. }
  94. close(input.wait)
  95. }
  96. // Wait reeturns when ReadInto has finished.
  97. //
  98. // Wait satisfies the dnstap Input interface.
  99. func (input *FrameStreamInput) Wait() {
  100. <-input.wait
  101. }