telemetry.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package statsd
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  // TelemetryInterval is the period at which the client sends its telemetry.
  const TelemetryInterval = 10 * time.Second

  // Tags identifying this library in the telemetry it emits.
  var (
  	// clientTelemetryTag identifies this specific client.
  	clientTelemetryTag = "client:go"

  	// clientVersionTelemetryTag identifies this specific client version.
  	clientVersionTelemetryTag = "client_version:4.8.2"
  )
  19. type telemetryClient struct {
  20. c *Client
  21. tags []string
  22. tagsByType map[metricType][]string
  23. sender *sender
  24. worker *worker
  25. devMode bool
  26. }
  27. func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryClient {
  28. t := &telemetryClient{
  29. c: c,
  30. tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport),
  31. tagsByType: map[metricType][]string{},
  32. devMode: devMode,
  33. }
  34. if devMode {
  35. t.tagsByType[gauge] = append(append([]string{}, t.tags...), "metrics_type:gauge")
  36. t.tagsByType[count] = append(append([]string{}, t.tags...), "metrics_type:count")
  37. t.tagsByType[set] = append(append([]string{}, t.tags...), "metrics_type:set")
  38. t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
  39. t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram")
  40. t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution")
  41. t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
  42. }
  43. return t
  44. }
  45. func newTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) {
  46. telemetryWriter, _, err := createWriter(telemetryAddr)
  47. if err != nil {
  48. return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
  49. }
  50. t := newTelemetryClient(c, transport, devMode)
  51. // Creating a custom sender/worker with 1 worker in mutex mode for the
  52. // telemetry that share the same bufferPool.
  53. // FIXME due to performance pitfall, we're always using UDP defaults
  54. // even for UDS.
  55. t.sender = newSender(telemetryWriter, DefaultUDPBufferPoolSize, pool)
  56. t.worker = newWorker(pool, t.sender)
  57. return t, nil
  58. }
  59. func (t *telemetryClient) run(wg *sync.WaitGroup, stop chan struct{}) {
  60. wg.Add(1)
  61. go func() {
  62. defer wg.Done()
  63. ticker := time.NewTicker(TelemetryInterval)
  64. for {
  65. select {
  66. case <-ticker.C:
  67. t.sendTelemetry()
  68. case <-stop:
  69. ticker.Stop()
  70. if t.sender != nil {
  71. t.sender.close()
  72. }
  73. return
  74. }
  75. }
  76. }()
  77. }
  78. func (t *telemetryClient) sendTelemetry() {
  79. for _, m := range t.flush() {
  80. if t.worker != nil {
  81. t.worker.processMetric(m)
  82. } else {
  83. t.c.send(m)
  84. }
  85. }
  86. if t.worker != nil {
  87. t.worker.flush()
  88. }
  89. }
  90. // flushTelemetry returns Telemetry metrics to be flushed. It's its own function to ease testing.
  91. func (t *telemetryClient) flush() []metric {
  92. m := []metric{}
  93. // same as Count but without global namespace
  94. telemetryCount := func(name string, value int64, tags []string) {
  95. m = append(m, metric{metricType: count, name: name, ivalue: value, tags: tags, rate: 1})
  96. }
  97. clientMetrics := t.c.FlushTelemetryMetrics()
  98. telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics), t.tags)
  99. if t.devMode {
  100. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsGauge), t.tagsByType[gauge])
  101. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsCount), t.tagsByType[count])
  102. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsHistogram), t.tagsByType[histogram])
  103. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsDistribution), t.tagsByType[distribution])
  104. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsSet), t.tagsByType[set])
  105. telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsTiming), t.tagsByType[timing])
  106. }
  107. telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents), t.tags)
  108. telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks), t.tags)
  109. telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive), t.tags)
  110. senderMetrics := t.c.sender.flushTelemetryMetrics()
  111. telemetryCount("datadog.dogstatsd.client.packets_sent", int64(senderMetrics.TotalSentPayloads), t.tags)
  112. telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(senderMetrics.TotalSentBytes), t.tags)
  113. telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(senderMetrics.TotalDroppedPayloads), t.tags)
  114. telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(senderMetrics.TotalDroppedBytes), t.tags)
  115. telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(senderMetrics.TotalDroppedPayloadsQueueFull), t.tags)
  116. telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(senderMetrics.TotalDroppedBytesQueueFull), t.tags)
  117. telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(senderMetrics.TotalDroppedPayloadsWriter), t.tags)
  118. telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(senderMetrics.TotalDroppedBytesWriter), t.tags)
  119. if aggMetrics := t.c.agg.flushTelemetryMetrics(); aggMetrics != nil {
  120. telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(aggMetrics.nbContext), t.tags)
  121. if t.devMode {
  122. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextGauge), t.tagsByType[gauge])
  123. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextSet), t.tagsByType[set])
  124. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextCount), t.tagsByType[count])
  125. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextHistogram), t.tagsByType[histogram])
  126. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextDistribution), t.tagsByType[distribution])
  127. telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextTiming), t.tagsByType[timing])
  128. }
  129. }
  130. return m
  131. }