| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- package statsd
- import (
- "fmt"
- "math"
- "strings"
- "time"
- )
- var (
- defaultNamespace = ""
- defaultTags = []string{}
- defaultMaxBytesPerPayload = 0
- defaultMaxMessagesPerPayload = math.MaxInt32
- defaultBufferPoolSize = 0
- defaultBufferFlushInterval = 100 * time.Millisecond
- defaultWorkerCount = 32
- defaultSenderQueueSize = 0
- defaultWriteTimeout = 100 * time.Millisecond
- defaultTelemetry = true
- defaultReceivingMode = mutexMode
- defaultChannelModeBufferSize = 4096
- defaultAggregationFlushInterval = 2 * time.Second
- defaultAggregation = true
- defaultExtendedAggregation = false
- )
- // Options contains the configuration options for a client.
- type Options struct {
- namespace string
- tags []string
- maxBytesPerPayload int
- maxMessagesPerPayload int
- bufferPoolSize int
- bufferFlushInterval time.Duration
- workersCount int
- senderQueueSize int
- writeTimeout time.Duration
- telemetry bool
- receiveMode receivingMode
- channelModeBufferSize int
- aggregationFlushInterval time.Duration
- aggregation bool
- extendedAggregation bool
- telemetryAddr string
- }
- func resolveOptions(options []Option) (*Options, error) {
- o := &Options{
- namespace: defaultNamespace,
- tags: defaultTags,
- maxBytesPerPayload: defaultMaxBytesPerPayload,
- maxMessagesPerPayload: defaultMaxMessagesPerPayload,
- bufferPoolSize: defaultBufferPoolSize,
- bufferFlushInterval: defaultBufferFlushInterval,
- workersCount: defaultWorkerCount,
- senderQueueSize: defaultSenderQueueSize,
- writeTimeout: defaultWriteTimeout,
- telemetry: defaultTelemetry,
- receiveMode: defaultReceivingMode,
- channelModeBufferSize: defaultChannelModeBufferSize,
- aggregationFlushInterval: defaultAggregationFlushInterval,
- aggregation: defaultAggregation,
- extendedAggregation: defaultExtendedAggregation,
- }
- for _, option := range options {
- err := option(o)
- if err != nil {
- return nil, err
- }
- }
- return o, nil
- }
- // Option is a client option. Can return an error if validation fails.
- type Option func(*Options) error
- // WithNamespace sets a string to be prepend to all metrics, events and service checks name.
- //
- // A '.' will automatically be added after the namespace if needed. For example a metrics 'test' with a namespace 'prod'
- // will produce a final metric named 'prod.test'.
- func WithNamespace(namespace string) Option {
- return func(o *Options) error {
- if strings.HasSuffix(namespace, ".") {
- o.namespace = namespace
- } else {
- o.namespace = namespace + "."
- }
- return nil
- }
- }
- // WithTags sets global tags to be applied to every metrics, events and service checks.
- func WithTags(tags []string) Option {
- return func(o *Options) error {
- o.tags = tags
- return nil
- }
- }
- // WithMaxMessagesPerPayload sets the maximum number of metrics, events and/or service checks that a single payload can
- // contain.
- //
- // The default is 'math.MaxInt32' which will most likely let the WithMaxBytesPerPayload option take precedence. This
- // option can be set to `1` to create an unbuffered client (each metrics/event/service check will be send in its own
- // payload to the agent).
- func WithMaxMessagesPerPayload(maxMessagesPerPayload int) Option {
- return func(o *Options) error {
- o.maxMessagesPerPayload = maxMessagesPerPayload
- return nil
- }
- }
- // WithMaxBytesPerPayload sets the maximum number of bytes a single payload can contain.
- //
- // The deault value 0 which will set the option to the optimal size for the transport protocol used: 1432 for UDP and
- // named pipe and 8192 for UDS.
- func WithMaxBytesPerPayload(MaxBytesPerPayload int) Option {
- return func(o *Options) error {
- o.maxBytesPerPayload = MaxBytesPerPayload
- return nil
- }
- }
- // WithBufferPoolSize sets the size of the pool of buffers used to serialized metrics, events and service_checks.
- //
- // The default, 0, will set the option to the optimal size for the transport protocol used: 2048 for UDP and named pipe
- // and 512 for UDS.
- func WithBufferPoolSize(bufferPoolSize int) Option {
- return func(o *Options) error {
- o.bufferPoolSize = bufferPoolSize
- return nil
- }
- }
- // WithBufferFlushInterval sets the interval after which the current buffer is flushed.
- //
- // A buffers are used to serialized data, they're flushed either when full (see WithMaxBytesPerPayload) or when it's
- // been open for longer than this interval.
- //
- // With apps sending a high number of metrics/events/service_checks the interval rarely timeout. But with slow sending
- // apps increasing this value will reduce the number of payload sent on the wire as more data is serialized in the same
- // payload.
- //
- // Default is 100ms
- func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option {
- return func(o *Options) error {
- o.bufferFlushInterval = bufferFlushInterval
- return nil
- }
- }
- // WithWorkersCount sets the number of workers that will be used to serialized data.
- //
- // Those workers allow the use of multiple buffers at the same time (see WithBufferPoolSize) to reduce lock contention.
- //
- // Default is 32.
- func WithWorkersCount(workersCount int) Option {
- return func(o *Options) error {
- if workersCount < 1 {
- return fmt.Errorf("workersCount must be a positive integer")
- }
- o.workersCount = workersCount
- return nil
- }
- }
- // WithSenderQueueSize sets the size of the sender queue in number of buffers.
- //
- // After data has been serialized in a buffer they're pushed to a queue that the sender will consume and then each one
- // ot the agent.
- //
- // The default value 0 will set the option to the optimal size for the transport protocol used: 2048 for UDP and named
- // pipe and 512 for UDS.
- func WithSenderQueueSize(senderQueueSize int) Option {
- return func(o *Options) error {
- o.senderQueueSize = senderQueueSize
- return nil
- }
- }
- // WithWriteTimeout sets the timeout for network communication with the Agent, after this interval a payload is
- // dropped. This is only used for UDS and named pipes connection.
- func WithWriteTimeout(writeTimeout time.Duration) Option {
- return func(o *Options) error {
- o.writeTimeout = writeTimeout
- return nil
- }
- }
- // WithChannelMode make the client use channels to receive metrics
- //
- // This determines how the client receive metrics from the app (for example when calling the `Gauge()` method).
- // The client will either drop the metrics if its buffers are full (WithChannelMode option) or block the caller until the
- // metric can be handled (WithMutexMode option). By default the client use mutexes.
- //
- // WithChannelMode uses a channel (see WithChannelModeBufferSize to configure its size) to receive metrics and drops metrics if
- // the channel is full. Sending metrics in this mode is much slower that WithMutexMode (because of the channel), but will not
- // block the application. This mode is made for application using many goroutines, sending the same metrics, at a very
- // high volume. The goal is to not slow down the application at the cost of dropping metrics and having a lower max
- // throughput.
- func WithChannelMode() Option {
- return func(o *Options) error {
- o.receiveMode = channelMode
- return nil
- }
- }
- // WithMutexMode will use mutex to receive metrics from the app throught the API.
- //
- // This determines how the client receive metrics from the app (for example when calling the `Gauge()` method).
- // The client will either drop the metrics if its buffers are full (WithChannelMode option) or block the caller until the
- // metric can be handled (WithMutexMode option). By default the client use mutexes.
- //
- // WithMutexMode uses mutexes to receive metrics which is much faster than channels but can cause some lock contention
- // when used with a high number of goroutines sendint the same metrics. Mutexes are sharded based on the metrics name
- // which limit mutex contention when multiple goroutines send different metrics (see WithWorkersCount). This is the
- // default behavior which will produce the best throughput.
- func WithMutexMode() Option {
- return func(o *Options) error {
- o.receiveMode = mutexMode
- return nil
- }
- }
- // WithChannelModeBufferSize sets the size of the channel holding incoming metrics when WithChannelMode is used.
- func WithChannelModeBufferSize(bufferSize int) Option {
- return func(o *Options) error {
- o.channelModeBufferSize = bufferSize
- return nil
- }
- }
- // WithAggregationInterval sets the interval at which aggregated metrics are flushed. See WithClientSideAggregation and
- // WithExtendedClientSideAggregation for more.
- //
- // The default interval is 2s. The interval must divide the Agent reporting period (default=10s) evenly to reduce "aliasing"
- // that can cause values to appear irregular/spiky.
- //
- // For example a 3s aggregation interval will create spikes in the final graph: a application sending a count metric
- // that increments at a constant 1000 time per second will appear noisy with an interval of 3s. This is because
- // client-side aggregation would report every 3 seconds, while the agent is reporting every 10 seconds. This means in
- // each agent bucket, the values are: 9000, 9000, 12000.
- func WithAggregationInterval(interval time.Duration) Option {
- return func(o *Options) error {
- o.aggregationFlushInterval = interval
- return nil
- }
- }
- // WithClientSideAggregation enables client side aggregation for Gauges, Counts and Sets.
- func WithClientSideAggregation() Option {
- return func(o *Options) error {
- o.aggregation = true
- return nil
- }
- }
- // WithoutClientSideAggregation disables client side aggregation.
- func WithoutClientSideAggregation() Option {
- return func(o *Options) error {
- o.aggregation = false
- o.extendedAggregation = false
- return nil
- }
- }
- // WithExtendedClientSideAggregation enables client side aggregation for all types. This feature is only compatible with
- // Agent's version >=6.25.0 && <7.0.0 or Agent's versions >=7.25.0.
- func WithExtendedClientSideAggregation() Option {
- return func(o *Options) error {
- o.aggregation = true
- o.extendedAggregation = true
- return nil
- }
- }
- // WithoutTelemetry disables the client telemetry.
- //
- // More on this here: https://docs.datadoghq.com/developers/dogstatsd/high_throughput/#client-side-telemetry
- func WithoutTelemetry() Option {
- return func(o *Options) error {
- o.telemetry = false
- return nil
- }
- }
- // WithTelemetryAddr sets a different address for telemetry metrics. By default the same address as the client is used
- // for telemetry.
- //
- // More on this here: https://docs.datadoghq.com/developers/dogstatsd/high_throughput/#client-side-telemetry
- func WithTelemetryAddr(addr string) Option {
- return func(o *Options) error {
- o.telemetryAddr = addr
- return nil
- }
- }
|