transport.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2016 Datadog, Inc.
  5. package tracer
  6. import (
  7. "bytes"
  8. "fmt"
  9. "io"
  10. "net"
  11. "net/http"
  12. "os"
  13. "runtime"
  14. "strconv"
  15. "strings"
  16. "sync/atomic"
  17. "time"
  18. traceinternal "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
  19. "gopkg.in/DataDog/dd-trace-go.v1/internal"
  20. "gopkg.in/DataDog/dd-trace-go.v1/internal/version"
  21. "github.com/tinylib/msgp/msgp"
  22. )
  23. const (
  24. // headerComputedTopLevel specifies that the client has marked top-level spans, when set.
  25. // Any non-empty value will mean 'yes'.
  26. headerComputedTopLevel = "Datadog-Client-Computed-Top-Level"
  27. )
  28. var defaultDialer = &net.Dialer{
  29. Timeout: 30 * time.Second,
  30. KeepAlive: 30 * time.Second,
  31. DualStack: true,
  32. }
  33. var defaultClient = &http.Client{
  34. // We copy the transport to avoid using the default one, as it might be
  35. // augmented with tracing and we don't want these calls to be recorded.
  36. // See https://golang.org/pkg/net/http/#DefaultTransport .
  37. Transport: &http.Transport{
  38. Proxy: http.ProxyFromEnvironment,
  39. DialContext: defaultDialer.DialContext,
  40. MaxIdleConns: 100,
  41. IdleConnTimeout: 90 * time.Second,
  42. TLSHandshakeTimeout: 10 * time.Second,
  43. ExpectContinueTimeout: 1 * time.Second,
  44. },
  45. Timeout: defaultHTTPTimeout,
  46. }
  47. const (
  48. defaultHostname = "localhost"
  49. defaultPort = "8126"
  50. defaultAddress = defaultHostname + ":" + defaultPort
  51. defaultURL = "http://" + defaultAddress
  52. defaultHTTPTimeout = 2 * time.Second // defines the current timeout before giving up with the send process
  53. traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
  54. )
  55. // transport is an interface for communicating data to the agent.
  56. type transport interface {
  57. // send sends the payload p to the agent using the transport set up.
  58. // It returns a non-nil response body when no error occurred.
  59. send(p *payload) (body io.ReadCloser, err error)
  60. // sendStats sends the given stats payload to the agent.
  61. sendStats(s *statsPayload) error
  62. // endpoint returns the URL to which the transport will send traces.
  63. endpoint() string
  64. }
  65. type httpTransport struct {
  66. traceURL string // the delivery URL for traces
  67. statsURL string // the delivery URL for stats
  68. client *http.Client // the HTTP client used in the POST
  69. headers map[string]string // the Transport headers
  70. }
  71. // newTransport returns a new Transport implementation that sends traces to a
  72. // trace agent at the given url, using a given *http.Client.
  73. //
  74. // In general, using this method is only necessary if you have a trace agent
  75. // running on a non-default port, if it's located on another machine, or when
  76. // otherwise needing to customize the transport layer, for instance when using
  77. // a unix domain socket.
  78. func newHTTPTransport(url string, client *http.Client) *httpTransport {
  79. // initialize the default EncoderPool with Encoder headers
  80. defaultHeaders := map[string]string{
  81. "Datadog-Meta-Lang": "go",
  82. "Datadog-Meta-Lang-Version": strings.TrimPrefix(runtime.Version(), "go"),
  83. "Datadog-Meta-Lang-Interpreter": runtime.Compiler + "-" + runtime.GOARCH + "-" + runtime.GOOS,
  84. "Datadog-Meta-Tracer-Version": version.Tag,
  85. "Content-Type": "application/msgpack",
  86. }
  87. if cid := internal.ContainerID(); cid != "" {
  88. defaultHeaders["Datadog-Container-ID"] = cid
  89. }
  90. return &httpTransport{
  91. traceURL: fmt.Sprintf("%s/v0.4/traces", url),
  92. statsURL: fmt.Sprintf("%s/v0.6/stats", url),
  93. client: client,
  94. headers: defaultHeaders,
  95. }
  96. }
  97. func (t *httpTransport) sendStats(p *statsPayload) error {
  98. var buf bytes.Buffer
  99. if err := msgp.Encode(&buf, p); err != nil {
  100. return err
  101. }
  102. req, err := http.NewRequest("POST", t.statsURL, &buf)
  103. if err != nil {
  104. return err
  105. }
  106. resp, err := t.client.Do(req)
  107. if err != nil {
  108. return err
  109. }
  110. if code := resp.StatusCode; code >= 400 {
  111. // error, check the body for context information and
  112. // return a nice error.
  113. msg := make([]byte, 1000)
  114. n, _ := resp.Body.Read(msg)
  115. resp.Body.Close()
  116. txt := http.StatusText(code)
  117. if n > 0 {
  118. return fmt.Errorf("%s (Status: %s)", msg[:n], txt)
  119. }
  120. return fmt.Errorf("%s", txt)
  121. }
  122. return nil
  123. }
  124. func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
  125. req, err := http.NewRequest("POST", t.traceURL, p)
  126. if err != nil {
  127. return nil, fmt.Errorf("cannot create http request: %v", err)
  128. }
  129. for header, value := range t.headers {
  130. req.Header.Set(header, value)
  131. }
  132. req.Header.Set(traceCountHeader, strconv.Itoa(p.itemCount()))
  133. req.Header.Set("Content-Length", strconv.Itoa(p.size()))
  134. req.Header.Set(headerComputedTopLevel, "yes")
  135. if t, ok := traceinternal.GetGlobalTracer().(*tracer); ok {
  136. if t.config.canComputeStats() {
  137. req.Header.Set("Datadog-Client-Computed-Stats", "yes")
  138. }
  139. droppedTraces := int(atomic.SwapUint32(&t.droppedP0Traces, 0))
  140. partialTraces := int(atomic.SwapUint32(&t.partialTraces, 0))
  141. droppedSpans := int(atomic.SwapUint32(&t.droppedP0Spans, 0))
  142. if stats := t.statsd; stats != nil {
  143. stats.Count("datadog.tracer.dropped_p0_traces", int64(droppedTraces),
  144. []string{fmt.Sprintf("partial:%s", strconv.FormatBool(partialTraces > 0))}, 1)
  145. stats.Count("datadog.tracer.dropped_p0_spans", int64(droppedSpans), nil, 1)
  146. }
  147. req.Header.Set("Datadog-Client-Dropped-P0-Traces", strconv.Itoa(droppedTraces))
  148. req.Header.Set("Datadog-Client-Dropped-P0-Spans", strconv.Itoa(droppedSpans))
  149. }
  150. response, err := t.client.Do(req)
  151. if err != nil {
  152. return nil, err
  153. }
  154. if code := response.StatusCode; code >= 400 {
  155. // error, check the body for context information and
  156. // return a nice error.
  157. msg := make([]byte, 1000)
  158. n, _ := response.Body.Read(msg)
  159. response.Body.Close()
  160. txt := http.StatusText(code)
  161. if n > 0 {
  162. return nil, fmt.Errorf("%s (Status: %s)", msg[:n], txt)
  163. }
  164. return nil, fmt.Errorf("%s", txt)
  165. }
  166. return response.Body, nil
  167. }
  168. func (t *httpTransport) endpoint() string {
  169. return t.traceURL
  170. }
  171. // resolveAgentAddr resolves the given agent address and fills in any missing host
  172. // and port using the defaults. Some environment variable settings will
  173. // take precedence over configuration.
  174. func resolveAgentAddr() string {
  175. var host, port string
  176. if v := os.Getenv("DD_AGENT_HOST"); v != "" {
  177. host = v
  178. }
  179. if v := os.Getenv("DD_TRACE_AGENT_PORT"); v != "" {
  180. port = v
  181. }
  182. if host == "" {
  183. host = defaultHostname
  184. }
  185. if port == "" {
  186. port = defaultPort
  187. }
  188. return fmt.Sprintf("%s:%s", host, port)
  189. }