| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- /*
- * Copyright (c) 2019 by Farsight Security, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package dnstap
- import (
- "net"
- "sync"
- "time"
- framestream "github.com/farsightsec/golang-framestream"
- )
- // A SocketWriter writes data to a Frame Streams TCP or Unix domain socket,
- // establishing or restarting the connection if needed.
- type socketWriter struct {
- w Writer
- c net.Conn
- addr net.Addr
- opt SocketWriterOptions
- }
- // SocketWriterOptions provides configuration options for a SocketWriter
- type SocketWriterOptions struct {
- // Timeout gives the time the SocketWriter will wait for reads and
- // writes to complete.
- Timeout time.Duration
- // FlushTimeout is the maximum duration data will be buffered while
- // being written to the socket.
- FlushTimeout time.Duration
- // RetryInterval is how long the SocketWriter will wait between
- // connection attempts.
- RetryInterval time.Duration
- // Dialer is the dialer used to establish the connection. If nil,
- // SocketWriter will use a default dialer with a 30 second timeout.
- Dialer *net.Dialer
- // Logger provides the logger for connection establishment, reconnection,
- // and error events of the SocketWriter.
- Logger Logger
- }
- type flushWriter struct {
- m sync.Mutex
- w *framestream.Writer
- d time.Duration
- timer *time.Timer
- timerActive bool
- lastFlushed time.Time
- stopped bool
- }
- type flusherConn struct {
- net.Conn
- lastWritten *time.Time
- }
- func (c *flusherConn) Write(p []byte) (int, error) {
- n, err := c.Conn.Write(p)
- *c.lastWritten = time.Now()
- return n, err
- }
- func newFlushWriter(c net.Conn, d time.Duration) (*flushWriter, error) {
- var err error
- fw := &flushWriter{timer: time.NewTimer(d), d: d}
- if !fw.timer.Stop() {
- <-fw.timer.C
- }
- fc := &flusherConn{
- Conn: c,
- lastWritten: &fw.lastFlushed,
- }
- fw.w, err = framestream.NewWriter(fc,
- &framestream.WriterOptions{
- ContentTypes: [][]byte{FSContentType},
- Bidirectional: true,
- Timeout: d,
- })
- if err != nil {
- return nil, err
- }
- go fw.runFlusher()
- return fw, nil
- }
- func (fw *flushWriter) runFlusher() {
- for range fw.timer.C {
- fw.m.Lock()
- if fw.stopped {
- fw.m.Unlock()
- return
- }
- last := fw.lastFlushed
- elapsed := time.Since(last)
- if elapsed < fw.d {
- fw.timer.Reset(fw.d - elapsed)
- fw.m.Unlock()
- continue
- }
- fw.w.Flush()
- fw.timerActive = false
- fw.m.Unlock()
- }
- }
- func (fw *flushWriter) WriteFrame(p []byte) (int, error) {
- fw.m.Lock()
- n, err := fw.w.WriteFrame(p)
- if !fw.timerActive {
- fw.timer.Reset(fw.d)
- fw.timerActive = true
- }
- fw.m.Unlock()
- return n, err
- }
- func (fw *flushWriter) Close() error {
- fw.m.Lock()
- fw.stopped = true
- fw.timer.Reset(0)
- err := fw.w.Close()
- fw.m.Unlock()
- return err
- }
- // NewSocketWriter creates a SocketWriter which writes data to a connection
- // to the given addr. The SocketWriter maintains and re-establishes the
- // connection to this address as needed.
- func NewSocketWriter(addr net.Addr, opt *SocketWriterOptions) Writer {
- if opt == nil {
- opt = &SocketWriterOptions{}
- }
- if opt.Logger == nil {
- opt.Logger = &nullLogger{}
- }
- return &socketWriter{addr: addr, opt: *opt}
- }
- func (sw *socketWriter) openWriter() error {
- var err error
- sw.c, err = sw.opt.Dialer.Dial(sw.addr.Network(), sw.addr.String())
- if err != nil {
- return err
- }
- wopt := WriterOptions{
- Bidirectional: true,
- Timeout: sw.opt.Timeout,
- }
- if sw.opt.FlushTimeout == 0 {
- sw.w, err = NewWriter(sw.c, &wopt)
- } else {
- sw.w, err = newFlushWriter(sw.c, sw.opt.FlushTimeout)
- }
- if err != nil {
- sw.c.Close()
- return err
- }
- return nil
- }
- // Close shuts down the SocketWriter, closing any open connection.
- func (sw *socketWriter) Close() error {
- var err error
- if sw.w != nil {
- err = sw.w.Close()
- if err == nil {
- return sw.c.Close()
- }
- sw.c.Close()
- return err
- }
- if sw.c != nil {
- return sw.c.Close()
- }
- return nil
- }
- // Write writes the data in p as a Dnstap frame to a connection to the
- // SocketWriter's address. Write may block indefinitely while the SocketWriter
- // attempts to establish or re-establish the connection and FrameStream session.
- func (sw *socketWriter) WriteFrame(p []byte) (int, error) {
- for ; ; time.Sleep(sw.opt.RetryInterval) {
- if sw.w == nil {
- if err := sw.openWriter(); err != nil {
- sw.opt.Logger.Printf("%s: open failed: %v", sw.addr, err)
- continue
- }
- }
- n, err := sw.w.WriteFrame(p)
- if err != nil {
- sw.opt.Logger.Printf("%s: write failed: %v", sw.addr, err)
- sw.Close()
- continue
- }
- return n, nil
- }
- }
|