buffered_metric_context.go 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package statsd
  2. import (
  3. "math/rand"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. // bufferedMetricContexts represent the contexts for Histograms, Distributions
  9. // and Timing. Since those 3 metric types behave the same way and are sampled
  10. // with the same type they're represented by the same class.
  11. type bufferedMetricContexts struct {
  12. nbContext int32
  13. mutex sync.RWMutex
  14. values bufferedMetricMap
  15. newMetric func(string, float64, string) *bufferedMetric
  16. // Each bufferedMetricContexts uses its own random source and random
  17. // lock to prevent goroutines from contending for the lock on the
  18. // "math/rand" package-global random source (e.g. calls like
  19. // "rand.Float64()" must acquire a shared lock to get the next
  20. // pseudorandom number).
  21. random *rand.Rand
  22. randomLock sync.Mutex
  23. }
  24. func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
  25. return bufferedMetricContexts{
  26. values: bufferedMetricMap{},
  27. newMetric: newMetric,
  28. // Note that calling "time.Now().UnixNano()" repeatedly quickly may return
  29. // very similar values. That's fine for seeding the worker-specific random
  30. // source because we just need an evenly distributed stream of float values.
  31. // Do not use this random source for cryptographic randomness.
  32. random: rand.New(rand.NewSource(time.Now().UnixNano())),
  33. }
  34. }
  35. func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
  36. bc.mutex.Lock()
  37. values := bc.values
  38. bc.values = bufferedMetricMap{}
  39. bc.mutex.Unlock()
  40. for _, d := range values {
  41. metrics = append(metrics, d.flushUnsafe())
  42. }
  43. atomic.AddInt32(&bc.nbContext, int32(len(values)))
  44. return metrics
  45. }
  46. func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error {
  47. if !shouldSample(rate, bc.random, &bc.randomLock) {
  48. return nil
  49. }
  50. context, stringTags := getContextAndTags(name, tags)
  51. bc.mutex.RLock()
  52. if v, found := bc.values[context]; found {
  53. v.sample(value)
  54. bc.mutex.RUnlock()
  55. return nil
  56. }
  57. bc.mutex.RUnlock()
  58. bc.mutex.Lock()
  59. // Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
  60. if v, found := bc.values[context]; found {
  61. v.sample(value)
  62. bc.mutex.Unlock()
  63. return nil
  64. }
  65. bc.values[context] = bc.newMetric(name, value, stringTags)
  66. bc.mutex.Unlock()
  67. return nil
  68. }
  69. func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 {
  70. return atomic.SwapInt32(&bc.nbContext, 0)
  71. }