| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- package statsd
- import (
- "fmt"
- "sync"
- "time"
- )
// telemetryInterval is the interval at which telemetry will be sent by the client.
const telemetryInterval = 10 * time.Second

var (
	// clientTelemetryTag is a tag identifying this specific client.
	clientTelemetryTag = "client:go"
	// clientVersionTelemetryTag is a tag identifying this specific client version.
	clientVersionTelemetryTag = "client_version:5.0.2"
)
// Telemetry represents internal metrics about the client behavior since it started.
type Telemetry struct {
	//
	// Those are produced by the 'Client'
	//

	// TotalMetrics is the total number of metrics sent by the client before aggregation and sampling.
	TotalMetrics uint64
	// TotalMetricsGauge is the total number of gauges sent by the client before aggregation and sampling.
	TotalMetricsGauge uint64
	// TotalMetricsCount is the total number of counts sent by the client before aggregation and sampling.
	TotalMetricsCount uint64
	// TotalMetricsHistogram is the total number of histograms sent by the client before aggregation and sampling.
	TotalMetricsHistogram uint64
	// TotalMetricsDistribution is the total number of distributions sent by the client before aggregation and
	// sampling.
	TotalMetricsDistribution uint64
	// TotalMetricsSet is the total number of sets sent by the client before aggregation and sampling.
	TotalMetricsSet uint64
	// TotalMetricsTiming is the total number of timings sent by the client before aggregation and sampling.
	TotalMetricsTiming uint64
	// TotalEvents is the total number of events sent by the client before aggregation and sampling.
	TotalEvents uint64
	// TotalServiceChecks is the total number of service_checks sent by the client before aggregation and sampling.
	TotalServiceChecks uint64
	// TotalDroppedOnReceive is the total number of metrics/events/service_checks dropped when using ChannelMode
	// (see WithChannelMode option).
	TotalDroppedOnReceive uint64

	//
	// Those are produced by the 'sender'
	//

	// TotalPayloadsSent is the total number of payloads (packets on the network) successfully sent by the client.
	// When using UDP we don't know if a packet was dropped or not, so all packets are considered successfully
	// sent.
	TotalPayloadsSent uint64
	// TotalPayloadsDropped is the total number of payloads dropped by the client. This includes all causes of
	// drops (TotalPayloadsDroppedQueueFull and TotalPayloadsDroppedWriter). When using UDP this doesn't include
	// packets dropped by the network.
	TotalPayloadsDropped uint64
	// TotalPayloadsDroppedWriter is the total number of payloads dropped by the writer (when using UDS or named
	// pipe) due to network timeout or error.
	TotalPayloadsDroppedWriter uint64
	// TotalPayloadsDroppedQueueFull is the total number of payloads dropped internally because the queue of
	// payloads waiting to be sent on the wire is full. This means the client is generating more metrics than can
	// be sent on the wire. If your app sends metrics in batches, look at the WithSenderQueueSize option to
	// increase the queue size.
	TotalPayloadsDroppedQueueFull uint64
	// TotalBytesSent is the total number of bytes successfully sent by the client. When using UDP we don't know
	// if a packet was dropped or not, so all packets are considered successfully sent.
	TotalBytesSent uint64
	// TotalBytesDropped is the total number of bytes dropped by the client. This includes all causes of drops
	// (TotalBytesDroppedQueueFull and TotalBytesDroppedWriter). When using UDP this doesn't include bytes
	// dropped by the network.
	TotalBytesDropped uint64
	// TotalBytesDroppedWriter is the total number of bytes dropped by the writer (when using UDS or named pipe)
	// due to network timeout or error.
	TotalBytesDroppedWriter uint64
	// TotalBytesDroppedQueueFull is the total number of bytes dropped internally because the queue of payloads
	// waiting to be sent on the wire is full. This means the client is generating more metrics than can be sent
	// on the wire. If your app sends metrics in batches, look at the WithSenderQueueSize option to increase the
	// queue size.
	TotalBytesDroppedQueueFull uint64

	//
	// Those are produced by the 'aggregator'
	//

	// AggregationNbContext is the total number of contexts flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContext uint64
	// AggregationNbContextGauge is the total number of contexts for gauges flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextGauge uint64
	// AggregationNbContextCount is the total number of contexts for counts flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextCount uint64
	// AggregationNbContextSet is the total number of contexts for sets flushed by the aggregator when either
	// WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextSet uint64
	// AggregationNbContextHistogram is the total number of contexts for histograms flushed by the aggregator when
	// either WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextHistogram uint64
	// AggregationNbContextDistribution is the total number of contexts for distributions flushed by the
	// aggregator when either WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextDistribution uint64
	// AggregationNbContextTiming is the total number of contexts for timings flushed by the aggregator when
	// either WithClientSideAggregation or WithExtendedClientSideAggregation options are enabled.
	AggregationNbContextTiming uint64
}
- type telemetryClient struct {
- c *Client
- tags []string
- aggEnabled bool // is aggregation enabled and should we sent aggregation telemetry.
- tagsByType map[metricType][]string
- sender *sender
- worker *worker
- lastSample Telemetry // The previous sample of telemetry sent
- }
- func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *telemetryClient {
- t := &telemetryClient{
- c: c,
- tags: append(c.tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport),
- aggEnabled: aggregationEnabled,
- tagsByType: map[metricType][]string{},
- }
- t.tagsByType[gauge] = append(append([]string{}, t.tags...), "metrics_type:gauge")
- t.tagsByType[count] = append(append([]string{}, t.tags...), "metrics_type:count")
- t.tagsByType[set] = append(append([]string{}, t.tags...), "metrics_type:set")
- t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
- t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram")
- t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution")
- return t
- }
- func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) {
- telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout)
- if err != nil {
- return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
- }
- t := newTelemetryClient(c, transport, aggregationEnabled)
- // Creating a custom sender/worker with 1 worker in mutex mode for the
- // telemetry that share the same bufferPool.
- // FIXME due to performance pitfall, we're always using UDP defaults
- // even for UDS.
- t.sender = newSender(telemetryWriter, DefaultUDPBufferPoolSize, pool)
- t.worker = newWorker(pool, t.sender)
- return t, nil
- }
- func (t *telemetryClient) run(wg *sync.WaitGroup, stop chan struct{}) {
- wg.Add(1)
- go func() {
- defer wg.Done()
- ticker := time.NewTicker(telemetryInterval)
- for {
- select {
- case <-ticker.C:
- t.sendTelemetry()
- case <-stop:
- ticker.Stop()
- if t.sender != nil {
- t.sender.close()
- }
- return
- }
- }
- }()
- }
- func (t *telemetryClient) sendTelemetry() {
- for _, m := range t.flush() {
- if t.worker != nil {
- t.worker.processMetric(m)
- } else {
- t.c.send(m)
- }
- }
- if t.worker != nil {
- t.worker.flush()
- }
- }
- func (t *telemetryClient) getTelemetry() Telemetry {
- if t == nil {
- // telemetry was disabled through the WithoutTelemetry option
- return Telemetry{}
- }
- tlm := Telemetry{}
- t.c.flushTelemetryMetrics(&tlm)
- t.c.sender.flushTelemetryMetrics(&tlm)
- t.c.agg.flushTelemetryMetrics(&tlm)
- tlm.TotalMetrics = tlm.TotalMetricsGauge +
- tlm.TotalMetricsCount +
- tlm.TotalMetricsSet +
- tlm.TotalMetricsHistogram +
- tlm.TotalMetricsDistribution +
- tlm.TotalMetricsTiming
- tlm.TotalPayloadsDropped = tlm.TotalPayloadsDroppedQueueFull + tlm.TotalPayloadsDroppedWriter
- tlm.TotalBytesDropped = tlm.TotalBytesDroppedQueueFull + tlm.TotalBytesDroppedWriter
- if t.aggEnabled {
- tlm.AggregationNbContext = tlm.AggregationNbContextGauge +
- tlm.AggregationNbContextCount +
- tlm.AggregationNbContextSet +
- tlm.AggregationNbContextHistogram +
- tlm.AggregationNbContextDistribution +
- tlm.AggregationNbContextTiming
- }
- return tlm
- }
- // flushTelemetry returns Telemetry metrics to be flushed. It's its own function to ease testing.
- func (t *telemetryClient) flush() []metric {
- m := []metric{}
- // same as Count but without global namespace
- telemetryCount := func(name string, value int64, tags []string) {
- m = append(m, metric{metricType: count, name: name, ivalue: value, tags: tags, rate: 1})
- }
- tlm := t.getTelemetry()
- // We send the diff between now and the previous telemetry flush. This keep the same telemetry behavior from V4
- // so users dashboard's aren't broken when upgrading to V5. It also allow to graph on the same dashboard a mix
- // of V4 and V5 apps.
- telemetryCount("datadog.dogstatsd.client.metrics", int64(tlm.TotalMetrics-t.lastSample.TotalMetrics), t.tags)
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsGauge-t.lastSample.TotalMetricsGauge), t.tagsByType[gauge])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsCount-t.lastSample.TotalMetricsCount), t.tagsByType[count])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsHistogram-t.lastSample.TotalMetricsHistogram), t.tagsByType[histogram])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsDistribution-t.lastSample.TotalMetricsDistribution), t.tagsByType[distribution])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsSet-t.lastSample.TotalMetricsSet), t.tagsByType[set])
- telemetryCount("datadog.dogstatsd.client.metrics_by_type", int64(tlm.TotalMetricsTiming-t.lastSample.TotalMetricsTiming), t.tagsByType[timing])
- telemetryCount("datadog.dogstatsd.client.events", int64(tlm.TotalEvents-t.lastSample.TotalEvents), t.tags)
- telemetryCount("datadog.dogstatsd.client.service_checks", int64(tlm.TotalServiceChecks-t.lastSample.TotalServiceChecks), t.tags)
- telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(tlm.TotalDroppedOnReceive-t.lastSample.TotalDroppedOnReceive), t.tags)
- telemetryCount("datadog.dogstatsd.client.packets_sent", int64(tlm.TotalPayloadsSent-t.lastSample.TotalPayloadsSent), t.tags)
- telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(tlm.TotalPayloadsDropped-t.lastSample.TotalPayloadsDropped), t.tags)
- telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(tlm.TotalPayloadsDroppedQueueFull-t.lastSample.TotalPayloadsDroppedQueueFull), t.tags)
- telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(tlm.TotalPayloadsDroppedWriter-t.lastSample.TotalPayloadsDroppedWriter), t.tags)
- telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(tlm.TotalBytesDropped-t.lastSample.TotalBytesDropped), t.tags)
- telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(tlm.TotalBytesSent-t.lastSample.TotalBytesSent), t.tags)
- telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(tlm.TotalBytesDroppedQueueFull-t.lastSample.TotalBytesDroppedQueueFull), t.tags)
- telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(tlm.TotalBytesDroppedWriter-t.lastSample.TotalBytesDroppedWriter), t.tags)
- if t.aggEnabled {
- telemetryCount("datadog.dogstatsd.client.aggregated_context", int64(tlm.AggregationNbContext-t.lastSample.AggregationNbContext), t.tags)
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextGauge-t.lastSample.AggregationNbContextGauge), t.tagsByType[gauge])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextSet-t.lastSample.AggregationNbContextSet), t.tagsByType[set])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextCount-t.lastSample.AggregationNbContextCount), t.tagsByType[count])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextHistogram-t.lastSample.AggregationNbContextHistogram), t.tagsByType[histogram])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextDistribution-t.lastSample.AggregationNbContextDistribution), t.tagsByType[distribution])
- telemetryCount("datadog.dogstatsd.client.aggregated_context_by_type", int64(tlm.AggregationNbContextTiming-t.lastSample.AggregationNbContextTiming), t.tagsByType[timing])
- }
- t.lastSample = tlm
- return m
- }
|