stats.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2016 Datadog, Inc.
  5. //go:generate msgp -unexported -marshal=false -o=stats_msgp.go -tests=false
  6. package tracer
  7. import (
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
  12. "github.com/DataDog/datadog-go/v5/statsd"
  13. "github.com/DataDog/sketches-go/ddsketch"
  14. "google.golang.org/protobuf/proto"
  15. )
// aggregableSpan holds necessary information about a span that can be used to
// aggregate statistics in a bucket.
type aggregableSpan struct {
	// key specifies the aggregation key under which this span can be
	// grouped inside a bucket.
	key aggregation

	// Start and Duration are the span's start timestamp and duration, in nanoseconds.
	Start, Duration int64
	// Error is non-zero when the span was marked as errored.
	Error int32
	// TopLevel reports whether the span is a top-level span (see handleSpan,
	// which counts such spans separately).
	TopLevel bool
}
// defaultStatsBucketSize specifies the default span of time that will be
// covered in one stats bucket (10s, expressed in nanoseconds).
var defaultStatsBucketSize = (10 * time.Second).Nanoseconds()
// concentrator aggregates and stores statistics on incoming spans in time buckets,
// flushing them occasionally to the underlying transport located in the given
// tracer config.
type concentrator struct {
	// In specifies the channel to be used for feeding data to the concentrator.
	// In order for In to have a consumer, the concentrator must be started using
	// a call to Start.
	In chan *aggregableSpan

	// mu guards below fields
	mu sync.Mutex

	// buckets maintains a set of buckets, where the map key represents
	// the starting point in time of that bucket, in nanoseconds.
	buckets map[int64]*rawBucket

	// stopped reports whether the concentrator is stopped (when non-zero).
	// It is accessed atomically by Start and Stop.
	stopped uint32

	wg           sync.WaitGroup // waits for any active goroutines
	bucketSize   int64          // the size of a bucket in nanoseconds
	stop         chan struct{}  // closing this channel triggers shutdown
	cfg          *config        // tracer startup configuration
	statsdClient statsdClient   // statsd client for sending metrics.
}
  50. // newConcentrator creates a new concentrator using the given tracer
  51. // configuration c. It creates buckets of bucketSize nanoseconds duration.
  52. func newConcentrator(c *config, bucketSize int64) *concentrator {
  53. return &concentrator{
  54. In: make(chan *aggregableSpan, 10000),
  55. bucketSize: bucketSize,
  56. stopped: 1,
  57. buckets: make(map[int64]*rawBucket),
  58. cfg: c,
  59. }
  60. }
  61. // alignTs returns the provided timestamp truncated to the bucket size.
  62. // It gives us the start time of the time bucket in which such timestamp falls.
  63. func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize }
  64. // Start starts the concentrator. A started concentrator needs to be stopped
  65. // in order to gracefully shut down, using Stop.
  66. func (c *concentrator) Start() {
  67. if atomic.SwapUint32(&c.stopped, 0) == 0 {
  68. // already running
  69. log.Warn("(*concentrator).Start called more than once. This is likely a programming error.")
  70. return
  71. }
  72. c.stop = make(chan struct{})
  73. c.wg.Add(1)
  74. go func() {
  75. defer c.wg.Done()
  76. tick := time.NewTicker(time.Duration(c.bucketSize) * time.Nanosecond)
  77. defer tick.Stop()
  78. c.runFlusher(tick.C)
  79. }()
  80. c.wg.Add(1)
  81. go func() {
  82. defer c.wg.Done()
  83. c.runIngester()
  84. }()
  85. }
  86. // runFlusher runs the flushing loop which sends stats to the underlying transport.
  87. func (c *concentrator) runFlusher(tick <-chan time.Time) {
  88. for {
  89. select {
  90. case now := <-tick:
  91. c.flushAndSend(now, withoutCurrentBucket)
  92. case <-c.stop:
  93. return
  94. }
  95. }
  96. }
  97. // statsd returns any tracer configured statsd client, or a no-op.
  98. func (c *concentrator) statsd() statsdClient {
  99. if c.statsdClient == nil {
  100. return &statsd.NoOpClient{}
  101. }
  102. return c.statsdClient
  103. }
  104. // runIngester runs the loop which accepts incoming data on the concentrator's In
  105. // channel.
  106. func (c *concentrator) runIngester() {
  107. for {
  108. select {
  109. case s := <-c.In:
  110. c.statsd().Incr("datadog.tracer.stats.spans_in", nil, 1)
  111. c.add(s)
  112. case <-c.stop:
  113. return
  114. }
  115. }
  116. }
  117. // add adds s into the concentrator's internal stats buckets.
  118. func (c *concentrator) add(s *aggregableSpan) {
  119. c.mu.Lock()
  120. defer c.mu.Unlock()
  121. btime := alignTs(s.Start+s.Duration, c.bucketSize)
  122. b, ok := c.buckets[btime]
  123. if !ok {
  124. b = newRawBucket(uint64(btime), c.bucketSize)
  125. c.buckets[btime] = b
  126. }
  127. b.handleSpan(s)
  128. }
  129. // Stop stops the concentrator and blocks until the operation completes.
  130. func (c *concentrator) Stop() {
  131. if atomic.SwapUint32(&c.stopped, 1) > 0 {
  132. return
  133. }
  134. close(c.stop)
  135. c.wg.Wait()
  136. drain:
  137. for {
  138. select {
  139. case s := <-c.In:
  140. c.statsd().Incr("datadog.tracer.stats.spans_in", nil, 1)
  141. c.add(s)
  142. default:
  143. break drain
  144. }
  145. }
  146. c.flushAndSend(time.Now(), withCurrentBucket)
  147. }
// withCurrentBucket and withoutCurrentBucket name the includeCurrent argument
// of flushAndSend, for readability at call sites.
const (
	withCurrentBucket    = true
	withoutCurrentBucket = false
)
  152. // flushAndSend flushes all the stats buckets with the given timestamp and sends them using the transport specified in
  153. // the concentrator config. The current bucket is only included if includeCurrent is true, such as during shutdown.
  154. func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) {
  155. sp := func() statsPayload {
  156. c.mu.Lock()
  157. defer c.mu.Unlock()
  158. now := timenow.UnixNano()
  159. sp := statsPayload{
  160. Hostname: c.cfg.hostname,
  161. Env: c.cfg.env,
  162. Version: c.cfg.version,
  163. Stats: make([]statsBucket, 0, len(c.buckets)),
  164. }
  165. for ts, srb := range c.buckets {
  166. if !includeCurrent && ts > now-c.bucketSize {
  167. // do not flush the current bucket
  168. continue
  169. }
  170. log.Debug("Flushing bucket %d", ts)
  171. sp.Stats = append(sp.Stats, srb.Export())
  172. delete(c.buckets, ts)
  173. }
  174. return sp
  175. }()
  176. if len(sp.Stats) == 0 {
  177. // nothing to flush
  178. return
  179. }
  180. c.statsd().Incr("datadog.tracer.stats.flush_payloads", nil, 1)
  181. c.statsd().Incr("datadog.tracer.stats.flush_buckets", nil, float64(len(sp.Stats)))
  182. if err := c.cfg.transport.sendStats(&sp); err != nil {
  183. c.statsd().Incr("datadog.tracer.stats.flush_errors", nil, 1)
  184. log.Error("Error sending stats payload: %v", err)
  185. }
  186. }
// aggregation specifies a uniquely identifiable key under which a certain set
// of stats are grouped inside a bucket.
type aggregation struct {
	Name       string // span operation name
	Type       string // span type
	Resource   string // resource name
	Service    string // service name
	StatusCode uint32 // exported as HTTPStatusCode (see rawGroupedStats.export)
	Synthetics bool   // presumably marks synthetics-originated traces — confirm at call site
}
// rawBucket is the internal, mutable representation of a stats bucket. It
// accumulates rawGroupedStats per aggregation key and is converted to the
// shared statsBucket type by Export.
type rawBucket struct {
	start, duration uint64 // bucket start time and length, in nanoseconds
	data            map[aggregation]*rawGroupedStats
}
  201. func newRawBucket(btime uint64, bsize int64) *rawBucket {
  202. return &rawBucket{
  203. start: btime,
  204. duration: uint64(bsize),
  205. data: make(map[aggregation]*rawGroupedStats),
  206. }
  207. }
  208. func (sb *rawBucket) handleSpan(s *aggregableSpan) {
  209. gs, ok := sb.data[s.key]
  210. if !ok {
  211. gs = newRawGroupedStats()
  212. sb.data[s.key] = gs
  213. }
  214. if s.TopLevel {
  215. gs.topLevelHits++
  216. }
  217. gs.hits++
  218. if s.Error != 0 {
  219. gs.errors++
  220. }
  221. gs.duration += uint64(s.Duration)
  222. // alter resolution of duration distro
  223. trundur := nsTimestampToFloat(s.Duration)
  224. if s.Error != 0 {
  225. gs.errDistribution.Add(trundur)
  226. } else {
  227. gs.okDistribution.Add(trundur)
  228. }
  229. }
  230. // Export transforms a RawBucket into a statsBucket, typically used
  231. // before communicating data to the API, as RawBucket is the internal
  232. // type while statsBucket is the public, shared one.
  233. func (sb *rawBucket) Export() statsBucket {
  234. csb := statsBucket{
  235. Start: sb.start,
  236. Duration: sb.duration,
  237. Stats: make([]groupedStats, len(sb.data)),
  238. }
  239. for k, v := range sb.data {
  240. b, err := v.export(k)
  241. if err != nil {
  242. log.Error("Could not export stats bucket: %v.", err)
  243. continue
  244. }
  245. csb.Stats = append(csb.Stats, b)
  246. }
  247. return csb
  248. }
// rawGroupedStats holds the counters and duration distributions accumulated
// for a single aggregation key within a rawBucket.
type rawGroupedStats struct {
	hits         uint64 // total number of spans seen
	topLevelHits uint64 // number of spans with TopLevel set
	errors       uint64 // number of spans with a non-zero Error field
	duration     uint64 // sum of span durations, in nanoseconds
	// okDistribution and errDistribution track the duration distributions of
	// non-error and error spans, respectively.
	okDistribution  *ddsketch.DDSketch
	errDistribution *ddsketch.DDSketch
}
  257. func newRawGroupedStats() *rawGroupedStats {
  258. const (
  259. // relativeAccuracy is the value accuracy we have on the percentiles. For example, we can
  260. // say that p99 is 100ms +- 1ms
  261. relativeAccuracy = 0.01
  262. // maxNumBins is the maximum number of bins of the ddSketch we use to store percentiles.
  263. // It can affect relative accuracy, but in practice, 2048 bins is enough to have 1% relative accuracy from
  264. // 80 micro second to 1 year: http://www.vldb.org/pvldb/vol12/p2195-masson.pdf
  265. maxNumBins = 2048
  266. )
  267. okSketch, err := ddsketch.LogCollapsingLowestDenseDDSketch(relativeAccuracy, maxNumBins)
  268. if err != nil {
  269. log.Error("Error when creating ddsketch: %v", err)
  270. }
  271. errSketch, err := ddsketch.LogCollapsingLowestDenseDDSketch(relativeAccuracy, maxNumBins)
  272. if err != nil {
  273. log.Error("Error when creating ddsketch: %v", err)
  274. }
  275. return &rawGroupedStats{
  276. okDistribution: okSketch,
  277. errDistribution: errSketch,
  278. }
  279. }
  280. func (s *rawGroupedStats) export(k aggregation) (groupedStats, error) {
  281. msg := s.okDistribution.ToProto()
  282. okSummary, err := proto.Marshal(msg)
  283. if err != nil {
  284. return groupedStats{}, err
  285. }
  286. msg = s.errDistribution.ToProto()
  287. errSummary, err := proto.Marshal(msg)
  288. if err != nil {
  289. return groupedStats{}, err
  290. }
  291. return groupedStats{
  292. Service: k.Service,
  293. Name: k.Name,
  294. Resource: k.Resource,
  295. HTTPStatusCode: k.StatusCode,
  296. Type: k.Type,
  297. Hits: s.hits,
  298. Errors: s.errors,
  299. Duration: s.duration,
  300. TopLevelHits: s.topLevelHits,
  301. OkSummary: okSummary,
  302. ErrorSummary: errSummary,
  303. Synthetics: k.Synthetics,
  304. }, nil
  305. }
  306. // nsTimestampToFloat converts a nanosec timestamp into a float nanosecond timestamp truncated to a fixed precision
  307. func nsTimestampToFloat(ns int64) float64 {
  308. // 10 bits precision (any value will be +/- 1/1024)
  309. const roundMask int64 = 1 << 10
  310. var shift uint
  311. for ns > roundMask {
  312. ns = ns >> 1
  313. shift++
  314. }
  315. return float64(ns << shift)
  316. }