| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package statsd
- import (
- "fmt"
- "sync"
- "time"
- )
// TelemetryInterval is the interval at which telemetry will be sent by the client.
const TelemetryInterval = 10 * time.Second

var (
	// clientTelemetryTag is a tag identifying this specific client.
	clientTelemetryTag = "client:go"

	// clientVersionTelemetryTag is a tag identifying this specific client version.
	clientVersionTelemetryTag = "client_version:4.8.2"
)
- type telemetryClient struct {
- c *Client
- tags []string
- tagsByType map[metricType][]string
- sender *sender
- worker *worker
- devMode bool
- }
- func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryClient {
- t := &telemetryClient{
- c: c,
- tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport),
- tagsByType: map[metricType][]string{},
- devMode: devMode,
- }
- if devMode {
- t.tagsByType[gauge] = append(append([]string{}, t.tags...), "metrics_type:gauge")
- t.tagsByType[count] = append(append([]string{}, t.tags...), "metrics_type:count")
- t.tagsByType[set] = append(append([]string{}, t.tags...), "metrics_type:set")
- t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
- t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram")
- t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution")
- t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
- }
- return t
- }
- func newTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) {
- telemetryWriter, _, err := createWriter(telemetryAddr)
- if err != nil {
- return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
- }
- t := newTelemetryClient(c, transport, devMode)
- // Creating a custom sender/worker with 1 worker in mutex mode for the
- // telemetry that share the same bufferPool.
- // FIXME due to performance pitfall, we're always using UDP defaults
- // even for UDS.
- t.sender = newSender(telemetryWriter, DefaultUDPBufferPoolSize, pool)
- t.worker = newWorker(pool, t.sender)
- return t, nil
- }
- func (t *telemetryClient) run(wg *sync.WaitGroup, stop chan struct{}) {
- wg.Add(1)
- go func() {
- defer wg.Done()
- ticker := time.NewTicker(TelemetryInterval)
- for {
- select {
- case <-ticker.C:
- t.sendTelemetry()
- case <-stop:
- ticker.Stop()
- if t.sender != nil {
- t.sender.close()
- }
- return
- }
- }
- }()
- }
- func (t *telemetryClient) sendTelemetry() {
- for _, m := range t.flush() {
- if t.worker != nil {
- t.worker.processMetric(m)
- } else {
- t.c.send(m)
- }
- }
- if t.worker != nil {
- t.worker.flush()
- }
- }
- // flushTelemetry returns Telemetry metrics to be flushed. It's its own function to ease testing.
- func (t *telemetryClient) flush() []metric {
- m := []metric{}
- // same as Count but without global namespace
- telemetryCount := func(name string, value int64, tags []string) {
- m = append(m, metric{metricType: count, name: name, ivalue: value, tags: tags, rate: 1})
- }
- clientMetrics := t.c.FlushTelemetryMetrics()
- telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics), t.tags)
- if t.devMode {
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsGauge), t.tagsByType[gauge])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsCount), t.tagsByType[count])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsHistogram), t.tagsByType[histogram])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsDistribution), t.tagsByType[distribution])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsSet), t.tagsByType[set])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(clientMetrics.TotalMetricsTiming), t.tagsByType[timing])
- }
- telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents), t.tags)
- telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks), t.tags)
- telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive), t.tags)
- senderMetrics := t.c.sender.flushTelemetryMetrics()
- telemetryCount("datadog.dogstatsd.client.packets_sent", int64(senderMetrics.TotalSentPayloads), t.tags)
- telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(senderMetrics.TotalSentBytes), t.tags)
- telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(senderMetrics.TotalDroppedPayloads), t.tags)
- telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(senderMetrics.TotalDroppedBytes), t.tags)
- telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(senderMetrics.TotalDroppedPayloadsQueueFull), t.tags)
- telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(senderMetrics.TotalDroppedBytesQueueFull), t.tags)
- telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(senderMetrics.TotalDroppedPayloadsWriter), t.tags)
- telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(senderMetrics.TotalDroppedBytesWriter), t.tags)
- if aggMetrics := t.c.agg.flushTelemetryMetrics(); aggMetrics != nil {
- telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(aggMetrics.nbContext), t.tags)
- if t.devMode {
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextGauge), t.tagsByType[gauge])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextSet), t.tagsByType[set])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextCount), t.tagsByType[count])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextHistogram), t.tagsByType[histogram])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextDistribution), t.tagsByType[distribution])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(aggMetrics.nbContextTiming), t.tagsByType[timing])
- }
- }
- return m
- }
|