| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package statsd
- import (
- "sync/atomic"
- "time"
- )
// statsdWriter is the transport abstraction the sender writes payloads to,
// independent of the underlying protocol. For now UDS and UDP writers are
// available.
//
// Implementations must make Write synchronous: the backing array of the
// slice passed to Write is reused by the caller as soon as Write returns,
// so the data must not be retained past the call.
type statsdWriter interface {
	// Write sends data and reports the number of bytes written and any
	// error encountered.
	Write(data []byte) (n int, err error)
	// SetWriteTimeout configures the timeout applied to writes.
	SetWriteTimeout(time.Duration) error
	// Close releases the underlying transport.
	Close() error
}
// SenderMetrics holds counters describing the health of the sender.
type SenderMetrics struct {
	// TotalSentBytes is the number of payload bytes successfully written
	// to the transport.
	TotalSentBytes uint64
	// TotalSentPayloads is the number of payloads successfully written to
	// the transport.
	TotalSentPayloads uint64
	// TotalDroppedPayloads is the number of payloads dropped for any
	// reason (queue full or writer error).
	TotalDroppedPayloads uint64
	// TotalDroppedBytes is the number of payload bytes dropped for any
	// reason (queue full or writer error).
	TotalDroppedBytes uint64
	// TotalDroppedPayloadsQueueFull is the number of payloads dropped
	// because the sender queue was full.
	TotalDroppedPayloadsQueueFull uint64
	// TotalDroppedBytesQueueFull is the number of payload bytes dropped
	// because the sender queue was full.
	TotalDroppedBytesQueueFull uint64
	// TotalDroppedPayloadsWriter is the number of payloads dropped because
	// the transport returned an error on write.
	TotalDroppedPayloadsWriter uint64
	// TotalDroppedBytesWriter is the number of payload bytes dropped
	// because the transport returned an error on write.
	TotalDroppedBytesWriter uint64
}
- type sender struct {
- transport statsdWriter
- pool *bufferPool
- queue chan *statsdBuffer
- metrics *SenderMetrics
- stop chan struct{}
- flushSignal chan struct{}
- }
- func newSender(transport statsdWriter, queueSize int, pool *bufferPool) *sender {
- sender := &sender{
- transport: transport,
- pool: pool,
- queue: make(chan *statsdBuffer, queueSize),
- metrics: &SenderMetrics{},
- stop: make(chan struct{}),
- flushSignal: make(chan struct{}),
- }
- go sender.sendLoop()
- return sender
- }
- func (s *sender) send(buffer *statsdBuffer) {
- select {
- case s.queue <- buffer:
- default:
- atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
- atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes())))
- atomic.AddUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 1)
- atomic.AddUint64(&s.metrics.TotalDroppedBytesQueueFull, uint64(len(buffer.bytes())))
- s.pool.returnBuffer(buffer)
- }
- }
- func (s *sender) write(buffer *statsdBuffer) {
- _, err := s.transport.Write(buffer.bytes())
- if err != nil {
- atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
- atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes())))
- atomic.AddUint64(&s.metrics.TotalDroppedPayloadsWriter, 1)
- atomic.AddUint64(&s.metrics.TotalDroppedBytesWriter, uint64(len(buffer.bytes())))
- } else {
- atomic.AddUint64(&s.metrics.TotalSentPayloads, 1)
- atomic.AddUint64(&s.metrics.TotalSentBytes, uint64(len(buffer.bytes())))
- }
- s.pool.returnBuffer(buffer)
- }
- func (s *sender) flushTelemetryMetrics() SenderMetrics {
- return SenderMetrics{
- TotalSentBytes: atomic.SwapUint64(&s.metrics.TotalSentBytes, 0),
- TotalSentPayloads: atomic.SwapUint64(&s.metrics.TotalSentPayloads, 0),
- TotalDroppedPayloads: atomic.SwapUint64(&s.metrics.TotalDroppedPayloads, 0),
- TotalDroppedBytes: atomic.SwapUint64(&s.metrics.TotalDroppedBytes, 0),
- TotalDroppedPayloadsQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 0),
- TotalDroppedBytesQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedBytesQueueFull, 0),
- TotalDroppedPayloadsWriter: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsWriter, 0),
- TotalDroppedBytesWriter: atomic.SwapUint64(&s.metrics.TotalDroppedBytesWriter, 0),
- }
- }
- func (s *sender) sendLoop() {
- defer close(s.stop)
- for {
- select {
- case buffer := <-s.queue:
- s.write(buffer)
- case <-s.stop:
- return
- case <-s.flushSignal:
- // At that point we know that the workers are paused (the statsd client
- // will pause them before calling sender.flush()).
- // So we can fully flush the input queue
- s.flushInputQueue()
- s.flushSignal <- struct{}{}
- }
- }
- }
- func (s *sender) flushInputQueue() {
- for {
- select {
- case buffer := <-s.queue:
- s.write(buffer)
- default:
- return
- }
- }
- }
- func (s *sender) flush() {
- s.flushSignal <- struct{}{}
- <-s.flushSignal
- }
- func (s *sender) close() error {
- s.stop <- struct{}{}
- <-s.stop
- s.flushInputQueue()
- return s.transport.Close()
- }
|