sampler.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2016 Datadog, Inc.
  5. package tracer
  6. import (
  7. "encoding/json"
  8. "io"
  9. "math"
  10. "sync"
  11. "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
  12. "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
  13. "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
  14. )
  15. // Sampler is the generic interface of any sampler. It must be safe for concurrent use.
  16. type Sampler interface {
  17. // Sample returns true if the given span should be sampled.
  18. Sample(span Span) bool
  19. }
  20. // RateSampler is a sampler implementation which randomly selects spans using a
  21. // provided rate. For example, a rate of 0.75 will permit 75% of the spans.
  22. // RateSampler implementations should be safe for concurrent use.
  23. type RateSampler interface {
  24. Sampler
  25. // Rate returns the current sample rate.
  26. Rate() float64
  27. // SetRate sets a new sample rate.
  28. SetRate(rate float64)
  29. }
  30. // rateSampler samples from a sample rate.
  31. type rateSampler struct {
  32. sync.RWMutex
  33. rate float64
  34. }
  35. // NewAllSampler is a short-hand for NewRateSampler(1). It is all-permissive.
  36. func NewAllSampler() RateSampler { return NewRateSampler(1) }
  37. // NewRateSampler returns an initialized RateSampler with a given sample rate.
  38. func NewRateSampler(rate float64) RateSampler {
  39. return &rateSampler{rate: rate}
  40. }
  41. // Rate returns the current rate of the sampler.
  42. func (r *rateSampler) Rate() float64 {
  43. r.RLock()
  44. defer r.RUnlock()
  45. return r.rate
  46. }
  47. // SetRate sets a new sampling rate.
  48. func (r *rateSampler) SetRate(rate float64) {
  49. r.Lock()
  50. r.rate = rate
  51. r.Unlock()
  52. }
  53. // constants used for the Knuth hashing, same as agent.
  54. const knuthFactor = uint64(1111111111111111111)
  55. // Sample returns true if the given span should be sampled.
  56. func (r *rateSampler) Sample(spn ddtrace.Span) bool {
  57. if r.rate == 1 {
  58. // fast path
  59. return true
  60. }
  61. s, ok := spn.(*span)
  62. if !ok {
  63. return false
  64. }
  65. r.RLock()
  66. defer r.RUnlock()
  67. return sampledByRate(s.TraceID, r.rate)
  68. }
  69. // sampledByRate verifies if the number n should be sampled at the specified
  70. // rate.
  71. func sampledByRate(n uint64, rate float64) bool {
  72. if rate < 1 {
  73. return n*knuthFactor < uint64(rate*math.MaxUint64)
  74. }
  75. return true
  76. }
  77. // prioritySampler holds a set of per-service sampling rates and applies
  78. // them to spans.
  79. type prioritySampler struct {
  80. mu sync.RWMutex
  81. rates map[string]float64
  82. defaultRate float64
  83. }
  84. func newPrioritySampler() *prioritySampler {
  85. return &prioritySampler{
  86. rates: make(map[string]float64),
  87. defaultRate: 1.,
  88. }
  89. }
  90. // readRatesJSON will try to read the rates as JSON from the given io.ReadCloser.
  91. func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error {
  92. var payload struct {
  93. Rates map[string]float64 `json:"rate_by_service"`
  94. }
  95. if err := json.NewDecoder(rc).Decode(&payload); err != nil {
  96. return err
  97. }
  98. rc.Close()
  99. const defaultRateKey = "service:,env:"
  100. ps.mu.Lock()
  101. defer ps.mu.Unlock()
  102. ps.rates = payload.Rates
  103. if v, ok := ps.rates[defaultRateKey]; ok {
  104. ps.defaultRate = v
  105. delete(ps.rates, defaultRateKey)
  106. }
  107. return nil
  108. }
  109. // getRate returns the sampling rate to be used for the given span. Callers must
  110. // guard the span.
  111. func (ps *prioritySampler) getRate(spn *span) float64 {
  112. key := "service:" + spn.Service + ",env:" + spn.Meta[ext.Environment]
  113. ps.mu.RLock()
  114. defer ps.mu.RUnlock()
  115. if rate, ok := ps.rates[key]; ok {
  116. return rate
  117. }
  118. return ps.defaultRate
  119. }
  120. // apply applies sampling priority to the given span. Caller must ensure it is safe
  121. // to modify the span.
  122. func (ps *prioritySampler) apply(spn *span) {
  123. rate := ps.getRate(spn)
  124. if sampledByRate(spn.TraceID, rate) {
  125. spn.setSamplingPriority(ext.PriorityAutoKeep, samplernames.AgentRate)
  126. } else {
  127. spn.setSamplingPriority(ext.PriorityAutoReject, samplernames.AgentRate)
  128. }
  129. spn.SetTag(keySamplingPriorityRate, rate)
  130. }