http.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // Copyright 2022 The OpenZipkin Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
Package http implements an HTTP reporter to send spans to Zipkin V2 collectors.
  16. */
  17. package http
  18. import (
  19. "bytes"
  20. "context"
  21. "log"
  22. "net/http"
  23. "os"
  24. "sync"
  25. "time"
  26. "github.com/openzipkin/zipkin-go/model"
  27. "github.com/openzipkin/zipkin-go/reporter"
  28. )
// Default settings used by NewReporter when the corresponding option is not
// supplied.
const (
	defaultTimeout       = 5 * time.Second // default timeout applied to each HTTP request
	defaultBatchInterval = 1 * time.Second // default interval after which buffered spans are sent
	defaultBatchSize     = 100             // default number of buffered spans that triggers a send
	defaultMaxBacklog    = 1000            // default maximum number of spans kept in the batch
)
// HTTPDoer will do a request to the Zipkin HTTP Collector. Its single method
// has the same signature as the Do method of *http.Client, which is the
// implementation NewReporter uses by default.
type HTTPDoer interface { // nolint: revive // keep as is, we don't want to break dependents
	// Do sends the given request and returns the response or an error.
	Do(req *http.Request) (*http.Response, error)
}
// httpReporter will send spans to a Zipkin HTTP Collector using Zipkin V2 API.
//
// Spans handed to Send travel over spanC to the loop goroutine, which stores
// them in batch and signals the sendLoop goroutine over sendC whenever a
// batch should be transmitted.
type httpReporter struct {
	url           string                  // endpoint the batches are POSTed to
	client        HTTPDoer                // executes the HTTP requests
	logger        *log.Logger             // receives error and backlog messages
	batchInterval time.Duration           // interval after which buffered spans are sent
	batchSize     int                     // number of buffered spans that triggers a send
	maxBacklog    int                     // maximum number of spans kept in batch; older ones are disposed
	batchMtx      *sync.Mutex             // guards batch
	batch         []*model.SpanModel      // spans waiting to be sent
	spanC         chan *model.SpanModel   // carries spans from Send to loop
	sendC         chan struct{}           // signals sendLoop to send the current batch; closed by loop on quit
	quit          chan struct{}           // closed by Close to stop loop
	shutdown      chan error              // carries the result of the final send from sendLoop to Close
	reqCallback   RequestCallbackFn       // optional hook invoked with each request before it is sent
	reqTimeout    time.Duration           // timeout applied to each request through its context
	serializer    reporter.SpanSerializer // encodes a batch and supplies its Content-Type
}
  58. // Send implements reporter
  59. func (r *httpReporter) Send(s model.SpanModel) {
  60. r.spanC <- &s
  61. }
  62. // Close implements reporter
  63. func (r *httpReporter) Close() error {
  64. close(r.quit)
  65. return <-r.shutdown
  66. }
  67. func (r *httpReporter) loop() {
  68. var (
  69. nextSend = time.Now().Add(r.batchInterval)
  70. ticker = time.NewTicker(r.batchInterval / 10)
  71. tickerChan = ticker.C
  72. )
  73. defer ticker.Stop()
  74. for {
  75. select {
  76. case span := <-r.spanC:
  77. currentBatchSize := r.append(span)
  78. if currentBatchSize >= r.batchSize {
  79. nextSend = time.Now().Add(r.batchInterval)
  80. r.enqueueSend()
  81. }
  82. case <-tickerChan:
  83. if time.Now().After(nextSend) {
  84. nextSend = time.Now().Add(r.batchInterval)
  85. r.enqueueSend()
  86. }
  87. case <-r.quit:
  88. close(r.sendC)
  89. return
  90. }
  91. }
  92. }
  93. func (r *httpReporter) sendLoop() {
  94. for range r.sendC {
  95. _ = r.sendBatch()
  96. }
  97. r.shutdown <- r.sendBatch()
  98. }
  99. func (r *httpReporter) enqueueSend() {
  100. select {
  101. case r.sendC <- struct{}{}:
  102. default:
  103. // Do nothing if there's a pending send request already
  104. }
  105. }
  106. func (r *httpReporter) append(span *model.SpanModel) (newBatchSize int) {
  107. r.batchMtx.Lock()
  108. r.batch = append(r.batch, span)
  109. if len(r.batch) > r.maxBacklog {
  110. dispose := len(r.batch) - r.maxBacklog
  111. r.logger.Printf("backlog too long, disposing %d spans", dispose)
  112. r.batch = r.batch[dispose:]
  113. }
  114. newBatchSize = len(r.batch)
  115. r.batchMtx.Unlock()
  116. return
  117. }
  118. func (r *httpReporter) sendBatch() error {
  119. // Select all current spans in the batch to be sent
  120. r.batchMtx.Lock()
  121. sendBatch := r.batch[:]
  122. r.batchMtx.Unlock()
  123. if len(sendBatch) == 0 {
  124. return nil
  125. }
  126. body, err := r.serializer.Serialize(sendBatch)
  127. if err != nil {
  128. r.logger.Printf("failed when marshalling the spans batch: %s\n", err.Error())
  129. return err
  130. }
  131. req, err := http.NewRequest("POST", r.url, bytes.NewReader(body))
  132. if err != nil {
  133. r.logger.Printf("failed when creating the request: %s\n", err.Error())
  134. return err
  135. }
  136. // By default we send b3:0 header to mitigate trace reporting amplification in
  137. // service mesh environments where the sidecar proxies might trace the call
  138. // we do here towards the Zipkin collector.
  139. req.Header.Set("b3", "0")
  140. req.Header.Set("Content-Type", r.serializer.ContentType())
  141. if r.reqCallback != nil {
  142. r.reqCallback(req)
  143. }
  144. ctx, cancel := context.WithTimeout(req.Context(), r.reqTimeout)
  145. defer cancel()
  146. resp, err := r.client.Do(req.WithContext(ctx))
  147. if err != nil {
  148. r.logger.Printf("failed to send the request: %s\n", err.Error())
  149. return err
  150. }
  151. _ = resp.Body.Close()
  152. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  153. r.logger.Printf("failed the request with status code %d\n", resp.StatusCode)
  154. }
  155. // Remove sent spans from the batch even if they were not saved
  156. r.batchMtx.Lock()
  157. r.batch = r.batch[len(sendBatch):]
  158. r.batchMtx.Unlock()
  159. return nil
  160. }
// RequestCallbackFn receives the initialized request from the reporter before
// it is sent over the wire. This allows one to plug in additional headers or
// do other customization. It is invoked after the reporter has set its own
// "b3" and "Content-Type" headers.
type RequestCallbackFn func(*http.Request)
// ReporterOption sets a parameter for the HTTP Reporter. Options are passed
// to NewReporter, which applies them in the order given.
type ReporterOption func(r *httpReporter)
  167. // Timeout sets maximum timeout for the http request through its context.
  168. func Timeout(duration time.Duration) ReporterOption {
  169. return func(r *httpReporter) { r.reqTimeout = duration }
  170. }
  171. // BatchSize sets the maximum batch size, after which a collect will be
  172. // triggered. The default batch size is 100 traces.
  173. func BatchSize(n int) ReporterOption {
  174. return func(r *httpReporter) { r.batchSize = n }
  175. }
  176. // MaxBacklog sets the maximum backlog size. When batch size reaches this
  177. // threshold, spans from the beginning of the batch will be disposed.
  178. func MaxBacklog(n int) ReporterOption {
  179. return func(r *httpReporter) { r.maxBacklog = n }
  180. }
  181. // BatchInterval sets the maximum duration we will buffer traces before
  182. // emitting them to the collector. The default batch interval is 1 second.
  183. func BatchInterval(d time.Duration) ReporterOption {
  184. return func(r *httpReporter) { r.batchInterval = d }
  185. }
  186. // Client sets a custom http client to use under the interface HTTPDoer
  187. // which includes a `Do` method with same signature as the *http.Client
  188. func Client(client HTTPDoer) ReporterOption {
  189. return func(r *httpReporter) { r.client = client }
  190. }
  191. // RequestCallback registers a callback function to adjust the reporter
  192. // *http.Request before it sends the request to Zipkin.
  193. func RequestCallback(rc RequestCallbackFn) ReporterOption {
  194. return func(r *httpReporter) { r.reqCallback = rc }
  195. }
  196. // Logger sets the logger used to report errors in the collection
  197. // process.
  198. func Logger(l *log.Logger) ReporterOption {
  199. return func(r *httpReporter) { r.logger = l }
  200. }
  201. // Serializer sets the serialization function to use for sending span data to
  202. // Zipkin.
  203. func Serializer(serializer reporter.SpanSerializer) ReporterOption {
  204. return func(r *httpReporter) {
  205. if serializer != nil {
  206. r.serializer = serializer
  207. }
  208. }
  209. }
  210. // NewReporter returns a new HTTP Reporter.
  211. // url should be the endpoint to send the spans to, e.g.
  212. // http://localhost:9411/api/v2/spans
  213. func NewReporter(url string, opts ...ReporterOption) reporter.Reporter {
  214. r := httpReporter{
  215. url: url,
  216. logger: log.New(os.Stderr, "", log.LstdFlags),
  217. client: &http.Client{},
  218. batchInterval: defaultBatchInterval,
  219. batchSize: defaultBatchSize,
  220. maxBacklog: defaultMaxBacklog,
  221. batch: []*model.SpanModel{},
  222. spanC: make(chan *model.SpanModel),
  223. sendC: make(chan struct{}, 1),
  224. quit: make(chan struct{}, 1),
  225. shutdown: make(chan error, 1),
  226. batchMtx: &sync.Mutex{},
  227. serializer: reporter.JSONSerializer{},
  228. reqTimeout: defaultTimeout,
  229. }
  230. for _, opt := range opts {
  231. opt(&r)
  232. }
  233. go r.loop()
  234. go r.sendLoop()
  235. return &r
  236. }