worker.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package statsd
  2. import (
  3. "math/rand"
  4. "sync"
  5. "time"
  6. )
  7. type worker struct {
  8. pool *bufferPool
  9. buffer *statsdBuffer
  10. sender *sender
  11. random *rand.Rand
  12. randomLock sync.Mutex
  13. sync.Mutex
  14. inputMetrics chan metric
  15. stop chan struct{}
  16. }
  17. func newWorker(pool *bufferPool, sender *sender) *worker {
  18. // Each worker uses its own random source and random lock to prevent
  19. // workers in separate goroutines from contending for the lock on the
  20. // "math/rand" package-global random source (e.g. calls like
  21. // "rand.Float64()" must acquire a shared lock to get the next
  22. // pseudorandom number).
  23. // Note that calling "time.Now().UnixNano()" repeatedly quickly may return
  24. // very similar values. That's fine for seeding the worker-specific random
  25. // source because we just need an evenly distributed stream of float values.
  26. // Do not use this random source for cryptographic randomness.
  27. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  28. return &worker{
  29. pool: pool,
  30. sender: sender,
  31. buffer: pool.borrowBuffer(),
  32. random: random,
  33. stop: make(chan struct{}),
  34. }
  35. }
  36. func (w *worker) startReceivingMetric(bufferSize int) {
  37. w.inputMetrics = make(chan metric, bufferSize)
  38. go w.pullMetric()
  39. }
  40. func (w *worker) stopReceivingMetric() {
  41. w.stop <- struct{}{}
  42. }
  43. func (w *worker) pullMetric() {
  44. for {
  45. select {
  46. case m := <-w.inputMetrics:
  47. w.processMetric(m)
  48. case <-w.stop:
  49. return
  50. }
  51. }
  52. }
  53. func (w *worker) processMetric(m metric) error {
  54. if !shouldSample(m.rate, w.random, &w.randomLock) {
  55. return nil
  56. }
  57. w.Lock()
  58. var err error
  59. if err = w.writeMetricUnsafe(m); err == errBufferFull {
  60. w.flushUnsafe()
  61. err = w.writeMetricUnsafe(m)
  62. }
  63. w.Unlock()
  64. return err
  65. }
  66. func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, precision int) error {
  67. globalPos := 0
  68. // first check how much data we can write to the buffer:
  69. // +3 + len(metricSymbol) because the message will include '|<metricSymbol>|#' before the tags
  70. // +1 for the potential line break at the start of the metric
  71. tagsSize := len(m.stags) + 4 + len(metricSymbol)
  72. for _, t := range m.globalTags {
  73. tagsSize += len(t) + 1
  74. }
  75. for {
  76. pos, err := w.buffer.writeAggregated(metricSymbol, m.namespace, m.globalTags, m.name, m.fvalues[globalPos:], m.stags, tagsSize, precision)
  77. if err == errPartialWrite {
  78. // We successfully wrote part of the histogram metrics.
  79. // We flush the current buffer and finish the histogram
  80. // in a new one.
  81. w.flushUnsafe()
  82. globalPos += pos
  83. } else {
  84. return err
  85. }
  86. }
  87. }
  88. func (w *worker) writeMetricUnsafe(m metric) error {
  89. switch m.metricType {
  90. case gauge:
  91. return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
  92. case count:
  93. return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate)
  94. case histogram:
  95. return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
  96. case distribution:
  97. return w.buffer.writeDistribution(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
  98. case set:
  99. return w.buffer.writeSet(m.namespace, m.globalTags, m.name, m.svalue, m.tags, m.rate)
  100. case timing:
  101. return w.buffer.writeTiming(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
  102. case event:
  103. return w.buffer.writeEvent(*m.evalue, m.globalTags)
  104. case serviceCheck:
  105. return w.buffer.writeServiceCheck(*m.scvalue, m.globalTags)
  106. case histogramAggregated:
  107. return w.writeAggregatedMetricUnsafe(m, histogramSymbol, -1)
  108. case distributionAggregated:
  109. return w.writeAggregatedMetricUnsafe(m, distributionSymbol, -1)
  110. case timingAggregated:
  111. return w.writeAggregatedMetricUnsafe(m, timingSymbol, 6)
  112. default:
  113. return nil
  114. }
  115. }
  116. func (w *worker) flush() {
  117. w.Lock()
  118. w.flushUnsafe()
  119. w.Unlock()
  120. }
  121. func (w *worker) pause() {
  122. w.Lock()
  123. }
  124. func (w *worker) unpause() {
  125. w.Unlock()
  126. }
  127. // flush the current buffer. Lock must be held by caller.
  128. // flushed buffer written to the network asynchronously.
  129. func (w *worker) flushUnsafe() {
  130. if len(w.buffer.bytes()) > 0 {
  131. w.sender.send(w.buffer)
  132. w.buffer = w.pool.borrowBuffer()
  133. }
  134. }