aggregator.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package statsd
  2. import (
  3. "strings"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
// Map types used by the aggregator. Each is keyed by a metric "context":
// the metric name joined with its tags (see getContext).
type (
	countsMap         map[string]*countMetric
	gaugesMap         map[string]*gaugeMetric
	setsMap           map[string]*setMetric
	bufferedMetricMap map[string]*bufferedMetric
)
// aggregator buffers gauges, counts and sets per context (name + tags) and
// routes histograms, distributions and timings to their buffered contexts,
// flushing everything to the client on a fixed interval (see start/flush).
type aggregator struct {
	// Number of contexts flushed per metric kind since the last telemetry
	// snapshot; updated with sync/atomic (see flushMetrics and
	// flushTelemetryMetrics).
	nbContextGauge int32
	nbContextCount int32
	nbContextSet   int32

	// One lock per metric family so samplers of different kinds do not
	// contend with each other.
	countsM sync.RWMutex
	gaugesM sync.RWMutex
	setsM   sync.RWMutex

	gauges gaugesMap
	counts countsMap
	sets   setsMap

	histograms    bufferedMetricContexts
	distributions bufferedMetricContexts
	timings       bufferedMetricContexts

	// closed signals the periodic flush goroutine (started by start) to exit.
	closed chan struct{}

	client *Client

	// aggregator implements ChannelMode mechanism to receive histograms,
	// distributions and timings. Since they need sampling they need to
	// lock for random. When using both ChannelMode and ExtendedAggregation
	// we don't want goroutine to fight over the lock.
	inputMetrics    chan metric
	stopChannelMode chan struct{}
	wg              sync.WaitGroup // tracks the pullMetric worker goroutines
}
// aggregatorMetrics is a telemetry snapshot: the number of metric contexts
// of each kind flushed since the previous snapshot. Produced by
// aggregator.flushTelemetryMetrics, which resets the underlying counters.
type aggregatorMetrics struct {
	nbContext             int32 // sum of all the per-kind counters below
	nbContextGauge        int32
	nbContextCount        int32
	nbContextSet          int32
	nbContextHistogram    int32
	nbContextDistribution int32
	nbContextTiming       int32
}
  46. func newAggregator(c *Client) *aggregator {
  47. return &aggregator{
  48. client: c,
  49. counts: countsMap{},
  50. gauges: gaugesMap{},
  51. sets: setsMap{},
  52. histograms: newBufferedContexts(newHistogramMetric),
  53. distributions: newBufferedContexts(newDistributionMetric),
  54. timings: newBufferedContexts(newTimingMetric),
  55. closed: make(chan struct{}),
  56. stopChannelMode: make(chan struct{}),
  57. }
  58. }
  59. func (a *aggregator) start(flushInterval time.Duration) {
  60. ticker := time.NewTicker(flushInterval)
  61. go func() {
  62. for {
  63. select {
  64. case <-ticker.C:
  65. a.flush()
  66. case <-a.closed:
  67. return
  68. }
  69. }
  70. }()
  71. }
  72. func (a *aggregator) startReceivingMetric(bufferSize int, nbWorkers int) {
  73. a.inputMetrics = make(chan metric, bufferSize)
  74. for i := 0; i < nbWorkers; i++ {
  75. a.wg.Add(1)
  76. go a.pullMetric()
  77. }
  78. }
// stopReceivingMetric stops ChannelMode: it signals every pullMetric worker
// to exit and blocks until all of them have finished.
func (a *aggregator) stopReceivingMetric() {
	close(a.stopChannelMode)
	a.wg.Wait()
}
// stop terminates the periodic flush goroutine started by start(). The send
// blocks until that goroutine receives it, so stop only returns once the
// flush loop has actually exited.
func (a *aggregator) stop() {
	a.closed <- struct{}{}
}
  86. func (a *aggregator) pullMetric() {
  87. for {
  88. select {
  89. case m := <-a.inputMetrics:
  90. switch m.metricType {
  91. case histogram:
  92. a.histogram(m.name, m.fvalue, m.tags, m.rate)
  93. case distribution:
  94. a.distribution(m.name, m.fvalue, m.tags, m.rate)
  95. case timing:
  96. a.timing(m.name, m.fvalue, m.tags, m.rate)
  97. }
  98. case <-a.stopChannelMode:
  99. a.wg.Done()
  100. return
  101. }
  102. }
  103. }
// flush drains every aggregated metric and hands each one to the client,
// blocking until all have been sent.
func (a *aggregator) flush() {
	for _, m := range a.flushMetrics() {
		a.client.sendBlocking(m)
	}
}
  109. func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics {
  110. if a == nil {
  111. return nil
  112. }
  113. am := &aggregatorMetrics{
  114. nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
  115. nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
  116. nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
  117. nbContextHistogram: a.histograms.resetAndGetNbContext(),
  118. nbContextDistribution: a.distributions.resetAndGetNbContext(),
  119. nbContextTiming: a.timings.resetAndGetNbContext(),
  120. }
  121. am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContextHistogram + am.nbContextDistribution + am.nbContextTiming
  122. return am
  123. }
  124. func (a *aggregator) flushMetrics() []metric {
  125. metrics := []metric{}
  126. // We reset the values to avoid sending 'zero' values for metrics not
  127. // sampled during this flush interval
  128. a.setsM.Lock()
  129. sets := a.sets
  130. a.sets = setsMap{}
  131. a.setsM.Unlock()
  132. for _, s := range sets {
  133. metrics = append(metrics, s.flushUnsafe()...)
  134. }
  135. a.gaugesM.Lock()
  136. gauges := a.gauges
  137. a.gauges = gaugesMap{}
  138. a.gaugesM.Unlock()
  139. for _, g := range gauges {
  140. metrics = append(metrics, g.flushUnsafe())
  141. }
  142. a.countsM.Lock()
  143. counts := a.counts
  144. a.counts = countsMap{}
  145. a.countsM.Unlock()
  146. for _, c := range counts {
  147. metrics = append(metrics, c.flushUnsafe())
  148. }
  149. metrics = a.histograms.flush(metrics)
  150. metrics = a.distributions.flush(metrics)
  151. metrics = a.timings.flush(metrics)
  152. atomic.AddInt32(&a.nbContextCount, int32(len(counts)))
  153. atomic.AddInt32(&a.nbContextGauge, int32(len(gauges)))
  154. atomic.AddInt32(&a.nbContextSet, int32(len(sets)))
  155. return metrics
  156. }
  157. func getContext(name string, tags []string) string {
  158. return name + ":" + strings.Join(tags, tagSeparatorSymbol)
  159. }
  160. func getContextAndTags(name string, tags []string) (string, string) {
  161. stringTags := strings.Join(tags, tagSeparatorSymbol)
  162. return name + ":" + stringTags, stringTags
  163. }
// count adds value to the running counter for the context derived from name
// and tags, creating the counter on first use. It always returns nil.
//
// Double-checked locking: the common case (counter exists) only takes the
// read lock; creation upgrades to the write lock and re-checks.
func (a *aggregator) count(name string, value int64, tags []string) error {
	context := getContext(name, tags)
	a.countsM.RLock()
	if count, found := a.counts[context]; found {
		// Fast path: sample under the read lock so concurrent samplers
		// of different contexts don't serialize.
		count.sample(value)
		a.countsM.RUnlock()
		return nil
	}
	a.countsM.RUnlock()

	a.countsM.Lock()
	// Check that another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
	if count, found := a.counts[context]; found {
		count.sample(value)
		a.countsM.Unlock()
		return nil
	}
	a.counts[context] = newCountMetric(name, value, tags)
	a.countsM.Unlock()
	return nil
}
// gauge records value as the latest gauge sample for the context derived
// from name and tags, creating the gauge on first use. It always returns
// nil.
//
// Same double-checked locking pattern as count; note the new gauge is
// constructed before taking the write lock to keep the critical section
// short.
func (a *aggregator) gauge(name string, value float64, tags []string) error {
	context := getContext(name, tags)
	a.gaugesM.RLock()
	if gauge, found := a.gauges[context]; found {
		// Fast path: the gauge already exists; sample under the read lock.
		gauge.sample(value)
		a.gaugesM.RUnlock()
		return nil
	}
	a.gaugesM.RUnlock()

	gauge := newGaugeMetric(name, value, tags)

	a.gaugesM.Lock()
	// Check that another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
	if gauge, found := a.gauges[context]; found {
		gauge.sample(value)
		a.gaugesM.Unlock()
		return nil
	}
	a.gauges[context] = gauge
	a.gaugesM.Unlock()
	return nil
}
// set adds value to the set for the context derived from name and tags,
// creating the set on first use. It always returns nil.
//
// Same double-checked locking pattern as count.
func (a *aggregator) set(name string, value string, tags []string) error {
	context := getContext(name, tags)
	a.setsM.RLock()
	if set, found := a.sets[context]; found {
		// Fast path: the set already exists; sample under the read lock.
		set.sample(value)
		a.setsM.RUnlock()
		return nil
	}
	a.setsM.RUnlock()

	a.setsM.Lock()
	// Check that another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
	if set, found := a.sets[context]; found {
		set.sample(value)
		a.setsM.Unlock()
		return nil
	}
	a.sets[context] = newSetMetric(name, value, tags)
	a.setsM.Unlock()
	return nil
}
// Only histograms, distributions and timings are sampled with a rate since we
// only pack them in one message instead of aggregating them. Discarding the
// sample rate will have impacts on the CPU and memory usage of the Agent.

// bufferedMetricSampleFunc is the signature shared by the sampler methods
// below; type alias for Client.sendToAggregator.
type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64) error
// histogram samples a histogram value, preserving the sample rate (see the
// note on bufferedMetricSampleFunc).
func (a *aggregator) histogram(name string, value float64, tags []string, rate float64) error {
	return a.histograms.sample(name, value, tags, rate)
}
// distribution samples a distribution value, preserving the sample rate
// (see the note on bufferedMetricSampleFunc).
func (a *aggregator) distribution(name string, value float64, tags []string, rate float64) error {
	return a.distributions.sample(name, value, tags, rate)
}
// timing samples a timing value, preserving the sample rate (see the note
// on bufferedMetricSampleFunc).
func (a *aggregator) timing(name string, value float64, tags []string, rate float64) error {
	return a.timings.sample(name, value, tags, rate)
}