| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673 |
- // Unless explicitly stated otherwise all files in this repository are licensed
- // under the Apache License Version 2.0.
- // This product includes software developed at Datadog (https://www.datadoghq.com/).
- // Copyright 2016 Datadog, Inc.
- //go:generate msgp -unexported -marshal=false -o=span_msgp.go -tests=false
- package tracer
- import (
- "context"
- "encoding/base64"
- "fmt"
- "math"
- "os"
- "reflect"
- "runtime"
- "runtime/pprof"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/traceprof"
- "github.com/DataDog/datadog-agent/pkg/obfuscate"
- "github.com/tinylib/msgp/msgp"
- "golang.org/x/xerrors"
- )
- type (
- // spanList implements msgp.Encodable on top of a slice of spans.
- spanList []*span
- // spanLists implements msgp.Decodable on top of a slice of spanList.
- // This type is only used in tests.
- spanLists []spanList
- )
- var (
- _ ddtrace.Span = (*span)(nil)
- _ msgp.Encodable = (*spanList)(nil)
- _ msgp.Decodable = (*spanLists)(nil)
- )
// errorConfig holds customization options for setting error tags.
type errorConfig struct {
	noDebugStack bool // when true, no stack trace is recorded for error values
	stackFrames  uint // maximum number of stack frames to record; 0 selects defaultStackLength
	stackSkip    uint // number of additional caller frames to skip before recording
}
// span represents a computation. Callers must call Finish when a span is
// complete to ensure it's submitted.
//
// The `msg` struct tags drive the msgp-generated encoder (see the go:generate
// directive at the top of this file): tagged fields are encoded under the
// given names, and `msg:"-"` excludes a field from encoding.
type span struct {
	sync.RWMutex `msg:"-"` // all fields are protected by this RWMutex

	Name     string             `msg:"name"`              // operation name
	Service  string             `msg:"service"`           // name of the service that produced the span
	Resource string             `msg:"resource"`          // resource name (e.g. "/user?id=123", "SELECT * FROM users")
	Type     string             `msg:"type"`              // protocol associated with the span (e.g. "web", "db", "cache")
	Start    int64              `msg:"start"`             // span start time expressed in nanoseconds since epoch
	Duration int64              `msg:"duration"`          // duration of the span expressed in nanoseconds
	Meta     map[string]string  `msg:"meta,omitempty"`    // arbitrary map of string tags
	Metrics  map[string]float64 `msg:"metrics,omitempty"` // arbitrary map of numeric metrics
	SpanID   uint64             `msg:"span_id"`           // identifier of this span
	TraceID  uint64             `msg:"trace_id"`          // identifier of the trace this span belongs to
	ParentID uint64             `msg:"parent_id"`         // identifier of the span's direct parent
	Error    int32              `msg:"error"`             // error status of the span; 0 means no errors, 1 means an error was recorded

	noDebugStack bool         `msg:"-"` // disables debug stack traces
	finished     bool         `msg:"-"` // true once the span has been finished; setters then ignore further changes
	context      *spanContext `msg:"-"` // span propagation context

	pprofCtxActive  context.Context `msg:"-"` // contains pprof.WithLabel labels to tell the profiler more about this span
	pprofCtxRestore context.Context `msg:"-"` // contains pprof.WithLabel labels of the parent span (if any) that need to be restored when this span finishes

	// taskEnd ends the execution tracer (runtime/trace) task, if one was
	// started; Finish calls it when it is non-nil.
	// NOTE(review): unlike the other unexported fields, taskEnd carries no
	// `msg:"-"` tag; presumably msgp skips it anyway — confirm against the
	// generated span_msgp.go.
	taskEnd func()
}
- // Context yields the SpanContext for this Span. Note that the return
- // value of Context() is still valid after a call to Finish(). This is
- // called the span context and it is different from Go's context.
- func (s *span) Context() ddtrace.SpanContext { return s.context }
- // SetBaggageItem sets a key/value pair as baggage on the span. Baggage items
- // are propagated down to descendant spans and injected cross-process. Use with
- // care as it adds extra load onto your tracing layer.
- func (s *span) SetBaggageItem(key, val string) {
- s.context.setBaggageItem(key, val)
- }
- // BaggageItem gets the value for a baggage item given its key. Returns the
- // empty string if the value isn't found in this Span.
- func (s *span) BaggageItem(key string) string {
- return s.context.baggageItem(key)
- }
- // SetTag adds a set of key/value metadata to the span.
- func (s *span) SetTag(key string, value interface{}) {
- s.Lock()
- defer s.Unlock()
- // We don't lock spans when flushing, so we could have a data race when
- // modifying a span as it's being flushed. This protects us against that
- // race, since spans are marked `finished` before we flush them.
- if s.finished {
- return
- }
- switch key {
- case ext.Error:
- s.setTagError(value, errorConfig{
- noDebugStack: s.noDebugStack,
- })
- return
- }
- if v, ok := value.(bool); ok {
- s.setTagBool(key, v)
- return
- }
- if v, ok := value.(string); ok {
- if key == ext.ResourceName && s.pprofCtxActive != nil && spanResourcePIISafe(s) {
- // If the user overrides the resource name for the span,
- // update the endpoint label for the runtime profilers.
- //
- // We don't change s.pprofCtxRestore since that should
- // stay as the original parent span context regardless
- // of what we change at a lower level.
- s.pprofCtxActive = pprof.WithLabels(s.pprofCtxActive, pprof.Labels(traceprof.TraceEndpoint, v))
- pprof.SetGoroutineLabels(s.pprofCtxActive)
- }
- s.setMeta(key, v)
- return
- }
- if v, ok := toFloat64(value); ok {
- s.setMetric(key, v)
- return
- }
- if v, ok := value.(fmt.Stringer); ok {
- defer func() {
- if e := recover(); e != nil {
- if v := reflect.ValueOf(value); v.Kind() == reflect.Ptr && v.IsNil() {
- // If .String() panics due to a nil receiver, we want to catch this
- // and replace the string value with "<nil>", just as Sprintf does.
- // Other panics should not be handled.
- s.setMeta(key, "<nil>")
- return
- }
- panic(e)
- }
- }()
- s.setMeta(key, v.String())
- return
- }
- // not numeric, not a string, not a fmt.Stringer, not a bool, and not an error
- s.setMeta(key, fmt.Sprint(value))
- }
- // setSamplingPriority locks then span, then updates the sampling priority.
- // It also updates the trace's sampling priority.
- func (s *span) setSamplingPriority(priority int, sampler samplernames.SamplerName) {
- s.Lock()
- defer s.Unlock()
- s.setSamplingPriorityLocked(priority, sampler)
- }
- // Root returns the root span of the span's trace. The return value shouldn't be
- // nil as long as the root span is valid and not finished.
- func (s *span) Root() Span {
- return s.root()
- }
- // root returns the root span of the span's trace. The return value shouldn't be
- // nil as long as the root span is valid and not finished.
- // As opposed to the public Root method, this one returns the actual span type
- // when internal usage requires it (to avoid type assertions from Root's return
- // value).
- func (s *span) root() *span {
- if s == nil || s.context == nil {
- return nil
- }
- if s.context.trace == nil {
- return nil
- }
- return s.context.trace.root
- }
- // SetUser associates user information to the current trace which the
- // provided span belongs to. The options can be used to tune which user
- // bit of information gets monitored. In case of distributed traces,
- // the user id can be propagated across traces using the WithPropagation() option.
- // See https://docs.datadoghq.com/security_platform/application_security/setup_and_configure/?tab=set_user#add-user-information-to-traces
- func (s *span) SetUser(id string, opts ...UserMonitoringOption) {
- var cfg UserMonitoringConfig
- for _, fn := range opts {
- fn(&cfg)
- }
- root := s.root()
- trace := root.context.trace
- root.Lock()
- defer root.Unlock()
- if cfg.PropagateID {
- // Delete usr.id from the tags since _dd.p.usr.id takes precedence
- delete(root.Meta, keyUserID)
- idenc := base64.StdEncoding.EncodeToString([]byte(id))
- trace.setPropagatingTag(keyPropagatedUserID, idenc)
- s.context.updated = true
- } else {
- // Unset the propagated user ID so that a propagated user ID coming from upstream won't be propagated anymore.
- trace.unsetPropagatingTag(keyPropagatedUserID)
- if _, ok := trace.propagatingTags[keyPropagatedUserID]; ok {
- s.context.updated = true
- }
- delete(root.Meta, keyPropagatedUserID)
- // setMeta is used since the span is already locked
- root.setMeta(keyUserID, id)
- }
- for k, v := range map[string]string{
- keyUserEmail: cfg.Email,
- keyUserName: cfg.Name,
- keyUserScope: cfg.Scope,
- keyUserRole: cfg.Role,
- keyUserSessionID: cfg.SessionID,
- } {
- if v != "" {
- root.setMeta(k, v)
- }
- }
- }
- // setSamplingPriorityLocked updates the sampling priority.
- // It also updates the trace's sampling priority.
- func (s *span) setSamplingPriorityLocked(priority int, sampler samplernames.SamplerName) {
- // We don't lock spans when flushing, so we could have a data race when
- // modifying a span as it's being flushed. This protects us against that
- // race, since spans are marked `finished` before we flush them.
- if s.finished {
- return
- }
- s.setMetric(keySamplingPriority, float64(priority))
- s.context.setSamplingPriority(priority, sampler)
- }
- // setTagError sets the error tag. It accounts for various valid scenarios.
- // This method is not safe for concurrent use.
- func (s *span) setTagError(value interface{}, cfg errorConfig) {
- setError := func(yes bool) {
- if yes {
- if s.Error == 0 {
- // new error
- atomic.AddInt32(&s.context.errors, 1)
- }
- s.Error = 1
- } else {
- if s.Error > 0 {
- // flip from active to inactive
- atomic.AddInt32(&s.context.errors, -1)
- }
- s.Error = 0
- }
- }
- if s.finished {
- return
- }
- switch v := value.(type) {
- case bool:
- // bool value as per Opentracing spec.
- setError(v)
- case error:
- // if anyone sets an error value as the tag, be nice here
- // and provide all the benefits.
- setError(true)
- s.setMeta(ext.ErrorMsg, v.Error())
- s.setMeta(ext.ErrorType, reflect.TypeOf(v).String())
- if !cfg.noDebugStack {
- s.setMeta(ext.ErrorStack, takeStacktrace(cfg.stackFrames, cfg.stackSkip))
- }
- switch v.(type) {
- case xerrors.Formatter:
- s.setMeta(ext.ErrorDetails, fmt.Sprintf("%+v", v))
- case fmt.Formatter:
- // pkg/errors approach
- s.setMeta(ext.ErrorDetails, fmt.Sprintf("%+v", v))
- }
- case nil:
- // no error
- setError(false)
- default:
- // in all other cases, let's assume that setting this tag
- // is the result of an error.
- setError(true)
- }
- }
// defaultStackLength is the maximum number of frames takeStacktrace records
// when the caller does not ask for a specific depth (n == 0).
const defaultStackLength = 32

// takeStacktrace returns a stack trace of at most n frames. It starts at the
// caller of takeStacktrace and skips a further skip frames. If n is 0, up to
// defaultStackLength frames are retrieved.
//
// Each frame is rendered as "function\n\tfile:line", and consecutive frames
// are separated by a newline. An empty string is returned when no frames are
// available (for example when skip exceeds the stack depth).
func takeStacktrace(n, skip uint) string {
	if n == 0 {
		n = defaultStackLength
	}
	pcs := make([]uintptr, n)
	// +2 to exclude runtime.Callers and takeStacktrace themselves.
	numFrames := runtime.Callers(2+int(skip), pcs)
	if numFrames == 0 {
		return ""
	}
	var builder strings.Builder
	frames := runtime.CallersFrames(pcs[:numFrames])
	for first := true; ; first = false {
		frame, more := frames.Next()
		if !first {
			builder.WriteByte('\n')
		}
		builder.WriteString(frame.Function)
		builder.WriteString("\n\t")
		builder.WriteString(frame.File)
		builder.WriteByte(':')
		builder.WriteString(strconv.Itoa(frame.Line))
		if !more {
			break
		}
	}
	return builder.String()
}
- // setMeta sets a string tag. This method is not safe for concurrent use.
- func (s *span) setMeta(key, v string) {
- if s.Meta == nil {
- s.Meta = make(map[string]string, 1)
- }
- delete(s.Metrics, key)
- switch key {
- case ext.SpanName:
- s.Name = v
- case ext.ServiceName:
- s.Service = v
- case ext.ResourceName:
- s.Resource = v
- case ext.SpanType:
- s.Type = v
- default:
- s.Meta[key] = v
- }
- }
- // setTagBool sets a boolean tag on the span.
- func (s *span) setTagBool(key string, v bool) {
- switch key {
- case ext.AnalyticsEvent:
- if v {
- s.setMetric(ext.EventSampleRate, 1.0)
- } else {
- s.setMetric(ext.EventSampleRate, 0.0)
- }
- case ext.ManualDrop:
- if v {
- s.setSamplingPriorityLocked(ext.PriorityUserReject, samplernames.Manual)
- }
- case ext.ManualKeep:
- if v {
- s.setSamplingPriorityLocked(ext.PriorityUserKeep, samplernames.Manual)
- }
- default:
- if v {
- s.setMeta(key, "true")
- } else {
- s.setMeta(key, "false")
- }
- }
- }
- // setMetric sets a numeric tag, in our case called a metric. This method
- // is not safe for concurrent use.
- func (s *span) setMetric(key string, v float64) {
- if s.Metrics == nil {
- s.Metrics = make(map[string]float64, 1)
- }
- delete(s.Meta, key)
- switch key {
- case ext.ManualKeep:
- if v == float64(samplernames.AppSec) {
- s.setSamplingPriorityLocked(ext.PriorityUserKeep, samplernames.AppSec)
- }
- case ext.SamplingPriority:
- // ext.SamplingPriority is deprecated in favor of ext.ManualKeep and ext.ManualDrop.
- // We have it here for backward compatibility.
- s.setSamplingPriorityLocked(int(v), samplernames.Manual)
- default:
- s.Metrics[key] = v
- }
- }
- // Finish closes this Span (but not its children) providing the duration
- // of its part of the tracing session.
- func (s *span) Finish(opts ...ddtrace.FinishOption) {
- t := now()
- if len(opts) > 0 {
- cfg := ddtrace.FinishConfig{
- NoDebugStack: s.noDebugStack,
- }
- for _, fn := range opts {
- fn(&cfg)
- }
- if !cfg.FinishTime.IsZero() {
- t = cfg.FinishTime.UnixNano()
- }
- if cfg.Error != nil {
- s.Lock()
- s.setTagError(cfg.Error, errorConfig{
- noDebugStack: cfg.NoDebugStack,
- stackFrames: cfg.StackFrames,
- stackSkip: cfg.SkipStackFrames,
- })
- s.Unlock()
- }
- }
- if s.taskEnd != nil {
- s.taskEnd()
- }
- s.finish(t)
- if s.pprofCtxRestore != nil {
- // Restore the labels of the parent span so any CPU samples after this
- // point are attributed correctly.
- pprof.SetGoroutineLabels(s.pprofCtxRestore)
- }
- }
- // SetOperationName sets or changes the operation name.
- func (s *span) SetOperationName(operationName string) {
- s.Lock()
- defer s.Unlock()
- // We don't lock spans when flushing, so we could have a data race when
- // modifying a span as it's being flushed. This protects us against that
- // race, since spans are marked `finished` before we flush them.
- if s.finished {
- // already finished
- return
- }
- s.Name = operationName
- }
- func (s *span) finish(finishTime int64) {
- s.Lock()
- defer s.Unlock()
- // We don't lock spans when flushing, so we could have a data race when
- // modifying a span as it's being flushed. This protects us against that
- // race, since spans are marked `finished` before we flush them.
- if s.finished {
- // already finished
- return
- }
- if s.Duration == 0 {
- s.Duration = finishTime - s.Start
- }
- if s.Duration < 0 {
- s.Duration = 0
- }
- s.finished = true
- keep := true
- if t, ok := internal.GetGlobalTracer().(*tracer); ok {
- // we have an active tracer
- if t.config.canComputeStats() && shouldComputeStats(s) {
- // the agent supports computed stats
- select {
- case t.stats.In <- newAggregableSpan(s, t.obfuscator):
- // ok
- default:
- log.Error("Stats channel full, disregarding span.")
- }
- }
- if t.config.canDropP0s() {
- // the agent supports dropping p0's in the client
- keep = shouldKeep(s)
- }
- }
- if keep {
- // a single kept span keeps the whole trace.
- s.context.trace.keep()
- }
- s.context.finish()
- }
- // newAggregableSpan creates a new summary for the span s, within an application
- // version version.
- func newAggregableSpan(s *span, obfuscator *obfuscate.Obfuscator) *aggregableSpan {
- var statusCode uint32
- if sc, ok := s.Meta["http.status_code"]; ok && sc != "" {
- if c, err := strconv.Atoi(sc); err == nil && c > 0 && c <= math.MaxInt32 {
- statusCode = uint32(c)
- }
- }
- key := aggregation{
- Name: s.Name,
- Resource: obfuscatedResource(obfuscator, s.Type, s.Resource),
- Service: s.Service,
- Type: s.Type,
- Synthetics: strings.HasPrefix(s.Meta[keyOrigin], "synthetics"),
- StatusCode: statusCode,
- }
- return &aggregableSpan{
- key: key,
- Start: s.Start,
- Duration: s.Duration,
- TopLevel: s.Metrics[keyTopLevel] == 1,
- Error: s.Error,
- }
- }
- // textNonParsable specifies the text that will be assigned to resources for which the resource
- // can not be parsed due to an obfuscation error.
- const textNonParsable = "Non-parsable SQL query"
- // obfuscatedResource returns the obfuscated version of the given resource. It is
- // obfuscated using the given obfuscator for the given span type typ.
- func obfuscatedResource(o *obfuscate.Obfuscator, typ, resource string) string {
- if o == nil {
- return resource
- }
- switch typ {
- case "sql", "cassandra":
- oq, err := o.ObfuscateSQLString(resource)
- if err != nil {
- log.Error("Error obfuscating stats group resource %q: %v", resource, err)
- return textNonParsable
- }
- return oq.Query
- case "redis":
- return o.QuantizeRedisString(resource)
- default:
- return resource
- }
- }
- // shouldKeep reports whether the trace should be kept.
- // a single span being kept implies the whole trace being kept.
- func shouldKeep(s *span) bool {
- if p, ok := s.context.samplingPriority(); ok && p > 0 {
- // positive sampling priorities stay
- return true
- }
- if atomic.LoadInt32(&s.context.errors) > 0 {
- // traces with any span containing an error get kept
- return true
- }
- if v, ok := s.Metrics[ext.EventSampleRate]; ok {
- return sampledByRate(s.TraceID, v)
- }
- return false
- }
- // shouldComputeStats mentions whether this span needs to have stats computed for.
- // Warning: callers must guard!
- func shouldComputeStats(s *span) bool {
- if v, ok := s.Metrics[keyMeasured]; ok && v == 1 {
- return true
- }
- if v, ok := s.Metrics[keyTopLevel]; ok && v == 1 {
- return true
- }
- return false
- }
- // String returns a human readable representation of the span. Not for
- // production, just debugging.
- func (s *span) String() string {
- s.RLock()
- defer s.RUnlock()
- lines := []string{
- fmt.Sprintf("Name: %s", s.Name),
- fmt.Sprintf("Service: %s", s.Service),
- fmt.Sprintf("Resource: %s", s.Resource),
- fmt.Sprintf("TraceID: %d", s.TraceID),
- fmt.Sprintf("SpanID: %d", s.SpanID),
- fmt.Sprintf("ParentID: %d", s.ParentID),
- fmt.Sprintf("Start: %s", time.Unix(0, s.Start)),
- fmt.Sprintf("Duration: %s", time.Duration(s.Duration)),
- fmt.Sprintf("Error: %d", s.Error),
- fmt.Sprintf("Type: %s", s.Type),
- "Tags:",
- }
- for key, val := range s.Meta {
- lines = append(lines, fmt.Sprintf("\t%s:%s", key, val))
- }
- for key, val := range s.Metrics {
- lines = append(lines, fmt.Sprintf("\t%s:%f", key, val))
- }
- return strings.Join(lines, "\n")
- }
- // Format implements fmt.Formatter.
- func (s *span) Format(f fmt.State, c rune) {
- switch c {
- case 's':
- fmt.Fprint(f, s.String())
- case 'v':
- if svc := globalconfig.ServiceName(); svc != "" {
- fmt.Fprintf(f, "dd.service=%s ", svc)
- }
- if tr, ok := internal.GetGlobalTracer().(*tracer); ok {
- if tr.config.env != "" {
- fmt.Fprintf(f, "dd.env=%s ", tr.config.env)
- }
- if tr.config.version != "" {
- fmt.Fprintf(f, "dd.version=%s ", tr.config.version)
- }
- } else {
- if env := os.Getenv("DD_ENV"); env != "" {
- fmt.Fprintf(f, "dd.env=%s ", env)
- }
- if v := os.Getenv("DD_VERSION"); v != "" {
- fmt.Fprintf(f, "dd.version=%s ", v)
- }
- }
- fmt.Fprintf(f, `dd.trace_id="%d" dd.span_id="%d"`, s.TraceID, s.SpanID)
- default:
- fmt.Fprintf(f, "%%!%c(ddtrace.Span=%v)", c, s)
- }
- }
// Sampling priority and sampler rate keys.
const (
	keySamplingPriority        = "_sampling_priority_v1"
	keySamplingPriorityRate    = "_dd.agent_psr"
	keyRulesSamplerAppliedRate = "_dd.rule_psr"
	keyRulesSamplerLimiterRate = "_dd.limit_psr"
)

// Miscellaneous internal span tag and metric keys.
const (
	keyDecisionMaker = "_dd.p.dm"
	keyServiceHash   = "_dd.dm.service_hash"
	keyOrigin        = "_dd.origin"
	keyHostname      = "_dd.hostname"
	// keyMeasured marks a span for stats computation when its metric is 1.
	keyMeasured = "_dd.measured"
	// keyTopLevel is the key of top level metric indicating if a span is top level.
	// A top level span is a local root (parent span of the local trace) or the first span of each service.
	keyTopLevel = "_dd.top_level"
	// keyPropagationError holds any error from propagated trace tags (if any).
	keyPropagationError = "_dd.propagation_error"
	// keyPropagatedUserID holds the propagated user identifier, if user id propagation is enabled.
	keyPropagatedUserID = "_dd.p.usr.id"
)

// Single span sampling keys.
const (
	// keySpanSamplingMechanism specifies the sampling mechanism by which an individual span was sampled.
	keySpanSamplingMechanism = "_dd.span_sampling.mechanism"
	// keySingleSpanSamplingRuleRate specifies the configured sampling probability for the single span sampling rule.
	keySingleSpanSamplingRuleRate = "_dd.span_sampling.rule_rate"
	// keySingleSpanSamplingMPS specifies the configured limit for the single span sampling rule
	// that the span matched. If there is no configured limit, then this tag is omitted.
	keySingleSpanSamplingMPS = "_dd.span_sampling.max_per_second"
)
// Tags recorded on the trace's root span by span.SetUser.
const keyUserID = "usr.id"
const keyUserEmail = "usr.email"
const keyUserName = "usr.name"
const keyUserRole = "usr.role"
const keyUserScope = "usr.scope"
const keyUserSessionID = "usr.session_id"
// samplingMechanismSingleSpan is the sampling-mechanism value reserved to
// indicate that a span was kept on account of a single span sampling rule.
const samplingMechanismSingleSpan = 8
|