SocketWriter.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. /*
  2. * Copyright (c) 2019 by Farsight Security, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package dnstap
  17. import (
  18. "net"
  19. "sync"
  20. "time"
  21. framestream "github.com/farsightsec/golang-framestream"
  22. )
  23. // A SocketWriter writes data to a Frame Streams TCP or Unix domain socket,
  24. // establishing or restarting the connection if needed.
  25. type socketWriter struct {
  26. w Writer
  27. c net.Conn
  28. addr net.Addr
  29. opt SocketWriterOptions
  30. }
  31. // SocketWriterOptions provides configuration options for a SocketWriter
  32. type SocketWriterOptions struct {
  33. // Timeout gives the time the SocketWriter will wait for reads and
  34. // writes to complete.
  35. Timeout time.Duration
  36. // FlushTimeout is the maximum duration data will be buffered while
  37. // being written to the socket.
  38. FlushTimeout time.Duration
  39. // RetryInterval is how long the SocketWriter will wait between
  40. // connection attempts.
  41. RetryInterval time.Duration
  42. // Dialer is the dialer used to establish the connection. If nil,
  43. // SocketWriter will use a default dialer with a 30 second timeout.
  44. Dialer *net.Dialer
  45. // Logger provides the logger for connection establishment, reconnection,
  46. // and error events of the SocketWriter.
  47. Logger Logger
  48. }
  49. type flushWriter struct {
  50. m sync.Mutex
  51. w *framestream.Writer
  52. d time.Duration
  53. timer *time.Timer
  54. timerActive bool
  55. lastFlushed time.Time
  56. stopped bool
  57. }
  58. type flusherConn struct {
  59. net.Conn
  60. lastWritten *time.Time
  61. }
  62. func (c *flusherConn) Write(p []byte) (int, error) {
  63. n, err := c.Conn.Write(p)
  64. *c.lastWritten = time.Now()
  65. return n, err
  66. }
  67. func newFlushWriter(c net.Conn, d time.Duration) (*flushWriter, error) {
  68. var err error
  69. fw := &flushWriter{timer: time.NewTimer(d), d: d}
  70. if !fw.timer.Stop() {
  71. <-fw.timer.C
  72. }
  73. fc := &flusherConn{
  74. Conn: c,
  75. lastWritten: &fw.lastFlushed,
  76. }
  77. fw.w, err = framestream.NewWriter(fc,
  78. &framestream.WriterOptions{
  79. ContentTypes: [][]byte{FSContentType},
  80. Bidirectional: true,
  81. Timeout: d,
  82. })
  83. if err != nil {
  84. return nil, err
  85. }
  86. go fw.runFlusher()
  87. return fw, nil
  88. }
  89. func (fw *flushWriter) runFlusher() {
  90. for range fw.timer.C {
  91. fw.m.Lock()
  92. if fw.stopped {
  93. fw.m.Unlock()
  94. return
  95. }
  96. last := fw.lastFlushed
  97. elapsed := time.Since(last)
  98. if elapsed < fw.d {
  99. fw.timer.Reset(fw.d - elapsed)
  100. fw.m.Unlock()
  101. continue
  102. }
  103. fw.w.Flush()
  104. fw.timerActive = false
  105. fw.m.Unlock()
  106. }
  107. }
  108. func (fw *flushWriter) WriteFrame(p []byte) (int, error) {
  109. fw.m.Lock()
  110. n, err := fw.w.WriteFrame(p)
  111. if !fw.timerActive {
  112. fw.timer.Reset(fw.d)
  113. fw.timerActive = true
  114. }
  115. fw.m.Unlock()
  116. return n, err
  117. }
  118. func (fw *flushWriter) Close() error {
  119. fw.m.Lock()
  120. fw.stopped = true
  121. fw.timer.Reset(0)
  122. err := fw.w.Close()
  123. fw.m.Unlock()
  124. return err
  125. }
  126. // NewSocketWriter creates a SocketWriter which writes data to a connection
  127. // to the given addr. The SocketWriter maintains and re-establishes the
  128. // connection to this address as needed.
  129. func NewSocketWriter(addr net.Addr, opt *SocketWriterOptions) Writer {
  130. if opt == nil {
  131. opt = &SocketWriterOptions{}
  132. }
  133. if opt.Logger == nil {
  134. opt.Logger = &nullLogger{}
  135. }
  136. return &socketWriter{addr: addr, opt: *opt}
  137. }
  138. func (sw *socketWriter) openWriter() error {
  139. var err error
  140. sw.c, err = sw.opt.Dialer.Dial(sw.addr.Network(), sw.addr.String())
  141. if err != nil {
  142. return err
  143. }
  144. wopt := WriterOptions{
  145. Bidirectional: true,
  146. Timeout: sw.opt.Timeout,
  147. }
  148. if sw.opt.FlushTimeout == 0 {
  149. sw.w, err = NewWriter(sw.c, &wopt)
  150. } else {
  151. sw.w, err = newFlushWriter(sw.c, sw.opt.FlushTimeout)
  152. }
  153. if err != nil {
  154. sw.c.Close()
  155. return err
  156. }
  157. return nil
  158. }
  159. // Close shuts down the SocketWriter, closing any open connection.
  160. func (sw *socketWriter) Close() error {
  161. var err error
  162. if sw.w != nil {
  163. err = sw.w.Close()
  164. if err == nil {
  165. return sw.c.Close()
  166. }
  167. sw.c.Close()
  168. return err
  169. }
  170. if sw.c != nil {
  171. return sw.c.Close()
  172. }
  173. return nil
  174. }
  175. // Write writes the data in p as a Dnstap frame to a connection to the
  176. // SocketWriter's address. Write may block indefinitely while the SocketWriter
  177. // attempts to establish or re-establish the connection and FrameStream session.
  178. func (sw *socketWriter) WriteFrame(p []byte) (int, error) {
  179. for ; ; time.Sleep(sw.opt.RetryInterval) {
  180. if sw.w == nil {
  181. if err := sw.openWriter(); err != nil {
  182. sw.opt.Logger.Printf("%s: open failed: %v", sw.addr, err)
  183. continue
  184. }
  185. }
  186. n, err := sw.w.WriteFrame(p)
  187. if err != nil {
  188. sw.opt.Logger.Printf("%s: write failed: %v", sw.addr, err)
  189. sw.Close()
  190. continue
  191. }
  192. return n, nil
  193. }
  194. }