FrameStreamSockOutput.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. /*
  2. * Copyright (c) 2019 by Farsight Security, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package dnstap
  17. import (
  18. "net"
  19. "time"
  20. )
  21. // A FrameStreamSockOutput manages a socket connection and sends dnstap
  22. // data over a framestream connection on that socket.
  23. type FrameStreamSockOutput struct {
  24. address net.Addr
  25. outputChannel chan []byte
  26. wait chan bool
  27. wopt SocketWriterOptions
  28. }
  29. // NewFrameStreamSockOutput creates a FrameStreamSockOutput manaaging a
  30. // connection to the given address.
  31. func NewFrameStreamSockOutput(address net.Addr) (*FrameStreamSockOutput, error) {
  32. return &FrameStreamSockOutput{
  33. address: address,
  34. outputChannel: make(chan []byte, outputChannelSize),
  35. wait: make(chan bool),
  36. wopt: SocketWriterOptions{
  37. FlushTimeout: 5 * time.Second,
  38. RetryInterval: 10 * time.Second,
  39. Dialer: &net.Dialer{
  40. Timeout: 30 * time.Second,
  41. },
  42. Logger: &nullLogger{},
  43. },
  44. }, nil
  45. }
  46. // SetTimeout sets the write timeout for data and control messages and the
  47. // read timeout for handshake responses on the FrameStreamSockOutput's
  48. // connection. The default timeout is zero, for no timeout.
  49. func (o *FrameStreamSockOutput) SetTimeout(timeout time.Duration) {
  50. o.wopt.Timeout = timeout
  51. }
  52. // SetFlushTimeout sets the maximum time data will be kept in the output
  53. // buffer.
  54. //
  55. // The default flush timeout is five seconds.
  56. func (o *FrameStreamSockOutput) SetFlushTimeout(timeout time.Duration) {
  57. o.wopt.FlushTimeout = timeout
  58. }
  59. // SetRetryInterval specifies how long the FrameStreamSockOutput will wait
  60. // before re-establishing a failed connection. The default retry interval
  61. // is 10 seconds.
  62. func (o *FrameStreamSockOutput) SetRetryInterval(retry time.Duration) {
  63. o.wopt.RetryInterval = retry
  64. }
  65. // SetDialer replaces the default net.Dialer for re-establishing the
  66. // the FrameStreamSockOutput connection. This can be used to set the
  67. // timeout for connection establishment and enable keepalives
  68. // new connections.
  69. //
  70. // FrameStreamSockOutput uses a default dialer with a 30 second
  71. // timeout.
  72. func (o *FrameStreamSockOutput) SetDialer(dialer *net.Dialer) {
  73. o.wopt.Dialer = dialer
  74. }
  75. // SetLogger configures FrameStreamSockOutput to log through the given
  76. // Logger.
  77. func (o *FrameStreamSockOutput) SetLogger(logger Logger) {
  78. o.wopt.Logger = logger
  79. }
  80. // GetOutputChannel returns the channel on which the
  81. // FrameStreamSockOutput accepts data.
  82. //
  83. // GetOutputChannel satisifes the dnstap Output interface.
  84. func (o *FrameStreamSockOutput) GetOutputChannel() chan []byte {
  85. return o.outputChannel
  86. }
  87. // RunOutputLoop reads data from the output channel and sends it over
  88. // a connections to the FrameStreamSockOutput's address, establishing
  89. // the connection as needed.
  90. //
  91. // RunOutputLoop satisifes the dnstap Output interface.
  92. func (o *FrameStreamSockOutput) RunOutputLoop() {
  93. w := NewSocketWriter(o.address, &o.wopt)
  94. for b := range o.outputChannel {
  95. // w is of type *SocketWriter, whose Write implementation
  96. // handles all errors by retrying the connection.
  97. w.WriteFrame(b)
  98. }
  99. w.Close()
  100. close(o.wait)
  101. return
  102. }
  103. // Close shuts down the FrameStreamSockOutput's output channel and returns
  104. // after all pending data has been flushed and the connection has been closed.
  105. //
  106. // Close satisifes the dnstap Output interface
  107. func (o *FrameStreamSockOutput) Close() {
  108. close(o.outputChannel)
  109. <-o.wait
  110. }