| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- // Unless explicitly stated otherwise all files in this repository are licensed
- // under the Apache License Version 2.0.
- // This product includes software developed at Datadog (https://www.datadoghq.com/).
- // Copyright 2016 Datadog, Inc.
- //go:generate msgp -unexported -marshal=false -o=stats_msgp.go -tests=false
- package tracer
- import (
- "sync"
- "sync/atomic"
- "time"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
- "github.com/DataDog/datadog-go/v5/statsd"
- "github.com/DataDog/sketches-go/ddsketch"
- "google.golang.org/protobuf/proto"
- )
- // aggregableSpan holds necessary information about a span that can be used to
- // aggregate statistics in a bucket.
- type aggregableSpan struct {
- // key specifies the aggregation key under which this span can be placed into
- // grouped inside a bucket.
- key aggregation
- Start, Duration int64
- Error int32
- TopLevel bool
- }
- // defaultStatsBucketSize specifies the default span of time that will be
- // covered in one stats bucket.
- var defaultStatsBucketSize = (10 * time.Second).Nanoseconds()
- // concentrator aggregates and stores statistics on incoming spans in time buckets,
- // flushing them occasionally to the underlying transport located in the given
- // tracer config.
- type concentrator struct {
- // In specifies the channel to be used for feeding data to the concentrator.
- // In order for In to have a consumer, the concentrator must be started using
- // a call to Start.
- In chan *aggregableSpan
- // mu guards below fields
- mu sync.Mutex
- // buckets maintains a set of buckets, where the map key represents
- // the starting point in time of that bucket, in nanoseconds.
- buckets map[int64]*rawBucket
- // stopped reports whether the concentrator is stopped (when non-zero)
- stopped uint32
- wg sync.WaitGroup // waits for any active goroutines
- bucketSize int64 // the size of a bucket in nanoseconds
- stop chan struct{} // closing this channel triggers shutdown
- cfg *config // tracer startup configuration
- statsdClient statsdClient // statsd client for sending metrics.
- }
- // newConcentrator creates a new concentrator using the given tracer
- // configuration c. It creates buckets of bucketSize nanoseconds duration.
- func newConcentrator(c *config, bucketSize int64) *concentrator {
- return &concentrator{
- In: make(chan *aggregableSpan, 10000),
- bucketSize: bucketSize,
- stopped: 1,
- buckets: make(map[int64]*rawBucket),
- cfg: c,
- }
- }
- // alignTs returns the provided timestamp truncated to the bucket size.
- // It gives us the start time of the time bucket in which such timestamp falls.
- func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize }
- // Start starts the concentrator. A started concentrator needs to be stopped
- // in order to gracefully shut down, using Stop.
- func (c *concentrator) Start() {
- if atomic.SwapUint32(&c.stopped, 0) == 0 {
- // already running
- log.Warn("(*concentrator).Start called more than once. This is likely a programming error.")
- return
- }
- c.stop = make(chan struct{})
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
- tick := time.NewTicker(time.Duration(c.bucketSize) * time.Nanosecond)
- defer tick.Stop()
- c.runFlusher(tick.C)
- }()
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
- c.runIngester()
- }()
- }
- // runFlusher runs the flushing loop which sends stats to the underlying transport.
- func (c *concentrator) runFlusher(tick <-chan time.Time) {
- for {
- select {
- case now := <-tick:
- c.flushAndSend(now, withoutCurrentBucket)
- case <-c.stop:
- return
- }
- }
- }
- // statsd returns any tracer configured statsd client, or a no-op.
- func (c *concentrator) statsd() statsdClient {
- if c.statsdClient == nil {
- return &statsd.NoOpClient{}
- }
- return c.statsdClient
- }
- // runIngester runs the loop which accepts incoming data on the concentrator's In
- // channel.
- func (c *concentrator) runIngester() {
- for {
- select {
- case s := <-c.In:
- c.statsd().Incr("datadog.tracer.stats.spans_in", nil, 1)
- c.add(s)
- case <-c.stop:
- return
- }
- }
- }
- // add adds s into the concentrator's internal stats buckets.
- func (c *concentrator) add(s *aggregableSpan) {
- c.mu.Lock()
- defer c.mu.Unlock()
- btime := alignTs(s.Start+s.Duration, c.bucketSize)
- b, ok := c.buckets[btime]
- if !ok {
- b = newRawBucket(uint64(btime), c.bucketSize)
- c.buckets[btime] = b
- }
- b.handleSpan(s)
- }
- // Stop stops the concentrator and blocks until the operation completes.
- func (c *concentrator) Stop() {
- if atomic.SwapUint32(&c.stopped, 1) > 0 {
- return
- }
- close(c.stop)
- c.wg.Wait()
- drain:
- for {
- select {
- case s := <-c.In:
- c.statsd().Incr("datadog.tracer.stats.spans_in", nil, 1)
- c.add(s)
- default:
- break drain
- }
- }
- c.flushAndSend(time.Now(), withCurrentBucket)
- }
- const (
- withCurrentBucket = true
- withoutCurrentBucket = false
- )
- // flushAndSend flushes all the stats buckets with the given timestamp and sends them using the transport specified in
- // the concentrator config. The current bucket is only included if includeCurrent is true, such as during shutdown.
- func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) {
- sp := func() statsPayload {
- c.mu.Lock()
- defer c.mu.Unlock()
- now := timenow.UnixNano()
- sp := statsPayload{
- Hostname: c.cfg.hostname,
- Env: c.cfg.env,
- Version: c.cfg.version,
- Stats: make([]statsBucket, 0, len(c.buckets)),
- }
- for ts, srb := range c.buckets {
- if !includeCurrent && ts > now-c.bucketSize {
- // do not flush the current bucket
- continue
- }
- log.Debug("Flushing bucket %d", ts)
- sp.Stats = append(sp.Stats, srb.Export())
- delete(c.buckets, ts)
- }
- return sp
- }()
- if len(sp.Stats) == 0 {
- // nothing to flush
- return
- }
- c.statsd().Incr("datadog.tracer.stats.flush_payloads", nil, 1)
- c.statsd().Incr("datadog.tracer.stats.flush_buckets", nil, float64(len(sp.Stats)))
- if err := c.cfg.transport.sendStats(&sp); err != nil {
- c.statsd().Incr("datadog.tracer.stats.flush_errors", nil, 1)
- log.Error("Error sending stats payload: %v", err)
- }
- }
- // aggregation specifies a uniquely identifiable key under which a certain set
- // of stats are grouped inside a bucket.
- type aggregation struct {
- Name string
- Type string
- Resource string
- Service string
- StatusCode uint32
- Synthetics bool
- }
- type rawBucket struct {
- start, duration uint64
- data map[aggregation]*rawGroupedStats
- }
- func newRawBucket(btime uint64, bsize int64) *rawBucket {
- return &rawBucket{
- start: btime,
- duration: uint64(bsize),
- data: make(map[aggregation]*rawGroupedStats),
- }
- }
- func (sb *rawBucket) handleSpan(s *aggregableSpan) {
- gs, ok := sb.data[s.key]
- if !ok {
- gs = newRawGroupedStats()
- sb.data[s.key] = gs
- }
- if s.TopLevel {
- gs.topLevelHits++
- }
- gs.hits++
- if s.Error != 0 {
- gs.errors++
- }
- gs.duration += uint64(s.Duration)
- // alter resolution of duration distro
- trundur := nsTimestampToFloat(s.Duration)
- if s.Error != 0 {
- gs.errDistribution.Add(trundur)
- } else {
- gs.okDistribution.Add(trundur)
- }
- }
- // Export transforms a RawBucket into a statsBucket, typically used
- // before communicating data to the API, as RawBucket is the internal
- // type while statsBucket is the public, shared one.
- func (sb *rawBucket) Export() statsBucket {
- csb := statsBucket{
- Start: sb.start,
- Duration: sb.duration,
- Stats: make([]groupedStats, len(sb.data)),
- }
- for k, v := range sb.data {
- b, err := v.export(k)
- if err != nil {
- log.Error("Could not export stats bucket: %v.", err)
- continue
- }
- csb.Stats = append(csb.Stats, b)
- }
- return csb
- }
- type rawGroupedStats struct {
- hits uint64
- topLevelHits uint64
- errors uint64
- duration uint64
- okDistribution *ddsketch.DDSketch
- errDistribution *ddsketch.DDSketch
- }
- func newRawGroupedStats() *rawGroupedStats {
- const (
- // relativeAccuracy is the value accuracy we have on the percentiles. For example, we can
- // say that p99 is 100ms +- 1ms
- relativeAccuracy = 0.01
- // maxNumBins is the maximum number of bins of the ddSketch we use to store percentiles.
- // It can affect relative accuracy, but in practice, 2048 bins is enough to have 1% relative accuracy from
- // 80 micro second to 1 year: http://www.vldb.org/pvldb/vol12/p2195-masson.pdf
- maxNumBins = 2048
- )
- okSketch, err := ddsketch.LogCollapsingLowestDenseDDSketch(relativeAccuracy, maxNumBins)
- if err != nil {
- log.Error("Error when creating ddsketch: %v", err)
- }
- errSketch, err := ddsketch.LogCollapsingLowestDenseDDSketch(relativeAccuracy, maxNumBins)
- if err != nil {
- log.Error("Error when creating ddsketch: %v", err)
- }
- return &rawGroupedStats{
- okDistribution: okSketch,
- errDistribution: errSketch,
- }
- }
- func (s *rawGroupedStats) export(k aggregation) (groupedStats, error) {
- msg := s.okDistribution.ToProto()
- okSummary, err := proto.Marshal(msg)
- if err != nil {
- return groupedStats{}, err
- }
- msg = s.errDistribution.ToProto()
- errSummary, err := proto.Marshal(msg)
- if err != nil {
- return groupedStats{}, err
- }
- return groupedStats{
- Service: k.Service,
- Name: k.Name,
- Resource: k.Resource,
- HTTPStatusCode: k.StatusCode,
- Type: k.Type,
- Hits: s.hits,
- Errors: s.errors,
- Duration: s.duration,
- TopLevelHits: s.topLevelHits,
- OkSummary: okSummary,
- ErrorSummary: errSummary,
- Synthetics: k.Synthetics,
- }, nil
- }
- // nsTimestampToFloat converts a nanosec timestamp into a float nanosecond timestamp truncated to a fixed precision
- func nsTimestampToFloat(ns int64) float64 {
- // 10 bits precision (any value will be +/- 1/1024)
- const roundMask int64 = 1 << 10
- var shift uint
- for ns > roundMask {
- ns = ns >> 1
- shift++
- }
- return float64(ns << shift)
- }
|