| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- // Unless explicitly stated otherwise all files in this repository are licensed
- // under the Apache License Version 2.0.
- // This product includes software developed at Datadog (https://www.datadoghq.com/).
- // Copyright 2016 Datadog, Inc.
- package tracer
- import (
- "strconv"
- "sync"
- "sync/atomic"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
- )
- var _ ddtrace.SpanContext = (*spanContext)(nil)
- // SpanContext represents a span state that can propagate to descendant spans
- // and across process boundaries. It contains all the information needed to
- // spawn a direct descendant of the span that it belongs to. It can be used
- // to create distributed tracing by propagating it using the provided interfaces.
- type spanContext struct {
- updated bool // updated is tracking changes for priority / origin / x-datadog-tags
- // the below group should propagate only locally
- trace *trace // reference to the trace that this span belongs too
- span *span // reference to the span that hosts this context
- errors int32 // number of spans with errors in this trace
- // the below group should propagate cross-process
- traceID uint64
- spanID uint64
- mu sync.RWMutex // guards below fields
- baggage map[string]string
- hasBaggage uint32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
- origin string // e.g. "synthetics"
- }
- // newSpanContext creates a new SpanContext to serve as context for the given
- // span. If the provided parent is not nil, the context will inherit the trace,
- // baggage and other values from it. This method also pushes the span into the
- // new context's trace and as a result, it should not be called multiple times
- // for the same span.
- func newSpanContext(span *span, parent *spanContext) *spanContext {
- context := &spanContext{
- traceID: span.TraceID,
- spanID: span.SpanID,
- span: span,
- }
- if parent != nil {
- context.trace = parent.trace
- context.origin = parent.origin
- context.errors = parent.errors
- parent.ForeachBaggageItem(func(k, v string) bool {
- context.setBaggageItem(k, v)
- return true
- })
- }
- if context.trace == nil {
- context.trace = newTrace()
- }
- if context.trace.root == nil {
- // first span in the trace can safely be assumed to be the root
- context.trace.root = span
- }
- // put span in context's trace
- context.trace.push(span)
- // setting context.updated to false here is necessary to distinguish
- // between initializing properties of the span (priority)
- // and updating them after extracting context through propagators
- context.updated = false
- return context
- }
- // SpanID implements ddtrace.SpanContext.
- func (c *spanContext) SpanID() uint64 { return c.spanID }
- // TraceID implements ddtrace.SpanContext.
- func (c *spanContext) TraceID() uint64 { return c.traceID }
- // ForeachBaggageItem implements ddtrace.SpanContext.
- func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
- if atomic.LoadUint32(&c.hasBaggage) == 0 {
- return
- }
- c.mu.RLock()
- defer c.mu.RUnlock()
- for k, v := range c.baggage {
- if !handler(k, v) {
- break
- }
- }
- }
- func (c *spanContext) setSamplingPriority(p int, sampler samplernames.SamplerName) {
- if c.trace == nil {
- c.trace = newTrace()
- }
- if c.trace.priority != nil && *c.trace.priority != float64(p) {
- c.updated = true
- }
- c.trace.setSamplingPriority(p, sampler)
- }
- func (c *spanContext) samplingPriority() (p int, ok bool) {
- if c.trace == nil {
- return 0, false
- }
- return c.trace.samplingPriority()
- }
- func (c *spanContext) setBaggageItem(key, val string) {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.baggage == nil {
- atomic.StoreUint32(&c.hasBaggage, 1)
- c.baggage = make(map[string]string, 1)
- }
- c.baggage[key] = val
- }
- func (c *spanContext) baggageItem(key string) string {
- if atomic.LoadUint32(&c.hasBaggage) == 0 {
- return ""
- }
- c.mu.RLock()
- defer c.mu.RUnlock()
- return c.baggage[key]
- }
- func (c *spanContext) meta(key string) (val string, ok bool) {
- c.span.RLock()
- defer c.span.RUnlock()
- val, ok = c.span.Meta[key]
- return val, ok
- }
- // finish marks this span as finished in the trace.
- func (c *spanContext) finish() { c.trace.finishedOne(c.span) }
- // samplingDecision is the decision to send a trace to the agent or not.
- type samplingDecision uint32
- const (
- // decisionNone is the default state of a trace.
- // If no decision is made about the trace, the trace won't be sent to the agent.
- decisionNone samplingDecision = iota
- // decisionDrop prevents the trace from being sent to the agent.
- decisionDrop
- // decisionKeep ensures the trace will be sent to the agent.
- decisionKeep
- )
- // trace contains shared context information about a trace, such as sampling
- // priority, the root reference and a buffer of the spans which are part of the
- // trace, if these exist.
- type trace struct {
- mu sync.RWMutex // guards below fields
- spans []*span // all the spans that are part of this trace
- tags map[string]string // trace level tags
- propagatingTags map[string]string // trace level tags that will be propagated across service boundaries
- finished int // the number of finished spans
- full bool // signifies that the span buffer is full
- priority *float64 // sampling priority
- locked bool // specifies if the sampling priority can be altered
- samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.
- // root specifies the root of the trace, if known; it is nil when a span
- // context is extracted from a carrier, at which point there are no spans in
- // the trace yet.
- root *span
- }
- var (
- // traceStartSize is the initial size of our trace buffer,
- // by default we allocate for a handful of spans within the trace,
- // reasonable as span is actually way bigger, and avoids re-allocating
- // over and over. Could be fine-tuned at runtime.
- traceStartSize = 10
- // traceMaxSize is the maximum number of spans we keep in memory for a
- // single trace. This is to avoid memory leaks. If more spans than this
- // are added to a trace, then the trace is dropped and the spans are
- // discarded. Adding additional spans after a trace is dropped does
- // nothing.
- traceMaxSize = int(1e5)
- )
- // newTrace creates a new trace using the given callback which will be called
- // upon completion of the trace.
- func newTrace() *trace {
- return &trace{spans: make([]*span, 0, traceStartSize)}
- }
- func (t *trace) samplingPriorityLocked() (p int, ok bool) {
- if t.priority == nil {
- return 0, false
- }
- return int(*t.priority), true
- }
- func (t *trace) samplingPriority() (p int, ok bool) {
- t.mu.RLock()
- defer t.mu.RUnlock()
- return t.samplingPriorityLocked()
- }
- func (t *trace) setSamplingPriority(p int, sampler samplernames.SamplerName) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.setSamplingPriorityLocked(p, sampler)
- }
- func (t *trace) keep() {
- atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionKeep))
- }
- func (t *trace) drop() {
- atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionDrop))
- }
- func (t *trace) setTag(key, value string) {
- if t.tags == nil {
- t.tags = make(map[string]string, 1)
- }
- t.tags[key] = value
- }
- // setPropagatingTag sets the key/value pair as a trace propagating tag.
- func (t *trace) setPropagatingTag(key, value string) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.setPropagatingTagLocked(key, value)
- }
- // setPropagatingTagLocked sets the key/value pair as a trace propagating tag.
- // Not safe for concurrent use, setPropagatingTag should be used instead in that case.
- func (t *trace) setPropagatingTagLocked(key, value string) {
- if t.propagatingTags == nil {
- t.propagatingTags = make(map[string]string, 1)
- }
- t.propagatingTags[key] = value
- }
- // unsetPropagatingTag deletes the key/value pair from the trace's propagated tags.
- func (t *trace) unsetPropagatingTag(key string) {
- t.mu.Lock()
- defer t.mu.Unlock()
- delete(t.propagatingTags, key)
- }
- func (t *trace) setSamplingPriorityLocked(p int, sampler samplernames.SamplerName) {
- if t.locked {
- return
- }
- if t.priority == nil {
- t.priority = new(float64)
- }
- *t.priority = float64(p)
- _, ok := t.propagatingTags[keyDecisionMaker]
- if p > 0 && !ok && sampler != samplernames.Unknown {
- // We have a positive priority and the sampling mechanism isn't set.
- // Send nothing when sampler is `Unknown` for RFC compliance.
- t.setPropagatingTagLocked(keyDecisionMaker, "-"+strconv.Itoa(int(sampler)))
- }
- if p <= 0 && ok {
- delete(t.propagatingTags, keyDecisionMaker)
- }
- }
- // push pushes a new span into the trace. If the buffer is full, it returns
- // a errBufferFull error.
- func (t *trace) push(sp *span) {
- t.mu.Lock()
- defer t.mu.Unlock()
- if t.full {
- return
- }
- tr, haveTracer := internal.GetGlobalTracer().(*tracer)
- if len(t.spans) >= traceMaxSize {
- // capacity is reached, we will not be able to complete this trace.
- t.full = true
- t.spans = nil // GC
- log.Error("trace buffer full (%d), dropping trace", traceMaxSize)
- if haveTracer {
- atomic.AddUint32(&tr.tracesDropped, 1)
- }
- return
- }
- if v, ok := sp.Metrics[keySamplingPriority]; ok {
- t.setSamplingPriorityLocked(int(v), samplernames.Unknown)
- }
- t.spans = append(t.spans, sp)
- if haveTracer {
- atomic.AddUint32(&tr.spansStarted, 1)
- }
- }
- // finishedOne acknowledges that another span in the trace has finished, and checks
- // if the trace is complete, in which case it calls the onFinish function. It uses
- // the given priority, if non-nil, to mark the root span.
- func (t *trace) finishedOne(s *span) {
- t.mu.Lock()
- defer t.mu.Unlock()
- if t.full {
- // capacity has been reached, the buffer is no longer tracking
- // all the spans in the trace, so the below conditions will not
- // be accurate and would trigger a pre-mature flush, exposing us
- // to a race condition where spans can be modified while flushing.
- return
- }
- t.finished++
- if s == t.root && t.priority != nil {
- // after the root has finished we lock down the priority;
- // we won't be able to make changes to a span after finishing
- // without causing a race condition.
- t.root.setMetric(keySamplingPriority, *t.priority)
- t.locked = true
- }
- if len(t.spans) > 0 && s == t.spans[0] {
- // first span in chunk finished, lock down the tags
- //
- // TODO(barbayar): make sure this doesn't happen in vain when switching to
- // the new wire format. We won't need to set the tags on the first span
- // in the chunk there.
- for k, v := range t.tags {
- s.setMeta(k, v)
- }
- for k, v := range t.propagatingTags {
- s.setMeta(k, v)
- }
- }
- if len(t.spans) != t.finished {
- return
- }
- defer func() {
- t.spans = nil
- t.finished = 0 // important, because a buffer can be used for several flushes
- }()
- tr, ok := internal.GetGlobalTracer().(*tracer)
- if !ok {
- return
- }
- // we have a tracer that can receive completed traces.
- atomic.AddUint32(&tr.spansFinished, uint32(len(t.spans)))
- tr.pushTrace(&finishedTrace{
- spans: t.spans,
- willSend: decisionKeep == samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))),
- })
- }
|