package statsd

import (
	"fmt"
	"math"
	"strings"
	"time"
)

// Default values used by resolveOptions to seed an Options before any user-supplied Option is applied.
var (
	defaultNamespace                = ""
	defaultTags                     = []string{}
	defaultMaxBytesPerPayload       = 0
	defaultMaxMessagesPerPayload    = math.MaxInt32
	defaultBufferPoolSize           = 0
	defaultBufferFlushInterval      = 100 * time.Millisecond
	defaultWorkerCount              = 32
	defaultSenderQueueSize          = 0
	defaultWriteTimeout             = 100 * time.Millisecond
	defaultTelemetry                = true
	defaultReceivingMode            = mutexMode
	defaultChannelModeBufferSize    = 4096
	defaultAggregationFlushInterval = 2 * time.Second
	defaultAggregation              = true
	defaultExtendedAggregation      = false
)

// Options holds the resolved configuration of a client. Every field is seeded with its package default by
// resolveOptions (telemetryAddr has no default and starts empty) and can then be overridden through an Option.
type Options struct {
	namespace                string
	tags                     []string
	maxBytesPerPayload       int
	maxMessagesPerPayload    int
	bufferPoolSize           int
	bufferFlushInterval      time.Duration
	workersCount             int
	senderQueueSize          int
	writeTimeout             time.Duration
	telemetry                bool
	receiveMode              receivingMode
	channelModeBufferSize    int
	aggregationFlushInterval time.Duration
	aggregation              bool
	extendedAggregation      bool
	telemetryAddr            string
}

// resolveOptions returns an Options initialised with the package defaults and then updated by each entry of options,
// applied in the order given. The first Option that reports an error aborts the resolution: that error is returned
// together with a nil *Options.
func resolveOptions(options []Option) (*Options, error) {
	opts := &Options{
		namespace:                defaultNamespace,
		tags:                     defaultTags,
		maxBytesPerPayload:       defaultMaxBytesPerPayload,
		maxMessagesPerPayload:    defaultMaxMessagesPerPayload,
		bufferPoolSize:           defaultBufferPoolSize,
		bufferFlushInterval:      defaultBufferFlushInterval,
		workersCount:             defaultWorkerCount,
		senderQueueSize:          defaultSenderQueueSize,
		writeTimeout:             defaultWriteTimeout,
		telemetry:                defaultTelemetry,
		receiveMode:              defaultReceivingMode,
		channelModeBufferSize:    defaultChannelModeBufferSize,
		aggregationFlushInterval: defaultAggregationFlushInterval,
		aggregation:              defaultAggregation,
		extendedAggregation:      defaultExtendedAggregation,
	}

	for _, apply := range options {
		if err := apply(opts); err != nil {
			return nil, err
		}
	}
	return opts, nil
}

// Option is a function that updates an Options. It returns an error when the value it carries fails validation.
type Option func(*Options) error

// WithNamespace sets a string to be prepended to all metrics, events and service checks names.
// // A '.' will automatically be added after the namespace if needed. For example a metrics 'test' with a namespace 'prod' // will produce a final metric named 'prod.test'. func WithNamespace(namespace string) Option { return func(o *Options) error { if strings.HasSuffix(namespace, ".") { o.namespace = namespace } else { o.namespace = namespace + "." } return nil } } // WithTags sets global tags to be applied to every metrics, events and service checks. func WithTags(tags []string) Option { return func(o *Options) error { o.tags = tags return nil } } // WithMaxMessagesPerPayload sets the maximum number of metrics, events and/or service checks that a single payload can // contain. // // The default is 'math.MaxInt32' which will most likely let the WithMaxBytesPerPayload option take precedence. This // option can be set to `1` to create an unbuffered client (each metrics/event/service check will be send in its own // payload to the agent). func WithMaxMessagesPerPayload(maxMessagesPerPayload int) Option { return func(o *Options) error { o.maxMessagesPerPayload = maxMessagesPerPayload return nil } } // WithMaxBytesPerPayload sets the maximum number of bytes a single payload can contain. // // The deault value 0 which will set the option to the optimal size for the transport protocol used: 1432 for UDP and // named pipe and 8192 for UDS. func WithMaxBytesPerPayload(MaxBytesPerPayload int) Option { return func(o *Options) error { o.maxBytesPerPayload = MaxBytesPerPayload return nil } } // WithBufferPoolSize sets the size of the pool of buffers used to serialized metrics, events and service_checks. // // The default, 0, will set the option to the optimal size for the transport protocol used: 2048 for UDP and named pipe // and 512 for UDS. func WithBufferPoolSize(bufferPoolSize int) Option { return func(o *Options) error { o.bufferPoolSize = bufferPoolSize return nil } } // WithBufferFlushInterval sets the interval after which the current buffer is flushed. 
// // A buffers are used to serialized data, they're flushed either when full (see WithMaxBytesPerPayload) or when it's // been open for longer than this interval. // // With apps sending a high number of metrics/events/service_checks the interval rarely timeout. But with slow sending // apps increasing this value will reduce the number of payload sent on the wire as more data is serialized in the same // payload. // // Default is 100ms func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option { return func(o *Options) error { o.bufferFlushInterval = bufferFlushInterval return nil } } // WithWorkersCount sets the number of workers that will be used to serialized data. // // Those workers allow the use of multiple buffers at the same time (see WithBufferPoolSize) to reduce lock contention. // // Default is 32. func WithWorkersCount(workersCount int) Option { return func(o *Options) error { if workersCount < 1 { return fmt.Errorf("workersCount must be a positive integer") } o.workersCount = workersCount return nil } } // WithSenderQueueSize sets the size of the sender queue in number of buffers. // // After data has been serialized in a buffer they're pushed to a queue that the sender will consume and then each one // ot the agent. // // The default value 0 will set the option to the optimal size for the transport protocol used: 2048 for UDP and named // pipe and 512 for UDS. func WithSenderQueueSize(senderQueueSize int) Option { return func(o *Options) error { o.senderQueueSize = senderQueueSize return nil } } // WithWriteTimeout sets the timeout for network communication with the Agent, after this interval a payload is // dropped. This is only used for UDS and named pipes connection. 
func WithWriteTimeout(writeTimeout time.Duration) Option { return func(o *Options) error { o.writeTimeout = writeTimeout return nil } } // WithChannelMode make the client use channels to receive metrics // // This determines how the client receive metrics from the app (for example when calling the `Gauge()` method). // The client will either drop the metrics if its buffers are full (WithChannelMode option) or block the caller until the // metric can be handled (WithMutexMode option). By default the client use mutexes. // // WithChannelMode uses a channel (see WithChannelModeBufferSize to configure its size) to receive metrics and drops metrics if // the channel is full. Sending metrics in this mode is much slower that WithMutexMode (because of the channel), but will not // block the application. This mode is made for application using many goroutines, sending the same metrics, at a very // high volume. The goal is to not slow down the application at the cost of dropping metrics and having a lower max // throughput. func WithChannelMode() Option { return func(o *Options) error { o.receiveMode = channelMode return nil } } // WithMutexMode will use mutex to receive metrics from the app throught the API. // // This determines how the client receive metrics from the app (for example when calling the `Gauge()` method). // The client will either drop the metrics if its buffers are full (WithChannelMode option) or block the caller until the // metric can be handled (WithMutexMode option). By default the client use mutexes. // // WithMutexMode uses mutexes to receive metrics which is much faster than channels but can cause some lock contention // when used with a high number of goroutines sendint the same metrics. Mutexes are sharded based on the metrics name // which limit mutex contention when multiple goroutines send different metrics (see WithWorkersCount). This is the // default behavior which will produce the best throughput. 
func WithMutexMode() Option { return func(o *Options) error { o.receiveMode = mutexMode return nil } } // WithChannelModeBufferSize sets the size of the channel holding incoming metrics when WithChannelMode is used. func WithChannelModeBufferSize(bufferSize int) Option { return func(o *Options) error { o.channelModeBufferSize = bufferSize return nil } } // WithAggregationInterval sets the interval at which aggregated metrics are flushed. See WithClientSideAggregation and // WithExtendedClientSideAggregation for more. // // The default interval is 2s. The interval must divide the Agent reporting period (default=10s) evenly to reduce "aliasing" // that can cause values to appear irregular/spiky. // // For example a 3s aggregation interval will create spikes in the final graph: a application sending a count metric // that increments at a constant 1000 time per second will appear noisy with an interval of 3s. This is because // client-side aggregation would report every 3 seconds, while the agent is reporting every 10 seconds. This means in // each agent bucket, the values are: 9000, 9000, 12000. func WithAggregationInterval(interval time.Duration) Option { return func(o *Options) error { o.aggregationFlushInterval = interval return nil } } // WithClientSideAggregation enables client side aggregation for Gauges, Counts and Sets. func WithClientSideAggregation() Option { return func(o *Options) error { o.aggregation = true return nil } } // WithoutClientSideAggregation disables client side aggregation. func WithoutClientSideAggregation() Option { return func(o *Options) error { o.aggregation = false o.extendedAggregation = false return nil } } // WithExtendedClientSideAggregation enables client side aggregation for all types. This feature is only compatible with // Agent's version >=6.25.0 && <7.0.0 or Agent's versions >=7.25.0. 
func WithExtendedClientSideAggregation() Option { return func(o *Options) error { o.aggregation = true o.extendedAggregation = true return nil } } // WithoutTelemetry disables the client telemetry. // // More on this here: https://docs.datadoghq.com/developers/dogstatsd/high_throughput/#client-side-telemetry func WithoutTelemetry() Option { return func(o *Options) error { o.telemetry = false return nil } } // WithTelemetryAddr sets a different address for telemetry metrics. By default the same address as the client is used // for telemetry. // // More on this here: https://docs.datadoghq.com/developers/dogstatsd/high_throughput/#client-side-telemetry func WithTelemetryAddr(addr string) Option { return func(o *Options) error { o.telemetryAddr = addr return nil } }