sender.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package statsd
  2. import (
  3. "sync/atomic"
  4. "time"
  5. )
  6. // A statsdWriter offers a standard interface regardless of the underlying
  7. // protocol. For now UDS and UPD writers are available.
  8. // Attention: the underlying buffer of `data` is reused after a `statsdWriter.Write` call.
  9. // `statsdWriter.Write` must be synchronous.
  10. type statsdWriter interface {
  11. Write(data []byte) (n int, err error)
  12. SetWriteTimeout(time.Duration) error
  13. Close() error
  14. }
  15. // SenderMetrics contains metrics about the health of the sender
  16. type SenderMetrics struct {
  17. TotalSentBytes uint64
  18. TotalSentPayloads uint64
  19. TotalDroppedPayloads uint64
  20. TotalDroppedBytes uint64
  21. TotalDroppedPayloadsQueueFull uint64
  22. TotalDroppedBytesQueueFull uint64
  23. TotalDroppedPayloadsWriter uint64
  24. TotalDroppedBytesWriter uint64
  25. }
  26. type sender struct {
  27. transport statsdWriter
  28. pool *bufferPool
  29. queue chan *statsdBuffer
  30. metrics *SenderMetrics
  31. stop chan struct{}
  32. flushSignal chan struct{}
  33. }
  34. func newSender(transport statsdWriter, queueSize int, pool *bufferPool) *sender {
  35. sender := &sender{
  36. transport: transport,
  37. pool: pool,
  38. queue: make(chan *statsdBuffer, queueSize),
  39. metrics: &SenderMetrics{},
  40. stop: make(chan struct{}),
  41. flushSignal: make(chan struct{}),
  42. }
  43. go sender.sendLoop()
  44. return sender
  45. }
  46. func (s *sender) send(buffer *statsdBuffer) {
  47. select {
  48. case s.queue <- buffer:
  49. default:
  50. atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
  51. atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes())))
  52. atomic.AddUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 1)
  53. atomic.AddUint64(&s.metrics.TotalDroppedBytesQueueFull, uint64(len(buffer.bytes())))
  54. s.pool.returnBuffer(buffer)
  55. }
  56. }
  57. func (s *sender) write(buffer *statsdBuffer) {
  58. _, err := s.transport.Write(buffer.bytes())
  59. if err != nil {
  60. atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
  61. atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes())))
  62. atomic.AddUint64(&s.metrics.TotalDroppedPayloadsWriter, 1)
  63. atomic.AddUint64(&s.metrics.TotalDroppedBytesWriter, uint64(len(buffer.bytes())))
  64. } else {
  65. atomic.AddUint64(&s.metrics.TotalSentPayloads, 1)
  66. atomic.AddUint64(&s.metrics.TotalSentBytes, uint64(len(buffer.bytes())))
  67. }
  68. s.pool.returnBuffer(buffer)
  69. }
  70. func (s *sender) flushTelemetryMetrics() SenderMetrics {
  71. return SenderMetrics{
  72. TotalSentBytes: atomic.SwapUint64(&s.metrics.TotalSentBytes, 0),
  73. TotalSentPayloads: atomic.SwapUint64(&s.metrics.TotalSentPayloads, 0),
  74. TotalDroppedPayloads: atomic.SwapUint64(&s.metrics.TotalDroppedPayloads, 0),
  75. TotalDroppedBytes: atomic.SwapUint64(&s.metrics.TotalDroppedBytes, 0),
  76. TotalDroppedPayloadsQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 0),
  77. TotalDroppedBytesQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedBytesQueueFull, 0),
  78. TotalDroppedPayloadsWriter: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsWriter, 0),
  79. TotalDroppedBytesWriter: atomic.SwapUint64(&s.metrics.TotalDroppedBytesWriter, 0),
  80. }
  81. }
  82. func (s *sender) sendLoop() {
  83. defer close(s.stop)
  84. for {
  85. select {
  86. case buffer := <-s.queue:
  87. s.write(buffer)
  88. case <-s.stop:
  89. return
  90. case <-s.flushSignal:
  91. // At that point we know that the workers are paused (the statsd client
  92. // will pause them before calling sender.flush()).
  93. // So we can fully flush the input queue
  94. s.flushInputQueue()
  95. s.flushSignal <- struct{}{}
  96. }
  97. }
  98. }
  99. func (s *sender) flushInputQueue() {
  100. for {
  101. select {
  102. case buffer := <-s.queue:
  103. s.write(buffer)
  104. default:
  105. return
  106. }
  107. }
  108. }
  109. func (s *sender) flush() {
  110. s.flushSignal <- struct{}{}
  111. <-s.flushSignal
  112. }
  113. func (s *sender) close() error {
  114. s.stop <- struct{}{}
  115. <-s.stop
  116. s.flushInputQueue()
  117. return s.transport.Close()
  118. }