// writer.go
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2016 Datadog, Inc.
  5. package tracer
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "io"
  11. "math"
  12. "os"
  13. "strconv"
  14. "sync"
  15. "time"
  16. "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
  17. )
// traceWriter is implemented by the mechanisms that ship finished traces out
// of the tracer: agentTraceWriter uploads them to the trace agent, while
// logTraceWriter prints them to an output stream for the Datadog Forwarder.
type traceWriter interface {
	// add adds traces to be sent by the writer.
	add([]*span)

	// flush causes the writer to send any buffered traces.
	flush()

	// stop gracefully shuts down the writer.
	stop()
}
// agentTraceWriter buffers traces in msgpack format and uploads them to the
// trace agent using the configured transport.
type agentTraceWriter struct {
	// config holds the tracer configuration
	config *config

	// payload encodes and buffers traces in msgpack format
	payload *payload

	// climit limits the number of concurrent outgoing connections
	climit chan struct{}

	// wg waits for all uploads to finish
	wg sync.WaitGroup

	// prioritySampling is the prioritySampler into which agentTraceWriter will
	// read sampling rates sent by the agent
	prioritySampling *prioritySampler

	// statsd is used to send metrics
	statsd statsdClient
}
  41. func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient statsdClient) *agentTraceWriter {
  42. return &agentTraceWriter{
  43. config: c,
  44. payload: newPayload(),
  45. climit: make(chan struct{}, concurrentConnectionLimit),
  46. prioritySampling: s,
  47. statsd: statsdClient,
  48. }
  49. }
  50. func (h *agentTraceWriter) add(trace []*span) {
  51. if err := h.payload.push(trace); err != nil {
  52. h.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1)
  53. log.Error("Error encoding msgpack: %v", err)
  54. }
  55. if h.payload.size() > payloadSizeLimit {
  56. h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1)
  57. h.flush()
  58. }
  59. }
// stop flushes any buffered traces and blocks until all in-flight uploads
// started by flush have finished.
func (h *agentTraceWriter) stop() {
	h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:shutdown"}, 1)
	h.flush()
	h.wg.Wait()
}
// flush will push any currently buffered traces to the server.
// The upload happens asynchronously on a new goroutine; the number of
// concurrent uploads is bounded by the capacity of h.climit.
func (h *agentTraceWriter) flush() {
	if h.payload.itemCount() == 0 {
		return
	}
	h.wg.Add(1)
	// Block here (not in the goroutine) until a connection slot is free.
	h.climit <- struct{}{}
	// Swap in a fresh payload so new traces can be buffered while the old one
	// uploads. NOTE(review): this swap is unsynchronized — it appears to assume
	// add/flush are only called from a single goroutine; confirm at call sites.
	oldp := h.payload
	h.payload = newPayload()
	go func(p *payload) {
		defer func(start time.Time) {
			// Release the connection slot and record the upload duration.
			<-h.climit
			h.wg.Done()
			h.statsd.Timing("datadog.tracer.flush_duration", time.Since(start), nil, 1)
		}(time.Now())
		size, count := p.size(), p.itemCount()
		log.Debug("Sending payload: size: %d traces: %d\n", size, count)
		rc, err := h.config.transport.send(p)
		if err != nil {
			h.statsd.Count("datadog.tracer.traces_dropped", int64(count), []string{"reason:send_failed"}, 1)
			log.Error("lost %d traces: %v", count, err)
		} else {
			h.statsd.Count("datadog.tracer.flush_bytes", int64(size), nil, 1)
			h.statsd.Count("datadog.tracer.flush_traces", int64(count), nil, 1)
			// The response body carries sampling rates computed by the agent.
			// NOTE(review): rc is not closed here — presumably readRatesJSON
			// closes it; verify.
			if err := h.prioritySampling.readRatesJSON(rc); err != nil {
				h.statsd.Incr("datadog.tracer.decode_error", nil, 1)
			}
		}
	}(oldp)
}
// logWriter specifies the output target of the logTraceWriter; replaced in tests.
var logWriter io.Writer = os.Stdout
// logTraceWriter encodes traces into a format understood by the Datadog Forwarder
// (https://github.com/DataDog/datadog-serverless-functions/tree/master/aws/logs_monitoring)
// and writes them to os.Stdout. This is used to send traces from an AWS Lambda environment.
type logTraceWriter struct {
	// config holds the tracer configuration.
	config *config
	// buf accumulates the JSON log line currently being built.
	buf bytes.Buffer
	// hasTraces reports whether buf contains at least one complete trace.
	hasTraces bool
	// w is the output target; logWriter by default, replaced in tests.
	w io.Writer
	// statsd is used to send metrics.
	statsd statsdClient
}
  107. func newLogTraceWriter(c *config, statsdClient statsdClient) *logTraceWriter {
  108. w := &logTraceWriter{
  109. config: c,
  110. w: logWriter,
  111. statsd: statsdClient,
  112. }
  113. w.resetBuffer()
  114. return w
  115. }
const (
	// maxFloatLength is the maximum length that a string encoded by encodeFloat will be.
	maxFloatLength = 24

	// logBufferSuffix is the final string that the trace writer has to append to a buffer to close
	// the JSON.
	logBufferSuffix = "]}\n"

	// logBufferLimit is the maximum size log line allowed by cloudwatch
	logBufferLimit = 256 * 1024
)
// resetBuffer empties the buffer and writes the JSON prefix that every log
// line starts with; flush appends logBufferSuffix to close it.
func (h *logTraceWriter) resetBuffer() {
	h.buf.Reset()
	h.buf.WriteString(`{"traces": [`)
	h.hasTraces = false
}
  130. // encodeFloat correctly encodes float64 into the JSON format followed by ES6.
  131. // This code is reworked from Go's encoding/json package
  132. // (https://github.com/golang/go/blob/go1.15/src/encoding/json/encode.go#L573)
  133. //
  134. // One important departure from encoding/json is that infinities and nans are encoded
  135. // as null rather than signalling an error.
  136. func encodeFloat(p []byte, f float64) []byte {
  137. if math.IsInf(f, 0) || math.IsNaN(f) {
  138. return append(p, "null"...)
  139. }
  140. abs := math.Abs(f)
  141. if abs != 0 && (abs < 1e-6 || abs >= 1e21) {
  142. p = strconv.AppendFloat(p, f, 'e', -1, 64)
  143. // clean up e-09 to e-9
  144. n := len(p)
  145. if n >= 4 && p[n-4] == 'e' && p[n-3] == '-' && p[n-2] == '0' {
  146. p[n-2] = p[n-1]
  147. p = p[:n-1]
  148. }
  149. } else {
  150. p = strconv.AppendFloat(p, f, 'f', -1, 64)
  151. }
  152. return p
  153. }
// encodeSpan writes s into the buffer as one JSON object in the shape the
// Datadog Forwarder expects. IDs are rendered as hexadecimal strings; metric
// values that are NaN or infinite are skipped entirely.
func (h *logTraceWriter) encodeSpan(s *span) {
	// scratch is reused for every numeric conversion to avoid allocations.
	var scratch [maxFloatLength]byte
	h.buf.WriteString(`{"trace_id":"`)
	h.buf.Write(strconv.AppendUint(scratch[:0], uint64(s.TraceID), 16))
	h.buf.WriteString(`","span_id":"`)
	h.buf.Write(strconv.AppendUint(scratch[:0], uint64(s.SpanID), 16))
	h.buf.WriteString(`","parent_id":"`)
	h.buf.Write(strconv.AppendUint(scratch[:0], uint64(s.ParentID), 16))
	h.buf.WriteString(`","name":`)
	h.marshalString(s.Name)
	h.buf.WriteString(`,"resource":`)
	h.marshalString(s.Resource)
	h.buf.WriteString(`,"error":`)
	h.buf.Write(strconv.AppendInt(scratch[:0], int64(s.Error), 10))
	h.buf.WriteString(`,"meta":{`)
	// Emit meta as a JSON object; keys and values are marshaled for escaping.
	first := true
	for k, v := range s.Meta {
		if first {
			first = false
		} else {
			h.buf.WriteString(`,`)
		}
		h.marshalString(k)
		h.buf.WriteString(":")
		h.marshalString(v)
	}
	h.buf.WriteString(`},"metrics":{`)
	first = true
	for k, v := range s.Metrics {
		if math.IsNaN(v) || math.IsInf(v, 0) {
			// The trace forwarder does not support infinity or nan, so we do not send metrics with those values.
			continue
		}
		if first {
			first = false
		} else {
			h.buf.WriteString(`,`)
		}
		h.marshalString(k)
		h.buf.WriteString(`:`)
		h.buf.Write(encodeFloat(scratch[:0], v))
	}
	h.buf.WriteString(`},"start":`)
	h.buf.Write(strconv.AppendInt(scratch[:0], s.Start, 10))
	h.buf.WriteString(`,"duration":`)
	h.buf.Write(strconv.AppendInt(scratch[:0], s.Duration, 10))
	h.buf.WriteString(`,"service":`)
	h.marshalString(s.Service)
	h.buf.WriteString(`}`)
}
  204. // marshalString marshals the string str as JSON into the writer's buffer.
  205. // Should be used whenever writing non-constant string data to ensure correct sanitization.
  206. func (h *logTraceWriter) marshalString(str string) {
  207. m, err := json.Marshal(str)
  208. if err != nil {
  209. log.Error("Error marshaling value %q: %v", str, err)
  210. } else {
  211. h.buf.Write(m)
  212. }
  213. }
// encodingError describes a failure to encode a trace, along with the reason
// tag reported to statsd when the trace is dropped.
type encodingError struct {
	// cause is the underlying error.
	cause error
	// dropReason is the value used in the statsd "reason:" tag.
	dropReason string
}
// writeTrace makes an effort to write the trace into the current buffer. It returns
// the number of spans (n) that it wrote and an error (err), if one occurred.
// n may be less than len(trace), meaning that only the first n spans of the trace
// fit into the current buffer. Once the buffer is flushed, the remaining spans
// from the trace can be retried.
// An error, if one is returned, indicates that a span in the trace is too large
// to fit in one buffer, and the trace cannot be written.
func (h *logTraceWriter) writeTrace(trace []*span) (n int, err *encodingError) {
	// startn marks the buffer length before this trace, so that the whole
	// trace can be rolled back if not even its first span fits.
	startn := h.buf.Len()
	if !h.hasTraces {
		h.buf.WriteByte('[')
	} else {
		h.buf.WriteString(", [")
	}
	written := 0
	for i, s := range trace {
		// n marks the buffer length before this span (and before its leading
		// comma), so an overflowing span can be truncated away cleanly.
		n := h.buf.Len()
		if i > 0 {
			h.buf.WriteByte(',')
		}
		h.encodeSpan(s)
		// Reserve room for logBufferSuffix, which flush must always be able to append.
		if h.buf.Len() > logBufferLimit-len(logBufferSuffix) {
			// This span is too big to fit in the current buffer.
			if i == 0 {
				// This was the first span in this trace. This means we should truncate
				// everything we wrote in writeTrace
				h.buf.Truncate(startn)
				if !h.hasTraces {
					// This is the first span of the first trace in the buffer and it's too big.
					// We will never be able to send this trace, so we will drop it.
					return 0, &encodingError{cause: errors.New("span too large for buffer"), dropReason: "trace_too_large"}
				}
				return 0, nil
			}
			// This span was too big, but it might fit in the next buffer.
			// We can finish this trace and try again with an empty buffer (see *logTraceWriter.add)
			h.buf.Truncate(n)
			break
		}
		written++
	}
	h.buf.WriteByte(']')
	h.hasTraces = true
	return written, nil
}
  263. // add adds a trace to the writer's buffer.
  264. func (h *logTraceWriter) add(trace []*span) {
  265. // Try adding traces to the buffer until we flush them all or encounter an error.
  266. for len(trace) > 0 {
  267. n, err := h.writeTrace(trace)
  268. if err != nil {
  269. log.Error("Lost a trace: %s", err.cause)
  270. h.statsd.Count("datadog.tracer.traces_dropped", 1, []string{"reason:" + err.dropReason}, 1)
  271. return
  272. }
  273. trace = trace[n:]
  274. // If there are traces left that didn't fit into the buffer, flush the buffer and loop to
  275. // write the remaining spans.
  276. if len(trace) > 0 {
  277. h.flush()
  278. }
  279. }
  280. }
// stop flushes any remaining buffered traces before shutdown.
func (h *logTraceWriter) stop() {
	h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:shutdown"}, 1)
	h.flush()
}
  285. // flush will write any buffered traces to standard output.
  286. func (h *logTraceWriter) flush() {
  287. if !h.hasTraces {
  288. return
  289. }
  290. h.buf.WriteString(logBufferSuffix)
  291. h.w.Write(h.buf.Bytes())
  292. h.resetBuffer()
  293. }