| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624 |
- // Unless explicitly stated otherwise all files in this repository are licensed
- // under the Apache License Version 2.0.
- // This product includes software developed at Datadog (https://www.datadoghq.com/).
- // Copyright 2016 Datadog, Inc.
- package tracer
- import (
- gocontext "context"
- "os"
- "runtime/pprof"
- rt "runtime/trace"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/remoteconfig"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/traceprof"
- "github.com/DataDog/datadog-agent/pkg/obfuscate"
- )
- var _ ddtrace.Tracer = (*tracer)(nil)
- // tracer creates, buffers and submits Spans which are used to time blocks of
- // computation. They are accumulated and streamed into an internal payload,
- // which is flushed to the agent whenever its size exceeds a specific threshold
- // or when a certain interval of time has passed, whichever happens first.
- //
- // tracer operates based on a worker loop which responds to various request
- // channels. It additionally holds two buffers which accumulates error and trace
- // queues to be processed by the payload encoder.
- type tracer struct {
- config *config
- // stats specifies the concentrator used to compute statistics, when client-side
- // stats are enabled.
- stats *concentrator
- // traceWriter is responsible for sending finished traces to their
- // destination, such as the Trace Agent or Datadog Forwarder.
- traceWriter traceWriter
- // out receives finishedTrace with spans to be added to the payload.
- out chan *finishedTrace
- // flush receives a channel onto which it will confirm after a flush has been
- // triggered and completed.
- flush chan chan<- struct{}
- // stop causes the tracer to shut down when closed.
- stop chan struct{}
- // stopOnce ensures the tracer is stopped exactly once.
- stopOnce sync.Once
- // wg waits for all goroutines to exit when stopping.
- wg sync.WaitGroup
- // prioritySampling holds an instance of the priority sampler.
- prioritySampling *prioritySampler
- // pid of the process
- pid int
- // These integers track metrics about spans and traces as they are started,
- // finished, and dropped
- spansStarted, spansFinished, tracesDropped uint32
- // Records the number of dropped P0 traces and spans.
- droppedP0Traces, droppedP0Spans uint32
- // partialTrace the number of partially dropped traces.
- partialTraces uint32
- // rulesSampling holds an instance of the rules sampler used to apply either trace sampling,
- // or single span sampling rules on spans. These are user-defined
- // rules for applying a sampling rate to spans that match the designated service
- // or operation name.
- rulesSampling *rulesSampler
- // obfuscator holds the obfuscator used to obfuscate resources in aggregated stats.
- // obfuscator may be nil if disabled.
- obfuscator *obfuscate.Obfuscator
- // statsd is used for tracking metrics associated with the runtime and the tracer.
- statsd statsdClient
- }
// Tuning constants for payload flushing and transport concurrency.
const (
	// flushInterval is how often the worker flushes the payload to the
	// transport.
	flushInterval = 2 * time.Second

	// payloadMaxLimit is the largest payload the agent is expected to accept
	// (9.5 MB).
	payloadMaxLimit = 9.5 * 1024 * 1024

	// payloadSizeLimit is the payload size that triggers a flush: half of
	// payloadMaxLimit.
	payloadSizeLimit = payloadMaxLimit / 2

	// concurrentConnectionLimit caps the number of concurrent outgoing
	// connections.
	concurrentConnectionLimit = 100
)

// statsInterval is how often health metrics are sent through the statsd
// client. It is a variable rather than a constant so tests can replace it.
var statsInterval = 10 * time.Second
- // Start starts the tracer with the given set of options. It will stop and replace
- // any running tracer, meaning that calling it several times will result in a restart
- // of the tracer by replacing the current instance with a new one.
- func Start(opts ...StartOption) {
- if internal.Testing {
- return // mock tracer active
- }
- t := newTracer(opts...)
- if !t.config.enabled {
- return
- }
- internal.SetGlobalTracer(t)
- if t.config.logStartup {
- logStartup(t)
- }
- // Start AppSec with remote configuration
- cfg := remoteconfig.DefaultClientConfig()
- cfg.AgentURL = t.config.agentURL
- cfg.AppVersion = t.config.version
- cfg.Env = t.config.env
- cfg.HTTP = t.config.httpClient
- cfg.ServiceName = t.config.serviceName
- appsec.Start(appsec.WithRCConfig(cfg))
- }
- // Stop stops the started tracer. Subsequent calls are valid but become no-op.
- func Stop() {
- internal.SetGlobalTracer(&internal.NoopTracer{})
- log.Flush()
- }
- // Span is an alias for ddtrace.Span. It is here to allow godoc to group methods returning
- // ddtrace.Span. It is recommended and is considered more correct to refer to this type as
- // ddtrace.Span instead.
- type Span = ddtrace.Span
- // StartSpan starts a new span with the given operation name and set of options.
- // If the tracer is not started, calling this function is a no-op.
- func StartSpan(operationName string, opts ...StartSpanOption) Span {
- return internal.GetGlobalTracer().StartSpan(operationName, opts...)
- }
- // Extract extracts a SpanContext from the carrier. The carrier is expected
- // to implement TextMapReader, otherwise an error is returned.
- // If the tracer is not started, calling this function is a no-op.
- func Extract(carrier interface{}) (ddtrace.SpanContext, error) {
- return internal.GetGlobalTracer().Extract(carrier)
- }
- // Inject injects the given SpanContext into the carrier. The carrier is
- // expected to implement TextMapWriter, otherwise an error is returned.
- // If the tracer is not started, calling this function is a no-op.
- func Inject(ctx ddtrace.SpanContext, carrier interface{}) error {
- return internal.GetGlobalTracer().Inject(ctx, carrier)
- }
- // SetUser associates user information to the current trace which the
- // provided span belongs to. The options can be used to tune which user
- // bit of information gets monitored. In case of distributed traces,
- // the user id can be propagated across traces using the WithPropagation() option.
- // See https://docs.datadoghq.com/security_platform/application_security/setup_and_configure/?tab=set_user#add-user-information-to-traces
- func SetUser(s Span, id string, opts ...UserMonitoringOption) {
- if s == nil {
- return
- }
- sp, ok := s.(interface {
- SetUser(string, ...UserMonitoringOption)
- })
- if !ok {
- return
- }
- sp.SetUser(id, opts...)
- }
- // payloadQueueSize is the buffer size of the trace channel.
- const payloadQueueSize = 1000
- func newUnstartedTracer(opts ...StartOption) *tracer {
- c := newConfig(opts...)
- sampler := newPrioritySampler()
- statsd, err := newStatsdClient(c)
- if err != nil {
- log.Warn("Runtime and health metrics disabled: %v", err)
- }
- var writer traceWriter
- if c.logToStdout {
- writer = newLogTraceWriter(c, statsd)
- } else {
- writer = newAgentTraceWriter(c, sampler, statsd)
- }
- traces, spans, err := samplingRulesFromEnv()
- if err != nil {
- log.Warn("DIAGNOSTICS Error(s) parsing sampling rules: found errors:%s", err)
- }
- if traces != nil {
- c.traceRules = traces
- }
- if spans != nil {
- c.spanRules = spans
- }
- t := &tracer{
- config: c,
- traceWriter: writer,
- out: make(chan *finishedTrace, payloadQueueSize),
- stop: make(chan struct{}),
- flush: make(chan chan<- struct{}),
- rulesSampling: newRulesSampler(c.traceRules, c.spanRules),
- prioritySampling: sampler,
- pid: os.Getpid(),
- stats: newConcentrator(c, defaultStatsBucketSize),
- obfuscator: obfuscate.NewObfuscator(obfuscate.Config{
- SQL: obfuscate.SQLConfig{
- TableNames: c.agent.HasFlag("table_names"),
- ReplaceDigits: c.agent.HasFlag("quantize_sql_tables") || c.agent.HasFlag("replace_sql_digits"),
- KeepSQLAlias: c.agent.HasFlag("keep_sql_alias"),
- DollarQuotedFunc: c.agent.HasFlag("dollar_quoted_func"),
- Cache: c.agent.HasFlag("sql_cache"),
- },
- }),
- statsd: statsd,
- }
- return t
- }
- func newTracer(opts ...StartOption) *tracer {
- t := newUnstartedTracer(opts...)
- c := t.config
- t.statsd.Incr("datadog.tracer.started", nil, 1)
- if c.runtimeMetrics {
- log.Debug("Runtime metrics enabled.")
- t.wg.Add(1)
- go func() {
- defer t.wg.Done()
- t.reportRuntimeMetrics(defaultMetricsReportInterval)
- }()
- }
- t.wg.Add(1)
- go func() {
- defer t.wg.Done()
- tick := t.config.tickChan
- if tick == nil {
- ticker := time.NewTicker(flushInterval)
- defer ticker.Stop()
- tick = ticker.C
- }
- t.worker(tick)
- }()
- t.wg.Add(1)
- go func() {
- defer t.wg.Done()
- t.reportHealthMetrics(statsInterval)
- }()
- t.stats.Start()
- return t
- }
- // Flush flushes any buffered traces. Flush is in effect only if a tracer
- // is started. Users do not have to call Flush in order to ensure that
- // traces reach Datadog. It is a convenience method dedicated to a specific
- // use case described below.
- //
- // Flush is of use in Lambda environments, where starting and stopping
- // the tracer on each invokation may create too much latency. In this
- // scenario, a tracer may be started and stopped by the parent process
- // whereas the invokation can make use of Flush to ensure any created spans
- // reach the agent.
- func Flush() {
- if t, ok := internal.GetGlobalTracer().(*tracer); ok {
- t.flushSync()
- }
- }
- // flushSync triggers a flush and waits for it to complete.
- func (t *tracer) flushSync() {
- done := make(chan struct{})
- t.flush <- done
- <-done
- }
- // worker receives finished traces to be added into the payload, as well
- // as periodically flushes traces to the transport.
- func (t *tracer) worker(tick <-chan time.Time) {
- for {
- select {
- case trace := <-t.out:
- t.sampleFinishedTrace(trace)
- if len(trace.spans) != 0 {
- t.traceWriter.add(trace.spans)
- }
- case <-tick:
- t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1)
- t.traceWriter.flush()
- case done := <-t.flush:
- t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:invoked"}, 1)
- t.traceWriter.flush()
- t.statsd.Flush()
- t.stats.flushAndSend(time.Now(), withCurrentBucket)
- // TODO(x): In reality, the traceWriter.flush() call is not synchronous
- // when using the agent traceWriter. However, this functionnality is used
- // in Lambda so for that purpose this mechanism should suffice.
- done <- struct{}{}
- case <-t.stop:
- loop:
- // the loop ensures that the payload channel is fully drained
- // before the final flush to ensure no traces are lost (see #526)
- for {
- select {
- case trace := <-t.out:
- t.sampleFinishedTrace(trace)
- if len(trace.spans) != 0 {
- t.traceWriter.add(trace.spans)
- }
- default:
- break loop
- }
- }
- return
- }
- }
- }
- // finishedTrace holds information about a trace that has finished, including its spans.
- type finishedTrace struct {
- spans []*span
- willSend bool // willSend indicates whether the trace will be sent to the agent.
- }
- // sampleFinishedTrace applies single-span sampling to the provided trace, which is considered to be finished.
- func (t *tracer) sampleFinishedTrace(info *finishedTrace) {
- if len(info.spans) > 0 {
- if p, ok := info.spans[0].context.samplingPriority(); ok && p > 0 {
- // The trace is kept, no need to run single span sampling rules.
- return
- }
- }
- var kept []*span
- if t.rulesSampling.HasSpanRules() {
- // Apply sampling rules to individual spans in the trace.
- for _, span := range info.spans {
- if t.rulesSampling.SampleSpan(span) {
- kept = append(kept, span)
- }
- }
- if len(kept) > 0 && len(kept) < len(info.spans) {
- // Some spans in the trace were kept, so a partial trace will be sent.
- atomic.AddUint32(&t.partialTraces, 1)
- }
- }
- if len(kept) == 0 {
- atomic.AddUint32(&t.droppedP0Traces, 1)
- }
- atomic.AddUint32(&t.droppedP0Spans, uint32(len(info.spans)-len(kept)))
- if !info.willSend {
- info.spans = kept
- }
- }
- func (t *tracer) pushTrace(trace *finishedTrace) {
- select {
- case <-t.stop:
- return
- default:
- }
- select {
- case t.out <- trace:
- default:
- log.Error("payload queue full, dropping %d traces", len(trace.spans))
- }
- }
- // StartSpan creates, starts, and returns a new Span with the given `operationName`.
- func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOption) ddtrace.Span {
- var opts ddtrace.StartSpanConfig
- for _, fn := range options {
- fn(&opts)
- }
- var startTime int64
- if opts.StartTime.IsZero() {
- startTime = now()
- } else {
- startTime = opts.StartTime.UnixNano()
- }
- var context *spanContext
- // The default pprof context is taken from the start options and is
- // not nil when using StartSpanFromContext()
- pprofContext := opts.Context
- if opts.Parent != nil {
- if ctx, ok := opts.Parent.(*spanContext); ok {
- context = ctx
- if pprofContext == nil && ctx.span != nil {
- // Inherit the context.Context from parent span if it was propagated
- // using ChildOf() rather than StartSpanFromContext(), see
- // applyPPROFLabels() below.
- pprofContext = ctx.span.pprofCtxActive
- }
- }
- }
- if pprofContext == nil {
- // For root span's without context, there is no pprofContext, but we need
- // one to avoid a panic() in pprof.WithLabels(). Using context.Background()
- // is not ideal here, as it will cause us to remove all labels from the
- // goroutine when the span finishes. However, the alternatives of not
- // applying labels for such spans or to leave the endpoint/hotspot labels
- // on the goroutine after it finishes are even less appealing. We'll have
- // to properly document this for users.
- pprofContext = gocontext.Background()
- }
- id := opts.SpanID
- if id == 0 {
- id = generateSpanID(startTime)
- }
- // span defaults
- span := &span{
- Name: operationName,
- Service: t.config.serviceName,
- Resource: operationName,
- SpanID: id,
- TraceID: id,
- Start: startTime,
- noDebugStack: t.config.noDebugStack,
- }
- if t.config.hostname != "" {
- span.setMeta(keyHostname, t.config.hostname)
- }
- if context != nil {
- // this is a child span
- span.TraceID = context.traceID
- span.ParentID = context.spanID
- if p, ok := context.samplingPriority(); ok {
- span.setMetric(keySamplingPriority, float64(p))
- }
- if context.span != nil {
- // local parent, inherit service
- context.span.RLock()
- span.Service = context.span.Service
- context.span.RUnlock()
- } else {
- // remote parent
- if context.origin != "" {
- // mark origin
- span.setMeta(keyOrigin, context.origin)
- }
- }
- }
- span.context = newSpanContext(span, context)
- span.setMetric(ext.Pid, float64(t.pid))
- span.setMeta("language", "go")
- // add tags from options
- for k, v := range opts.Tags {
- span.SetTag(k, v)
- }
- // add global tags
- for k, v := range t.config.globalTags {
- span.SetTag(k, v)
- }
- if t.config.serviceMappings != nil {
- if newSvc, ok := t.config.serviceMappings[span.Service]; ok {
- span.Service = newSvc
- }
- }
- if context == nil || context.span == nil || context.span.Service != span.Service {
- span.setMetric(keyTopLevel, 1)
- // all top level spans are measured. So the measured tag is redundant.
- delete(span.Metrics, keyMeasured)
- }
- if t.config.version != "" {
- if t.config.universalVersion || (!t.config.universalVersion && span.Service == t.config.serviceName) {
- span.setMeta(ext.Version, t.config.version)
- }
- }
- if t.config.env != "" {
- span.setMeta(ext.Environment, t.config.env)
- }
- if _, ok := span.context.samplingPriority(); !ok {
- // if not already sampled or a brand new trace, sample it
- t.sample(span)
- }
- pprofContext, span.taskEnd = startExecutionTracerTask(pprofContext, span)
- if t.config.profilerHotspots || t.config.profilerEndpoints {
- t.applyPPROFLabels(pprofContext, span)
- }
- if t.config.serviceMappings != nil {
- if newSvc, ok := t.config.serviceMappings[span.Service]; ok {
- span.Service = newSvc
- }
- }
- if log.DebugEnabled() {
- // avoid allocating the ...interface{} argument if debug logging is disabled
- log.Debug("Started Span: %v, Operation: %s, Resource: %s, Tags: %v, %v",
- span, span.Name, span.Resource, span.Meta, span.Metrics)
- }
- return span
- }
- // generateSpanID returns a random uint64 that has been XORd with the startTime.
- // This is done to get around the 32-bit random seed limitation that may create collisions if there is a large number
- // of go services all generating spans.
- func generateSpanID(startTime int64) uint64 {
- return random.Uint64() ^ uint64(startTime)
- }
- // applyPPROFLabels applies pprof labels for the profiler's code hotspots and
- // endpoint filtering feature to span. When span finishes, any pprof labels
- // found in ctx are restored. Additionally this func informs the profiler how
- // many times each endpoint is called.
- func (t *tracer) applyPPROFLabels(ctx gocontext.Context, span *span) {
- var labels []string
- if t.config.profilerHotspots {
- // allocate the max-length slice to avoid growing it later
- labels = make([]string, 0, 6)
- labels = append(labels, traceprof.SpanID, strconv.FormatUint(span.SpanID, 10))
- }
- // nil checks might not be needed, but better be safe than sorry
- if localRootSpan := span.root(); localRootSpan != nil {
- if t.config.profilerHotspots {
- labels = append(labels, traceprof.LocalRootSpanID, strconv.FormatUint(localRootSpan.SpanID, 10))
- }
- if t.config.profilerEndpoints && spanResourcePIISafe(localRootSpan) {
- labels = append(labels, traceprof.TraceEndpoint, localRootSpan.Resource)
- if span == localRootSpan {
- // Inform the profiler of endpoint hits. This is used for the unit of
- // work feature. We can't use APM stats for this since the stats don't
- // have enough cardinality (e.g. runtime-id tags are missing).
- traceprof.GlobalEndpointCounter().Inc(localRootSpan.Resource)
- }
- }
- }
- if len(labels) > 0 {
- span.pprofCtxRestore = ctx
- span.pprofCtxActive = pprof.WithLabels(ctx, pprof.Labels(labels...))
- pprof.SetGoroutineLabels(span.pprofCtxActive)
- }
- }
- // spanResourcePIISafe returns true if s.Resource can be considered to not
- // include PII with reasonable confidence. E.g. SQL queries may contain PII,
- // but http, rpc or custom (s.Type == "") span resource names generally do not.
- func spanResourcePIISafe(s *span) bool {
- return s.Type == ext.SpanTypeWeb || s.Type == ext.AppTypeRPC || s.Type == ""
- }
- // Stop stops the tracer.
- func (t *tracer) Stop() {
- t.stopOnce.Do(func() {
- close(t.stop)
- t.statsd.Incr("datadog.tracer.stopped", nil, 1)
- })
- t.stats.Stop()
- t.wg.Wait()
- t.traceWriter.stop()
- t.statsd.Close()
- appsec.Stop()
- }
- // Inject uses the configured or default TextMap Propagator.
- func (t *tracer) Inject(ctx ddtrace.SpanContext, carrier interface{}) error {
- return t.config.propagator.Inject(ctx, carrier)
- }
- // Extract uses the configured or default TextMap Propagator.
- func (t *tracer) Extract(carrier interface{}) (ddtrace.SpanContext, error) {
- return t.config.propagator.Extract(carrier)
- }
- // sampleRateMetricKey is the metric key holding the applied sample rate. Has to be the same as the Agent.
- const sampleRateMetricKey = "_sample_rate"
- // Sample samples a span with the internal sampler.
- func (t *tracer) sample(span *span) {
- if _, ok := span.context.samplingPriority(); ok {
- // sampling decision was already made
- return
- }
- sampler := t.config.sampler
- if !sampler.Sample(span) {
- span.context.trace.drop()
- return
- }
- if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 {
- span.setMetric(sampleRateMetricKey, rs.Rate())
- }
- if t.rulesSampling.SampleTrace(span) {
- return
- }
- t.prioritySampling.apply(span)
- }
- func startExecutionTracerTask(ctx gocontext.Context, span *span) (gocontext.Context, func()) {
- if !rt.IsEnabled() {
- return ctx, func() {}
- }
- // Task name is the resource (operationName) of the span, e.g.
- // "POST /foo/bar" (http) or "/foo/pkg.Method" (grpc).
- taskName := span.Resource
- // If the resource could contain PII (e.g. SQL query that's not using bind
- // arguments), play it safe and just use the span type as the taskName,
- // e.g. "sql".
- if !spanResourcePIISafe(span) {
- taskName = span.Type
- }
- ctx, task := rt.NewTask(ctx, taskName)
- rt.Log(ctx, "span id", strconv.FormatUint(span.SpanID, 10))
- return ctx, task.End
- }
|