options.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. package statsd
  2. import (
  3. "fmt"
  4. "math"
  5. "strings"
  6. "time"
  7. )
  8. var (
  9. defaultNamespace = ""
  10. defaultTags = []string{}
  11. defaultMaxBytesPerPayload = 0
  12. defaultMaxMessagesPerPayload = math.MaxInt32
  13. defaultBufferPoolSize = 0
  14. defaultBufferFlushInterval = 100 * time.Millisecond
  15. defaultWorkerCount = 32
  16. defaultSenderQueueSize = 0
  17. defaultWriteTimeout = 100 * time.Millisecond
  18. defaultTelemetry = true
  19. defaultReceivingMode = mutexMode
  20. defaultChannelModeBufferSize = 4096
  21. defaultAggregationFlushInterval = 2 * time.Second
  22. defaultAggregation = true
  23. defaultExtendedAggregation = false
  24. )
  // Options contains the configuration options for a client.
  //
  // resolveOptions fills every field except telemetryAddr with its package default and then lets each Option
  // override the fields it owns.
  type Options struct {
  	// namespace is the prefix stored by WithNamespace.
  	namespace string
  	// tags are the global tags stored by WithTags.
  	tags []string
  	// maxBytesPerPayload is set by WithMaxBytesPerPayload.
  	maxBytesPerPayload int
  	// maxMessagesPerPayload is set by WithMaxMessagesPerPayload.
  	maxMessagesPerPayload int
  	// bufferPoolSize is set by WithBufferPoolSize.
  	bufferPoolSize int
  	// bufferFlushInterval is set by WithBufferFlushInterval.
  	bufferFlushInterval time.Duration
  	// workersCount is set by WithWorkersCount, which rejects values below 1.
  	workersCount int
  	// senderQueueSize is set by WithSenderQueueSize.
  	senderQueueSize int
  	// writeTimeout is set by WithWriteTimeout.
  	writeTimeout time.Duration
  	// telemetry is true by default and cleared by WithoutTelemetry.
  	telemetry bool
  	// receiveMode is selected by WithChannelMode or WithMutexMode.
  	receiveMode receivingMode
  	// channelModeBufferSize is set by WithChannelModeBufferSize.
  	channelModeBufferSize int
  	// aggregationFlushInterval is set by WithAggregationInterval.
  	aggregationFlushInterval time.Duration
  	// aggregation is toggled by WithClientSideAggregation, WithoutClientSideAggregation and
  	// WithExtendedClientSideAggregation.
  	aggregation bool
  	// extendedAggregation is enabled by WithExtendedClientSideAggregation and cleared by
  	// WithoutClientSideAggregation.
  	extendedAggregation bool
  	// telemetryAddr is set by WithTelemetryAddr; resolveOptions leaves it as the empty string otherwise.
  	telemetryAddr string
  }
  44. func resolveOptions(options []Option) (*Options, error) {
  45. o := &Options{
  46. namespace: defaultNamespace,
  47. tags: defaultTags,
  48. maxBytesPerPayload: defaultMaxBytesPerPayload,
  49. maxMessagesPerPayload: defaultMaxMessagesPerPayload,
  50. bufferPoolSize: defaultBufferPoolSize,
  51. bufferFlushInterval: defaultBufferFlushInterval,
  52. workersCount: defaultWorkerCount,
  53. senderQueueSize: defaultSenderQueueSize,
  54. writeTimeout: defaultWriteTimeout,
  55. telemetry: defaultTelemetry,
  56. receiveMode: defaultReceivingMode,
  57. channelModeBufferSize: defaultChannelModeBufferSize,
  58. aggregationFlushInterval: defaultAggregationFlushInterval,
  59. aggregation: defaultAggregation,
  60. extendedAggregation: defaultExtendedAggregation,
  61. }
  62. for _, option := range options {
  63. err := option(o)
  64. if err != nil {
  65. return nil, err
  66. }
  67. }
  68. return o, nil
  69. }
  70. // Option is a client option. Can return an error if validation fails.
  71. type Option func(*Options) error
  72. // WithNamespace sets a string to be prepend to all metrics, events and service checks name.
  73. //
  74. // A '.' will automatically be added after the namespace if needed. For example a metrics 'test' with a namespace 'prod'
  75. // will produce a final metric named 'prod.test'.
  76. func WithNamespace(namespace string) Option {
  77. return func(o *Options) error {
  78. if strings.HasSuffix(namespace, ".") {
  79. o.namespace = namespace
  80. } else {
  81. o.namespace = namespace + "."
  82. }
  83. return nil
  84. }
  85. }
  86. // WithTags sets global tags to be applied to every metrics, events and service checks.
  87. func WithTags(tags []string) Option {
  88. return func(o *Options) error {
  89. o.tags = tags
  90. return nil
  91. }
  92. }
  93. // WithMaxMessagesPerPayload sets the maximum number of metrics, events and/or service checks that a single payload can
  94. // contain.
  95. //
  96. // The default is 'math.MaxInt32' which will most likely let the WithMaxBytesPerPayload option take precedence. This
  97. // option can be set to `1` to create an unbuffered client (each metrics/event/service check will be send in its own
  98. // payload to the agent).
  99. func WithMaxMessagesPerPayload(maxMessagesPerPayload int) Option {
  100. return func(o *Options) error {
  101. o.maxMessagesPerPayload = maxMessagesPerPayload
  102. return nil
  103. }
  104. }
  105. // WithMaxBytesPerPayload sets the maximum number of bytes a single payload can contain.
  106. //
  107. // The deault value 0 which will set the option to the optimal size for the transport protocol used: 1432 for UDP and
  108. // named pipe and 8192 for UDS.
  109. func WithMaxBytesPerPayload(MaxBytesPerPayload int) Option {
  110. return func(o *Options) error {
  111. o.maxBytesPerPayload = MaxBytesPerPayload
  112. return nil
  113. }
  114. }
  115. // WithBufferPoolSize sets the size of the pool of buffers used to serialized metrics, events and service_checks.
  116. //
  117. // The default, 0, will set the option to the optimal size for the transport protocol used: 2048 for UDP and named pipe
  118. // and 512 for UDS.
  119. func WithBufferPoolSize(bufferPoolSize int) Option {
  120. return func(o *Options) error {
  121. o.bufferPoolSize = bufferPoolSize
  122. return nil
  123. }
  124. }
  125. // WithBufferFlushInterval sets the interval after which the current buffer is flushed.
  126. //
  127. // A buffers are used to serialized data, they're flushed either when full (see WithMaxBytesPerPayload) or when it's
  128. // been open for longer than this interval.
  129. //
  130. // With apps sending a high number of metrics/events/service_checks the interval rarely timeout. But with slow sending
  131. // apps increasing this value will reduce the number of payload sent on the wire as more data is serialized in the same
  132. // payload.
  133. //
  134. // Default is 100ms
  135. func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option {
  136. return func(o *Options) error {
  137. o.bufferFlushInterval = bufferFlushInterval
  138. return nil
  139. }
  140. }
  141. // WithWorkersCount sets the number of workers that will be used to serialized data.
  142. //
  143. // Those workers allow the use of multiple buffers at the same time (see WithBufferPoolSize) to reduce lock contention.
  144. //
  145. // Default is 32.
  146. func WithWorkersCount(workersCount int) Option {
  147. return func(o *Options) error {
  148. if workersCount < 1 {
  149. return fmt.Errorf("workersCount must be a positive integer")
  150. }
  151. o.workersCount = workersCount
  152. return nil
  153. }
  154. }
  155. // WithSenderQueueSize sets the size of the sender queue in number of buffers.
  156. //
  157. // After data has been serialized in a buffer they're pushed to a queue that the sender will consume and then each one
  158. // ot the agent.
  159. //
  160. // The default value 0 will set the option to the optimal size for the transport protocol used: 2048 for UDP and named
  161. // pipe and 512 for UDS.
  162. func WithSenderQueueSize(senderQueueSize int) Option {
  163. return func(o *Options) error {
  164. o.senderQueueSize = senderQueueSize
  165. return nil
  166. }
  167. }
  168. // WithWriteTimeout sets the timeout for network communication with the Agent, after this interval a payload is
  169. // dropped. This is only used for UDS and named pipes connection.
  170. func WithWriteTimeout(writeTimeout time.Duration) Option {
  171. return func(o *Options) error {
  172. o.writeTimeout = writeTimeout
  173. return nil
  174. }
  175. }
  176. // WithChannelMode make the client use channels to receive metrics
  177. //
  178. // This determines how the client receive metrics from the app (for example when calling the `Gauge()` method).
  179. // The client will either drop the metrics if its buffers are full (WithChannelMode option) or block the caller until the
  180. // metric can be handled (WithMutexMode option). By default the client use mutexes.
  181. //
  182. // WithChannelMode uses a channel (see WithChannelModeBufferSize to configure its size) to receive metrics and drops metrics if
  183. // the channel is full. Sending metrics in this mode is much slower that WithMutexMode (because of the channel), but will not
  184. // block the application. This mode is made for application using many goroutines, sending the same metrics, at a very
  185. // high volume. The goal is to not slow down the application at the cost of dropping metrics and having a lower max
  186. // throughput.
  187. func WithChannelMode() Option {
  188. return func(o *Options) error {
  189. o.receiveMode = channelMode
  190. return nil
  191. }
  192. }
  193. // WithMutexMode will use mutex to receive metrics from the app throught the API.
  194. //
  195. // This determines how the client receive metrics from the app (for example when calling the `Gauge()` method).
  196. // The client will either drop the metrics if its buffers are full (WithChannelMode option) or block the caller until the
  197. // metric can be handled (WithMutexMode option). By default the client use mutexes.
  198. //
  199. // WithMutexMode uses mutexes to receive metrics which is much faster than channels but can cause some lock contention
  200. // when used with a high number of goroutines sendint the same metrics. Mutexes are sharded based on the metrics name
  201. // which limit mutex contention when multiple goroutines send different metrics (see WithWorkersCount). This is the
  202. // default behavior which will produce the best throughput.
  203. func WithMutexMode() Option {
  204. return func(o *Options) error {
  205. o.receiveMode = mutexMode
  206. return nil
  207. }
  208. }
  209. // WithChannelModeBufferSize sets the size of the channel holding incoming metrics when WithChannelMode is used.
  210. func WithChannelModeBufferSize(bufferSize int) Option {
  211. return func(o *Options) error {
  212. o.channelModeBufferSize = bufferSize
  213. return nil
  214. }
  215. }
  216. // WithAggregationInterval sets the interval at which aggregated metrics are flushed. See WithClientSideAggregation and
  217. // WithExtendedClientSideAggregation for more.
  218. //
  219. // The default interval is 2s. The interval must divide the Agent reporting period (default=10s) evenly to reduce "aliasing"
  220. // that can cause values to appear irregular/spiky.
  221. //
  222. // For example a 3s aggregation interval will create spikes in the final graph: a application sending a count metric
  223. // that increments at a constant 1000 time per second will appear noisy with an interval of 3s. This is because
  224. // client-side aggregation would report every 3 seconds, while the agent is reporting every 10 seconds. This means in
  225. // each agent bucket, the values are: 9000, 9000, 12000.
  226. func WithAggregationInterval(interval time.Duration) Option {
  227. return func(o *Options) error {
  228. o.aggregationFlushInterval = interval
  229. return nil
  230. }
  231. }
  232. // WithClientSideAggregation enables client side aggregation for Gauges, Counts and Sets.
  233. func WithClientSideAggregation() Option {
  234. return func(o *Options) error {
  235. o.aggregation = true
  236. return nil
  237. }
  238. }
  239. // WithoutClientSideAggregation disables client side aggregation.
  240. func WithoutClientSideAggregation() Option {
  241. return func(o *Options) error {
  242. o.aggregation = false
  243. o.extendedAggregation = false
  244. return nil
  245. }
  246. }
  247. // WithExtendedClientSideAggregation enables client side aggregation for all types. This feature is only compatible with
  248. // Agent's version >=6.25.0 && <7.0.0 or Agent's versions >=7.25.0.
  249. func WithExtendedClientSideAggregation() Option {
  250. return func(o *Options) error {
  251. o.aggregation = true
  252. o.extendedAggregation = true
  253. return nil
  254. }
  255. }
  256. // WithoutTelemetry disables the client telemetry.
  257. //
  258. // More on this here: https://docs.datadoghq.com/developers/dogstatsd/high_throughput/#client-side-telemetry
  259. func WithoutTelemetry() Option {
  260. return func(o *Options) error {
  261. o.telemetry = false
  262. return nil
  263. }
  264. }
  265. // WithTelemetryAddr sets a different address for telemetry metrics. By default the same address as the client is used
  266. // for telemetry.
  267. //
  268. // More on this here: https://docs.datadoghq.com/developers/dogstatsd/high_throughput/#client-side-telemetry
  269. func WithTelemetryAddr(addr string) Option {
  270. return func(o *Options) error {
  271. o.telemetryAddr = addr
  272. return nil
  273. }
  274. }