spancontext.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2016 Datadog, Inc.
  5. package tracer
  6. import (
  7. "strconv"
  8. "sync"
  9. "sync/atomic"
  10. "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
  11. "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
  12. "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
  13. "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
  14. )
  15. var _ ddtrace.SpanContext = (*spanContext)(nil)
  16. // SpanContext represents a span state that can propagate to descendant spans
  17. // and across process boundaries. It contains all the information needed to
  18. // spawn a direct descendant of the span that it belongs to. It can be used
  19. // to create distributed tracing by propagating it using the provided interfaces.
  20. type spanContext struct {
  21. updated bool // updated is tracking changes for priority / origin / x-datadog-tags
  22. // the below group should propagate only locally
  23. trace *trace // reference to the trace that this span belongs too
  24. span *span // reference to the span that hosts this context
  25. errors int32 // number of spans with errors in this trace
  26. // the below group should propagate cross-process
  27. traceID uint64
  28. spanID uint64
  29. mu sync.RWMutex // guards below fields
  30. baggage map[string]string
  31. hasBaggage uint32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
  32. origin string // e.g. "synthetics"
  33. }
  34. // newSpanContext creates a new SpanContext to serve as context for the given
  35. // span. If the provided parent is not nil, the context will inherit the trace,
  36. // baggage and other values from it. This method also pushes the span into the
  37. // new context's trace and as a result, it should not be called multiple times
  38. // for the same span.
  39. func newSpanContext(span *span, parent *spanContext) *spanContext {
  40. context := &spanContext{
  41. traceID: span.TraceID,
  42. spanID: span.SpanID,
  43. span: span,
  44. }
  45. if parent != nil {
  46. context.trace = parent.trace
  47. context.origin = parent.origin
  48. context.errors = parent.errors
  49. parent.ForeachBaggageItem(func(k, v string) bool {
  50. context.setBaggageItem(k, v)
  51. return true
  52. })
  53. }
  54. if context.trace == nil {
  55. context.trace = newTrace()
  56. }
  57. if context.trace.root == nil {
  58. // first span in the trace can safely be assumed to be the root
  59. context.trace.root = span
  60. }
  61. // put span in context's trace
  62. context.trace.push(span)
  63. // setting context.updated to false here is necessary to distinguish
  64. // between initializing properties of the span (priority)
  65. // and updating them after extracting context through propagators
  66. context.updated = false
  67. return context
  68. }
  69. // SpanID implements ddtrace.SpanContext.
  70. func (c *spanContext) SpanID() uint64 { return c.spanID }
  71. // TraceID implements ddtrace.SpanContext.
  72. func (c *spanContext) TraceID() uint64 { return c.traceID }
  73. // ForeachBaggageItem implements ddtrace.SpanContext.
  74. func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
  75. if atomic.LoadUint32(&c.hasBaggage) == 0 {
  76. return
  77. }
  78. c.mu.RLock()
  79. defer c.mu.RUnlock()
  80. for k, v := range c.baggage {
  81. if !handler(k, v) {
  82. break
  83. }
  84. }
  85. }
  86. func (c *spanContext) setSamplingPriority(p int, sampler samplernames.SamplerName) {
  87. if c.trace == nil {
  88. c.trace = newTrace()
  89. }
  90. if c.trace.priority != nil && *c.trace.priority != float64(p) {
  91. c.updated = true
  92. }
  93. c.trace.setSamplingPriority(p, sampler)
  94. }
  95. func (c *spanContext) samplingPriority() (p int, ok bool) {
  96. if c.trace == nil {
  97. return 0, false
  98. }
  99. return c.trace.samplingPriority()
  100. }
  101. func (c *spanContext) setBaggageItem(key, val string) {
  102. c.mu.Lock()
  103. defer c.mu.Unlock()
  104. if c.baggage == nil {
  105. atomic.StoreUint32(&c.hasBaggage, 1)
  106. c.baggage = make(map[string]string, 1)
  107. }
  108. c.baggage[key] = val
  109. }
  110. func (c *spanContext) baggageItem(key string) string {
  111. if atomic.LoadUint32(&c.hasBaggage) == 0 {
  112. return ""
  113. }
  114. c.mu.RLock()
  115. defer c.mu.RUnlock()
  116. return c.baggage[key]
  117. }
  118. func (c *spanContext) meta(key string) (val string, ok bool) {
  119. c.span.RLock()
  120. defer c.span.RUnlock()
  121. val, ok = c.span.Meta[key]
  122. return val, ok
  123. }
  124. // finish marks this span as finished in the trace.
  125. func (c *spanContext) finish() { c.trace.finishedOne(c.span) }
  // samplingDecision is the decision on whether or not to send a trace to the
  // agent.
  type samplingDecision uint32

  const (
  	// decisionNone is the default state of a trace. A trace for which no
  	// decision has been made won't be sent to the agent.
  	decisionNone samplingDecision = 0
  	// decisionDrop prevents the trace from being sent to the agent.
  	decisionDrop samplingDecision = 1
  	// decisionKeep ensures the trace will be sent to the agent.
  	decisionKeep samplingDecision = 2
  )
  137. // trace contains shared context information about a trace, such as sampling
  138. // priority, the root reference and a buffer of the spans which are part of the
  139. // trace, if these exist.
  140. type trace struct {
  141. mu sync.RWMutex // guards below fields
  142. spans []*span // all the spans that are part of this trace
  143. tags map[string]string // trace level tags
  144. propagatingTags map[string]string // trace level tags that will be propagated across service boundaries
  145. finished int // the number of finished spans
  146. full bool // signifies that the span buffer is full
  147. priority *float64 // sampling priority
  148. locked bool // specifies if the sampling priority can be altered
  149. samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.
  150. // root specifies the root of the trace, if known; it is nil when a span
  151. // context is extracted from a carrier, at which point there are no spans in
  152. // the trace yet.
  153. root *span
  154. }
  var (
  	// traceStartSize is the initial capacity of a trace's span buffer. Room
  	// for a handful of spans is allocated up front, which is reasonable given
  	// that a span itself is far bigger, and avoids repeated re-allocation.
  	// It could be fine-tuned at runtime.
  	traceStartSize = 10

  	// traceMaxSize is the maximum number of spans kept in memory for a single
  	// trace, which guards against memory leaks. When more spans than this are
  	// added to a trace, the trace is dropped and its spans are discarded;
  	// adding further spans to a dropped trace does nothing.
  	traceMaxSize = 100000
  )
  168. // newTrace creates a new trace using the given callback which will be called
  169. // upon completion of the trace.
  170. func newTrace() *trace {
  171. return &trace{spans: make([]*span, 0, traceStartSize)}
  172. }
  173. func (t *trace) samplingPriorityLocked() (p int, ok bool) {
  174. if t.priority == nil {
  175. return 0, false
  176. }
  177. return int(*t.priority), true
  178. }
  179. func (t *trace) samplingPriority() (p int, ok bool) {
  180. t.mu.RLock()
  181. defer t.mu.RUnlock()
  182. return t.samplingPriorityLocked()
  183. }
  184. func (t *trace) setSamplingPriority(p int, sampler samplernames.SamplerName) {
  185. t.mu.Lock()
  186. defer t.mu.Unlock()
  187. t.setSamplingPriorityLocked(p, sampler)
  188. }
  189. func (t *trace) keep() {
  190. atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionKeep))
  191. }
  192. func (t *trace) drop() {
  193. atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionDrop))
  194. }
  195. func (t *trace) setTag(key, value string) {
  196. if t.tags == nil {
  197. t.tags = make(map[string]string, 1)
  198. }
  199. t.tags[key] = value
  200. }
  201. // setPropagatingTag sets the key/value pair as a trace propagating tag.
  202. func (t *trace) setPropagatingTag(key, value string) {
  203. t.mu.Lock()
  204. defer t.mu.Unlock()
  205. t.setPropagatingTagLocked(key, value)
  206. }
  207. // setPropagatingTagLocked sets the key/value pair as a trace propagating tag.
  208. // Not safe for concurrent use, setPropagatingTag should be used instead in that case.
  209. func (t *trace) setPropagatingTagLocked(key, value string) {
  210. if t.propagatingTags == nil {
  211. t.propagatingTags = make(map[string]string, 1)
  212. }
  213. t.propagatingTags[key] = value
  214. }
  215. // unsetPropagatingTag deletes the key/value pair from the trace's propagated tags.
  216. func (t *trace) unsetPropagatingTag(key string) {
  217. t.mu.Lock()
  218. defer t.mu.Unlock()
  219. delete(t.propagatingTags, key)
  220. }
  221. func (t *trace) setSamplingPriorityLocked(p int, sampler samplernames.SamplerName) {
  222. if t.locked {
  223. return
  224. }
  225. if t.priority == nil {
  226. t.priority = new(float64)
  227. }
  228. *t.priority = float64(p)
  229. _, ok := t.propagatingTags[keyDecisionMaker]
  230. if p > 0 && !ok && sampler != samplernames.Unknown {
  231. // We have a positive priority and the sampling mechanism isn't set.
  232. // Send nothing when sampler is `Unknown` for RFC compliance.
  233. t.setPropagatingTagLocked(keyDecisionMaker, "-"+strconv.Itoa(int(sampler)))
  234. }
  235. if p <= 0 && ok {
  236. delete(t.propagatingTags, keyDecisionMaker)
  237. }
  238. }
  239. // push pushes a new span into the trace. If the buffer is full, it returns
  240. // a errBufferFull error.
  241. func (t *trace) push(sp *span) {
  242. t.mu.Lock()
  243. defer t.mu.Unlock()
  244. if t.full {
  245. return
  246. }
  247. tr, haveTracer := internal.GetGlobalTracer().(*tracer)
  248. if len(t.spans) >= traceMaxSize {
  249. // capacity is reached, we will not be able to complete this trace.
  250. t.full = true
  251. t.spans = nil // GC
  252. log.Error("trace buffer full (%d), dropping trace", traceMaxSize)
  253. if haveTracer {
  254. atomic.AddUint32(&tr.tracesDropped, 1)
  255. }
  256. return
  257. }
  258. if v, ok := sp.Metrics[keySamplingPriority]; ok {
  259. t.setSamplingPriorityLocked(int(v), samplernames.Unknown)
  260. }
  261. t.spans = append(t.spans, sp)
  262. if haveTracer {
  263. atomic.AddUint32(&tr.spansStarted, 1)
  264. }
  265. }
  266. // finishedOne acknowledges that another span in the trace has finished, and checks
  267. // if the trace is complete, in which case it calls the onFinish function. It uses
  268. // the given priority, if non-nil, to mark the root span.
  269. func (t *trace) finishedOne(s *span) {
  270. t.mu.Lock()
  271. defer t.mu.Unlock()
  272. if t.full {
  273. // capacity has been reached, the buffer is no longer tracking
  274. // all the spans in the trace, so the below conditions will not
  275. // be accurate and would trigger a pre-mature flush, exposing us
  276. // to a race condition where spans can be modified while flushing.
  277. return
  278. }
  279. t.finished++
  280. if s == t.root && t.priority != nil {
  281. // after the root has finished we lock down the priority;
  282. // we won't be able to make changes to a span after finishing
  283. // without causing a race condition.
  284. t.root.setMetric(keySamplingPriority, *t.priority)
  285. t.locked = true
  286. }
  287. if len(t.spans) > 0 && s == t.spans[0] {
  288. // first span in chunk finished, lock down the tags
  289. //
  290. // TODO(barbayar): make sure this doesn't happen in vain when switching to
  291. // the new wire format. We won't need to set the tags on the first span
  292. // in the chunk there.
  293. for k, v := range t.tags {
  294. s.setMeta(k, v)
  295. }
  296. for k, v := range t.propagatingTags {
  297. s.setMeta(k, v)
  298. }
  299. }
  300. if len(t.spans) != t.finished {
  301. return
  302. }
  303. defer func() {
  304. t.spans = nil
  305. t.finished = 0 // important, because a buffer can be used for several flushes
  306. }()
  307. tr, ok := internal.GetGlobalTracer().(*tracer)
  308. if !ok {
  309. return
  310. }
  311. // we have a tracer that can receive completed traces.
  312. atomic.AddUint32(&tr.spansFinished, uint32(len(t.spans)))
  313. tr.pushTrace(&finishedTrace{
  314. spans: t.spans,
  315. willSend: decisionKeep == samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))),
  316. })
  317. }