telemetry.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. package statsd
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. /*
  8. telemetryInterval is the interval at which telemetry will be sent by the client.
  9. */
  10. const telemetryInterval = 10 * time.Second
  11. /*
  12. clientTelemetryTag is a tag identifying this specific client.
  13. */
  14. var clientTelemetryTag = "client:go"
  15. /*
  16. clientVersionTelemetryTag is a tag identifying this specific client version.
  17. */
  18. var clientVersionTelemetryTag = "client_version:5.0.2"
// Telemetry represents internal metrics about the client behavior since it started.
//
// The fields are grouped by the component that produces them: the 'Client', the 'sender' and the 'aggregator'.
type Telemetry struct {
	//
	// Those are produced by the 'Client'
	//

	// TotalMetrics is the total number of metrics sent by the client before aggregation and sampling. It is
	// computed as the sum of the per-type totals below (gauge, count, set, histogram, distribution and timing).
	TotalMetrics uint64
	// TotalMetricsGauge is the total number of gauges sent by the client before aggregation and sampling.
	TotalMetricsGauge uint64
	// TotalMetricsCount is the total number of counts sent by the client before aggregation and sampling.
	TotalMetricsCount uint64
	// TotalMetricsHistogram is the total number of histograms sent by the client before aggregation and sampling.
	TotalMetricsHistogram uint64
	// TotalMetricsDistribution is the total number of distributions sent by the client before aggregation and
	// sampling.
	TotalMetricsDistribution uint64
	// TotalMetricsSet is the total number of sets sent by the client before aggregation and sampling.
	TotalMetricsSet uint64
	// TotalMetricsTiming is the total number of timings sent by the client before aggregation and sampling.
	TotalMetricsTiming uint64
	// TotalEvents is the total number of events sent by the client before aggregation and sampling.
	TotalEvents uint64
	// TotalServiceChecks is the total number of service_checks sent by the client before aggregation and sampling.
	TotalServiceChecks uint64
	// TotalDroppedOnReceive is the total number of metrics/events/service_checks dropped when using ChannelMode
	// (see WithChannelMode option).
	TotalDroppedOnReceive uint64

	//
	// Those are produced by the 'sender'
	//

	// TotalPayloadsSent is the total number of payloads (packets on the network) successfully sent by the client.
	// When using UDP we don't know if a packet was dropped or not, so all packets are considered as successfully
	// sent.
	TotalPayloadsSent uint64
	// TotalPayloadsDropped is the total number of payloads dropped by the client. This includes all causes of
	// drops (it is the sum of TotalPayloadsDroppedQueueFull and TotalPayloadsDroppedWriter). When using UDP this
	// won't include the packets dropped by the network.
	TotalPayloadsDropped uint64
	// TotalPayloadsDroppedWriter is the total number of payloads dropped by the writer (when using UDS or named
	// pipe) due to network timeout or error.
	TotalPayloadsDroppedWriter uint64
	// TotalPayloadsDroppedQueueFull is the total number of payloads dropped internally because the queue of
	// payloads waiting to be sent on the wire is full. This means the client is generating more metrics than can
	// be sent on the wire. If your app sends metrics in batch look at WithSenderQueueSize option to increase the
	// queue size.
	TotalPayloadsDroppedQueueFull uint64
	// TotalBytesSent is the total number of bytes successfully sent by the client. When using UDP we don't know if
	// a packet was dropped or not, so all packets are considered as successfully sent.
	TotalBytesSent uint64
	// TotalBytesDropped is the total number of bytes dropped by the client. This includes all causes of drops
	// (it is the sum of TotalBytesDroppedQueueFull and TotalBytesDroppedWriter). When using UDP this won't include
	// the bytes dropped by the network.
	TotalBytesDropped uint64
	// TotalBytesDroppedWriter is the total number of bytes dropped by the writer (when using UDS or named pipe) due
	// to network timeout or error.
	TotalBytesDroppedWriter uint64
	// TotalBytesDroppedQueueFull is the total number of bytes dropped internally because the queue of payloads
	// waiting to be sent on the wire is full. This means the client is generating more metrics than can be sent on
	// the wire. If your app sends metrics in batch look at WithSenderQueueSize option to increase the queue size.
	TotalBytesDroppedQueueFull uint64

	//
	// Those are produced by the 'aggregator'
	//

	// AggregationNbContext is the total number of contexts flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled. It is computed as the
	// sum of the per-type context totals below, and only when aggregation is enabled.
	AggregationNbContext uint64
	// AggregationNbContextGauge is the total number of contexts for gauges flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextGauge uint64
	// AggregationNbContextCount is the total number of contexts for counts flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextCount uint64
	// AggregationNbContextSet is the total number of contexts for sets flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextSet uint64
	// AggregationNbContextHistogram is the total number of contexts for histograms flushed by the aggregator when
	// either WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextHistogram uint64
	// AggregationNbContextDistribution is the total number of contexts for distributions flushed by the aggregator
	// when either WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextDistribution uint64
	// AggregationNbContextTiming is the total number of contexts for timings flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextTiming uint64
}
// telemetryClient collects the telemetry of a Client and periodically sends it as count metrics (see run,
// sendTelemetry and flush).
type telemetryClient struct {
	// c is the Client whose telemetry is collected. It is also used to send the telemetry metrics when no
	// dedicated worker is configured.
	c *Client
	// tags are attached to every telemetry metric: the client's own tags plus the client, client version and
	// client transport tags.
	tags       []string
	aggEnabled bool // is aggregation enabled and should we send aggregation telemetry.
	// tagsByType holds, for each metric type, the content of tags plus a "metrics_type:<type>" tag.
	tagsByType map[metricType][]string
	// sender and worker are only set by newTelemetryClientWithCustomAddr, when telemetry goes to its own
	// address. They are nil otherwise, in which case telemetry is sent through c.
	sender     *sender
	worker     *worker
	lastSample Telemetry // The previous sample of telemetry sent
}
  111. func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *telemetryClient {
  112. t := &telemetryClient{
  113. c: c,
  114. tags: append(c.tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport),
  115. aggEnabled: aggregationEnabled,
  116. tagsByType: map[metricType][]string{},
  117. }
  118. t.tagsByType[gauge] = append(append([]string{}, t.tags...), "metrics_type:gauge")
  119. t.tagsByType[count] = append(append([]string{}, t.tags...), "metrics_type:count")
  120. t.tagsByType[set] = append(append([]string{}, t.tags...), "metrics_type:set")
  121. t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
  122. t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram")
  123. t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution")
  124. return t
  125. }
  126. func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) {
  127. telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout)
  128. if err != nil {
  129. return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
  130. }
  131. t := newTelemetryClient(c, transport, aggregationEnabled)
  132. // Creating a custom sender/worker with 1 worker in mutex mode for the
  133. // telemetry that share the same bufferPool.
  134. // FIXME due to performance pitfall, we're always using UDP defaults
  135. // even for UDS.
  136. t.sender = newSender(telemetryWriter, DefaultUDPBufferPoolSize, pool)
  137. t.worker = newWorker(pool, t.sender)
  138. return t, nil
  139. }
  140. func (t *telemetryClient) run(wg *sync.WaitGroup, stop chan struct{}) {
  141. wg.Add(1)
  142. go func() {
  143. defer wg.Done()
  144. ticker := time.NewTicker(telemetryInterval)
  145. for {
  146. select {
  147. case <-ticker.C:
  148. t.sendTelemetry()
  149. case <-stop:
  150. ticker.Stop()
  151. if t.sender != nil {
  152. t.sender.close()
  153. }
  154. return
  155. }
  156. }
  157. }()
  158. }
  159. func (t *telemetryClient) sendTelemetry() {
  160. for _, m := range t.flush() {
  161. if t.worker != nil {
  162. t.worker.processMetric(m)
  163. } else {
  164. t.c.send(m)
  165. }
  166. }
  167. if t.worker != nil {
  168. t.worker.flush()
  169. }
  170. }
  171. func (t *telemetryClient) getTelemetry() Telemetry {
  172. if t == nil {
  173. // telemetry was disabled through the WithoutTelemetry option
  174. return Telemetry{}
  175. }
  176. tlm := Telemetry{}
  177. t.c.flushTelemetryMetrics(&tlm)
  178. t.c.sender.flushTelemetryMetrics(&tlm)
  179. t.c.agg.flushTelemetryMetrics(&tlm)
  180. tlm.TotalMetrics = tlm.TotalMetricsGauge +
  181. tlm.TotalMetricsCount +
  182. tlm.TotalMetricsSet +
  183. tlm.TotalMetricsHistogram +
  184. tlm.TotalMetricsDistribution +
  185. tlm.TotalMetricsTiming
  186. tlm.TotalPayloadsDropped = tlm.TotalPayloadsDroppedQueueFull + tlm.TotalPayloadsDroppedWriter
  187. tlm.TotalBytesDropped = tlm.TotalBytesDroppedQueueFull + tlm.TotalBytesDroppedWriter
  188. if t.aggEnabled {
  189. tlm.AggregationNbContext = tlm.AggregationNbContextGauge +
  190. tlm.AggregationNbContextCount +
  191. tlm.AggregationNbContextSet +
  192. tlm.AggregationNbContextHistogram +
  193. tlm.AggregationNbContextDistribution +
  194. tlm.AggregationNbContextTiming
  195. }
  196. return tlm
  197. }
  198. // flushTelemetry returns Telemetry metrics to be flushed. It's its own function to ease testing.
  199. func (t *telemetryClient) flush() []metric {
  200. m := []metric{}
  201. // same as Count but without global namespace
  202. telemetryCount := func(name string, value int64, tags []string) {
  203. m = append(m, metric{metricType: count, name: name, ivalue: value, tags: tags, rate: 1})
  204. }
  205. tlm := t.getTelemetry()
  206. // We send the diff between now and the previous telemetry flush. This keep the same telemetry behavior from V4
  207. // so users dashboard's aren't broken when upgrading to V5. It also allow to graph on the same dashboard a mix
  208. // of V4 and V5 apps.
  209. telemetryCount("datadog.dogstatsd.client.metrics", int64(tlm.TotalMetrics-t.lastSample.TotalMetrics), t.tags)
  210. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsGauge-t.lastSample.TotalMetricsGauge), t.tagsByType[gauge])
  211. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsCount-t.lastSample.TotalMetricsCount), t.tagsByType[count])
  212. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsHistogram-t.lastSample.TotalMetricsHistogram), t.tagsByType[histogram])
  213. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsDistribution-t.lastSample.TotalMetricsDistribution), t.tagsByType[distribution])
  214. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsSet-t.lastSample.TotalMetricsSet), t.tagsByType[set])
  215. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsTiming-t.lastSample.TotalMetricsTiming), t.tagsByType[timing])
  216. telemetryCount("datadog.dogstatsd.client.events", int64(tlm.TotalEvents-t.lastSample.TotalEvents), t.tags)
  217. telemetryCount("datadog.dogstatsd.client.service_checks", int64(tlm.TotalServiceChecks-t.lastSample.TotalServiceChecks), t.tags)
  218. telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(tlm.TotalDroppedOnReceive-t.lastSample.TotalDroppedOnReceive), t.tags)
  219. telemetryCount("datadog.dogstatsd.client.packets_sent", int64(tlm.TotalPayloadsSent-t.lastSample.TotalPayloadsSent), t.tags)
  220. telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(tlm.TotalPayloadsDropped-t.lastSample.TotalPayloadsDropped), t.tags)
  221. telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(tlm.TotalPayloadsDroppedQueueFull-t.lastSample.TotalPayloadsDroppedQueueFull), t.tags)
  222. telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(tlm.TotalPayloadsDroppedWriter-t.lastSample.TotalPayloadsDroppedWriter), t.tags)
  223. telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(tlm.TotalBytesDropped-t.lastSample.TotalBytesDropped), t.tags)
  224. telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(tlm.TotalBytesSent-t.lastSample.TotalBytesSent), t.tags)
  225. telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(tlm.TotalBytesDroppedQueueFull-t.lastSample.TotalBytesDroppedQueueFull), t.tags)
  226. telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(tlm.TotalBytesDroppedWriter-t.lastSample.TotalBytesDroppedWriter), t.tags)
  227. if t.aggEnabled {
  228. telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(tlm.AggregationNbContext-t.lastSample.AggregationNbContext), t.tags)
  229. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextGauge-t.lastSample.AggregationNbContextGauge), t.tagsByType[gauge])
  230. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextSet-t.lastSample.AggregationNbContextSet), t.tagsByType[set])
  231. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextCount-t.lastSample.AggregationNbContextCount), t.tagsByType[count])
  232. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextHistogram-t.lastSample.AggregationNbContextHistogram), t.tagsByType[histogram])
  233. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextDistribution-t.lastSample.AggregationNbContextDistribution), t.tagsByType[distribution])
  234. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextTiming-t.lastSample.AggregationNbContextTiming), t.tagsByType[timing])
  235. }
  236. t.lastSample = tlm
  237. return m
  238. }