sender.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package statsd
  2. import (
  3. "io"
  4. "sync/atomic"
  5. )
  6. // senderTelemetry contains telemetry about the health of the sender
  7. type senderTelemetry struct {
  8. totalPayloadsSent uint64
  9. totalPayloadsDroppedQueueFull uint64
  10. totalPayloadsDroppedWriter uint64
  11. totalBytesSent uint64
  12. totalBytesDroppedQueueFull uint64
  13. totalBytesDroppedWriter uint64
  14. }
  15. type sender struct {
  16. transport io.WriteCloser
  17. pool *bufferPool
  18. queue chan *statsdBuffer
  19. telemetry *senderTelemetry
  20. stop chan struct{}
  21. flushSignal chan struct{}
  22. }
  23. func newSender(transport io.WriteCloser, queueSize int, pool *bufferPool) *sender {
  24. sender := &sender{
  25. transport: transport,
  26. pool: pool,
  27. queue: make(chan *statsdBuffer, queueSize),
  28. telemetry: &senderTelemetry{},
  29. stop: make(chan struct{}),
  30. flushSignal: make(chan struct{}),
  31. }
  32. go sender.sendLoop()
  33. return sender
  34. }
  35. func (s *sender) send(buffer *statsdBuffer) {
  36. select {
  37. case s.queue <- buffer:
  38. default:
  39. atomic.AddUint64(&s.telemetry.totalPayloadsDroppedQueueFull, 1)
  40. atomic.AddUint64(&s.telemetry.totalBytesDroppedQueueFull, uint64(len(buffer.bytes())))
  41. s.pool.returnBuffer(buffer)
  42. }
  43. }
  44. func (s *sender) write(buffer *statsdBuffer) {
  45. _, err := s.transport.Write(buffer.bytes())
  46. if err != nil {
  47. atomic.AddUint64(&s.telemetry.totalPayloadsDroppedWriter, 1)
  48. atomic.AddUint64(&s.telemetry.totalBytesDroppedWriter, uint64(len(buffer.bytes())))
  49. } else {
  50. atomic.AddUint64(&s.telemetry.totalPayloadsSent, 1)
  51. atomic.AddUint64(&s.telemetry.totalBytesSent, uint64(len(buffer.bytes())))
  52. }
  53. s.pool.returnBuffer(buffer)
  54. }
  55. func (s *sender) flushTelemetryMetrics(t *Telemetry) {
  56. t.TotalPayloadsSent = atomic.LoadUint64(&s.telemetry.totalPayloadsSent)
  57. t.TotalPayloadsDroppedQueueFull = atomic.LoadUint64(&s.telemetry.totalPayloadsDroppedQueueFull)
  58. t.TotalPayloadsDroppedWriter = atomic.LoadUint64(&s.telemetry.totalPayloadsDroppedWriter)
  59. t.TotalBytesSent = atomic.LoadUint64(&s.telemetry.totalBytesSent)
  60. t.TotalBytesDroppedQueueFull = atomic.LoadUint64(&s.telemetry.totalBytesDroppedQueueFull)
  61. t.TotalBytesDroppedWriter = atomic.LoadUint64(&s.telemetry.totalBytesDroppedWriter)
  62. }
  63. func (s *sender) sendLoop() {
  64. defer close(s.stop)
  65. for {
  66. select {
  67. case buffer := <-s.queue:
  68. s.write(buffer)
  69. case <-s.stop:
  70. return
  71. case <-s.flushSignal:
  72. // At that point we know that the workers are paused (the statsd client
  73. // will pause them before calling sender.flush()).
  74. // So we can fully flush the input queue
  75. s.flushInputQueue()
  76. s.flushSignal <- struct{}{}
  77. }
  78. }
  79. }
  80. func (s *sender) flushInputQueue() {
  81. for {
  82. select {
  83. case buffer := <-s.queue:
  84. s.write(buffer)
  85. default:
  86. return
  87. }
  88. }
  89. }
  90. func (s *sender) flush() {
  91. s.flushSignal <- struct{}{}
  92. <-s.flushSignal
  93. }
  94. func (s *sender) close() error {
  95. s.stop <- struct{}{}
  96. <-s.stop
  97. s.flushInputQueue()
  98. return s.transport.Close()
  99. }