aggregator.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package statsd
  2. import (
  3. "strings"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. type (
  9. countsMap map[string]*countMetric
  10. gaugesMap map[string]*gaugeMetric
  11. setsMap map[string]*setMetric
  12. bufferedMetricMap map[string]*bufferedMetric
  13. )
  14. type aggregator struct {
  15. nbContextGauge uint64
  16. nbContextCount uint64
  17. nbContextSet uint64
  18. countsM sync.RWMutex
  19. gaugesM sync.RWMutex
  20. setsM sync.RWMutex
  21. gauges gaugesMap
  22. counts countsMap
  23. sets setsMap
  24. histograms bufferedMetricContexts
  25. distributions bufferedMetricContexts
  26. timings bufferedMetricContexts
  27. closed chan struct{}
  28. client *Client
  29. // aggregator implements channelMode mechanism to receive histograms,
  30. // distributions and timings. Since they need sampling they need to
  31. // lock for random. When using both channelMode and ExtendedAggregation
  32. // we don't want goroutine to fight over the lock.
  33. inputMetrics chan metric
  34. stopChannelMode chan struct{}
  35. wg sync.WaitGroup
  36. }
  37. func newAggregator(c *Client) *aggregator {
  38. return &aggregator{
  39. client: c,
  40. counts: countsMap{},
  41. gauges: gaugesMap{},
  42. sets: setsMap{},
  43. histograms: newBufferedContexts(newHistogramMetric),
  44. distributions: newBufferedContexts(newDistributionMetric),
  45. timings: newBufferedContexts(newTimingMetric),
  46. closed: make(chan struct{}),
  47. stopChannelMode: make(chan struct{}),
  48. }
  49. }
  50. func (a *aggregator) start(flushInterval time.Duration) {
  51. ticker := time.NewTicker(flushInterval)
  52. go func() {
  53. for {
  54. select {
  55. case <-ticker.C:
  56. a.flush()
  57. case <-a.closed:
  58. return
  59. }
  60. }
  61. }()
  62. }
  63. func (a *aggregator) startReceivingMetric(bufferSize int, nbWorkers int) {
  64. a.inputMetrics = make(chan metric, bufferSize)
  65. for i := 0; i < nbWorkers; i++ {
  66. a.wg.Add(1)
  67. go a.pullMetric()
  68. }
  69. }
  70. func (a *aggregator) stopReceivingMetric() {
  71. close(a.stopChannelMode)
  72. a.wg.Wait()
  73. }
  74. func (a *aggregator) stop() {
  75. a.closed <- struct{}{}
  76. }
  77. func (a *aggregator) pullMetric() {
  78. for {
  79. select {
  80. case m := <-a.inputMetrics:
  81. switch m.metricType {
  82. case histogram:
  83. a.histogram(m.name, m.fvalue, m.tags, m.rate)
  84. case distribution:
  85. a.distribution(m.name, m.fvalue, m.tags, m.rate)
  86. case timing:
  87. a.timing(m.name, m.fvalue, m.tags, m.rate)
  88. }
  89. case <-a.stopChannelMode:
  90. a.wg.Done()
  91. return
  92. }
  93. }
  94. }
  95. func (a *aggregator) flush() {
  96. for _, m := range a.flushMetrics() {
  97. a.client.sendBlocking(m)
  98. }
  99. }
  100. func (a *aggregator) flushTelemetryMetrics(t *Telemetry) {
  101. if a == nil {
  102. // aggregation is disabled
  103. return
  104. }
  105. t.AggregationNbContextGauge = atomic.LoadUint64(&a.nbContextGauge)
  106. t.AggregationNbContextCount = atomic.LoadUint64(&a.nbContextCount)
  107. t.AggregationNbContextSet = atomic.LoadUint64(&a.nbContextSet)
  108. t.AggregationNbContextHistogram = a.histograms.getNbContext()
  109. t.AggregationNbContextDistribution = a.distributions.getNbContext()
  110. t.AggregationNbContextTiming = a.timings.getNbContext()
  111. }
  112. func (a *aggregator) flushMetrics() []metric {
  113. metrics := []metric{}
  114. // We reset the values to avoid sending 'zero' values for metrics not
  115. // sampled during this flush interval
  116. a.setsM.Lock()
  117. sets := a.sets
  118. a.sets = setsMap{}
  119. a.setsM.Unlock()
  120. for _, s := range sets {
  121. metrics = append(metrics, s.flushUnsafe()...)
  122. }
  123. a.gaugesM.Lock()
  124. gauges := a.gauges
  125. a.gauges = gaugesMap{}
  126. a.gaugesM.Unlock()
  127. for _, g := range gauges {
  128. metrics = append(metrics, g.flushUnsafe())
  129. }
  130. a.countsM.Lock()
  131. counts := a.counts
  132. a.counts = countsMap{}
  133. a.countsM.Unlock()
  134. for _, c := range counts {
  135. metrics = append(metrics, c.flushUnsafe())
  136. }
  137. metrics = a.histograms.flush(metrics)
  138. metrics = a.distributions.flush(metrics)
  139. metrics = a.timings.flush(metrics)
  140. atomic.AddUint64(&a.nbContextCount, uint64(len(counts)))
  141. atomic.AddUint64(&a.nbContextGauge, uint64(len(gauges)))
  142. atomic.AddUint64(&a.nbContextSet, uint64(len(sets)))
  143. return metrics
  144. }
  145. func getContext(name string, tags []string) string {
  146. return name + ":" + strings.Join(tags, tagSeparatorSymbol)
  147. }
  148. func getContextAndTags(name string, tags []string) (string, string) {
  149. stringTags := strings.Join(tags, tagSeparatorSymbol)
  150. return name + ":" + stringTags, stringTags
  151. }
  152. func (a *aggregator) count(name string, value int64, tags []string) error {
  153. context := getContext(name, tags)
  154. a.countsM.RLock()
  155. if count, found := a.counts[context]; found {
  156. count.sample(value)
  157. a.countsM.RUnlock()
  158. return nil
  159. }
  160. a.countsM.RUnlock()
  161. a.countsM.Lock()
  162. // Check if another goroutines hasn't created the value betwen the RUnlock and 'Lock'
  163. if count, found := a.counts[context]; found {
  164. count.sample(value)
  165. a.countsM.Unlock()
  166. return nil
  167. }
  168. a.counts[context] = newCountMetric(name, value, tags)
  169. a.countsM.Unlock()
  170. return nil
  171. }
  172. func (a *aggregator) gauge(name string, value float64, tags []string) error {
  173. context := getContext(name, tags)
  174. a.gaugesM.RLock()
  175. if gauge, found := a.gauges[context]; found {
  176. gauge.sample(value)
  177. a.gaugesM.RUnlock()
  178. return nil
  179. }
  180. a.gaugesM.RUnlock()
  181. gauge := newGaugeMetric(name, value, tags)
  182. a.gaugesM.Lock()
  183. // Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
  184. if gauge, found := a.gauges[context]; found {
  185. gauge.sample(value)
  186. a.gaugesM.Unlock()
  187. return nil
  188. }
  189. a.gauges[context] = gauge
  190. a.gaugesM.Unlock()
  191. return nil
  192. }
  193. func (a *aggregator) set(name string, value string, tags []string) error {
  194. context := getContext(name, tags)
  195. a.setsM.RLock()
  196. if set, found := a.sets[context]; found {
  197. set.sample(value)
  198. a.setsM.RUnlock()
  199. return nil
  200. }
  201. a.setsM.RUnlock()
  202. a.setsM.Lock()
  203. // Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
  204. if set, found := a.sets[context]; found {
  205. set.sample(value)
  206. a.setsM.Unlock()
  207. return nil
  208. }
  209. a.sets[context] = newSetMetric(name, value, tags)
  210. a.setsM.Unlock()
  211. return nil
  212. }
  213. // Only histograms, distributions and timings are sampled with a rate since we
  214. // only pack them in on message instead of aggregating them. Discarding the
  215. // sample rate will have impacts on the CPU and memory usage of the Agent.
  216. // type alias for Client.sendToAggregator
  217. type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64) error
  218. func (a *aggregator) histogram(name string, value float64, tags []string, rate float64) error {
  219. return a.histograms.sample(name, value, tags, rate)
  220. }
  221. func (a *aggregator) distribution(name string, value float64, tags []string, rate float64) error {
  222. return a.distributions.sample(name, value, tags, rate)
  223. }
  224. func (a *aggregator) timing(name string, value float64, tags []string, rate float64) error {
  225. return a.timings.sample(name, value, tags, rate)
  226. }