options.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package statsd
  2. import (
  3. "fmt"
  4. "math"
  5. "strings"
  6. "time"
  7. )
// Default values for every client option. resolveOptions seeds a fresh
// Options value with these before applying any user-supplied Option.
var (
	// DefaultNamespace is the default value for the Namespace option.
	DefaultNamespace = ""
	// DefaultTags is the default value for the Tags option.
	DefaultTags = []string{}
	// DefaultMaxBytesPerPayload is the default value for the MaxBytesPerPayload option.
	// 0 is the "magic value" described on Options.MaxBytesPerPayload.
	DefaultMaxBytesPerPayload = 0
	// DefaultMaxMessagesPerPayload is the default value for the MaxMessagesPerPayload option.
	DefaultMaxMessagesPerPayload = math.MaxInt32
	// DefaultBufferPoolSize is the default value for the BufferPoolSize option.
	// 0 is the "magic value" described on Options.BufferPoolSize.
	DefaultBufferPoolSize = 0
	// DefaultBufferFlushInterval is the default value for the BufferFlushInterval option.
	DefaultBufferFlushInterval = 100 * time.Millisecond
	// DefaultBufferShardCount is the default value for the BufferShardCount option.
	DefaultBufferShardCount = 32
	// DefaultSenderQueueSize is the default value for the SenderQueueSize option.
	// 0 is the "magic value" described on Options.SenderQueueSize.
	DefaultSenderQueueSize = 0
	// DefaultWriteTimeoutUDS is the default value for the WriteTimeoutUDS option.
	DefaultWriteTimeoutUDS = 100 * time.Millisecond
	// DefaultTelemetry is the default value for the Telemetry option.
	DefaultTelemetry = true
	// DefaultReceivingMode is the default value for the ReceiveMode option,
	// i.e. the default behavior when sending metrics.
	DefaultReceivingMode = MutexMode
	// DefaultChannelModeBufferSize is the default value for the
	// ChannelModeBufferSize option: the size of the channel holding incoming metrics.
	DefaultChannelModeBufferSize = 4096
	// DefaultAggregationFlushInterval is the default interval for the aggregator to flush metrics.
	// This should divide the Agent reporting period (default=10s) evenly to reduce "aliasing" that
	// can cause values to appear irregular.
	DefaultAggregationFlushInterval = 2 * time.Second
	// DefaultAggregation is the default value for the Aggregation option.
	DefaultAggregation = false
	// DefaultExtendedAggregation is the default value for the ExtendedAggregation option.
	DefaultExtendedAggregation = false
	// DefaultDevMode is the default value for the DevMode option.
	DefaultDevMode = false
)
// Options contains the configuration options for a client.
type Options struct {
	// Namespace to prepend to all metric, event and service check names.
	Namespace string
	// Tags are global tags to be applied to every metric, event and service check.
	Tags []string
	// MaxBytesPerPayload is the maximum number of bytes a single payload will contain.
	// The magic value 0 will set the option to the optimal size for the transport
	// protocol used when creating the client: 1432 for UDP and 8192 for UDS.
	MaxBytesPerPayload int
	// MaxMessagesPerPayload is the maximum number of metrics, events and/or service checks a single payload will contain.
	// This option can be set to `1` to create an unbuffered client.
	MaxMessagesPerPayload int
	// BufferPoolSize is the size of the pool of buffers in number of buffers.
	// The magic value 0 will set the option to the optimal size for the transport
	// protocol used when creating the client: 2048 for UDP and 512 for UDS.
	BufferPoolSize int
	// BufferFlushInterval is the interval after which the current buffer will get flushed.
	BufferFlushInterval time.Duration
	// BufferShardCount is the number of buffer "shards" that will be used.
	// These shards allow the use of multiple buffers at the same time to reduce
	// lock contention.
	BufferShardCount int
	// SenderQueueSize is the size of the sender queue in number of buffers.
	// The magic value 0 will set the option to the optimal size for the transport
	// protocol used when creating the client: 2048 for UDP and 512 for UDS.
	SenderQueueSize int
	// WriteTimeoutUDS is the timeout after which a UDS packet is dropped.
	WriteTimeoutUDS time.Duration
	// Telemetry is a set of metrics automatically injected by the client in the
	// dogstatsd stream to be able to monitor the client itself.
	Telemetry bool
	// ReceiveMode determines the behavior of the client when receiving too many
	// metrics. The client will either drop the metrics if its buffers are
	// full (ChannelMode) or block the caller until the metric can be
	// handled (MutexMode). By default the client uses MutexMode. This
	// option should be set to ChannelMode only when used under very high
	// load.
	//
	// MutexMode uses a mutex internally, which is much faster than a
	// channel but causes some lock contention when used with a high number
	// of threads. Mutexes are sharded based on the metric name, which
	// limits mutex contention when goroutines send different metrics.
	//
	// ChannelMode uses a channel (of ChannelModeBufferSize size) to send
	// metrics and drops metrics if the channel is full. Sending metrics in
	// this mode is slower than MutexMode (because of the channel), but
	// will not block the application. This mode is made for applications
	// using many goroutines, sending the same metrics at a very high
	// volume. The goal is to not slow down the application at the cost of
	// dropping metrics and having a lower max throughput.
	ReceiveMode ReceivingMode
	// ChannelModeBufferSize is the size of the channel holding incoming metrics.
	ChannelModeBufferSize int
	// AggregationFlushInterval is the interval for the aggregator to flush metrics.
	AggregationFlushInterval time.Duration
	// [beta] Aggregation enables/disables client side aggregation for
	// Gauges, Counts and Sets (compatible with every Agent version).
	Aggregation bool
	// [beta] ExtendedAggregation enables/disables client side aggregation
	// for all types. This feature is only compatible with Agent versions
	// >=7.25.0 or Agent versions >=6.25.0 && <7.0.0.
	ExtendedAggregation bool
	// TelemetryAddr specifies a different endpoint for telemetry metrics.
	TelemetryAddr string
	// DevMode enables the "dev" mode where the client sends many more
	// telemetry metrics to help troubleshoot the client behavior.
	DevMode bool
}
  113. func resolveOptions(options []Option) (*Options, error) {
  114. o := &Options{
  115. Namespace: DefaultNamespace,
  116. Tags: DefaultTags,
  117. MaxBytesPerPayload: DefaultMaxBytesPerPayload,
  118. MaxMessagesPerPayload: DefaultMaxMessagesPerPayload,
  119. BufferPoolSize: DefaultBufferPoolSize,
  120. BufferFlushInterval: DefaultBufferFlushInterval,
  121. BufferShardCount: DefaultBufferShardCount,
  122. SenderQueueSize: DefaultSenderQueueSize,
  123. WriteTimeoutUDS: DefaultWriteTimeoutUDS,
  124. Telemetry: DefaultTelemetry,
  125. ReceiveMode: DefaultReceivingMode,
  126. ChannelModeBufferSize: DefaultChannelModeBufferSize,
  127. AggregationFlushInterval: DefaultAggregationFlushInterval,
  128. Aggregation: DefaultAggregation,
  129. ExtendedAggregation: DefaultExtendedAggregation,
  130. DevMode: DefaultDevMode,
  131. }
  132. for _, option := range options {
  133. err := option(o)
  134. if err != nil {
  135. return nil, err
  136. }
  137. }
  138. return o, nil
  139. }
// Option is a client option: a function that mutates the Options value being
// built. It can return a non-nil error if validation of its value fails, in
// which case resolveOptions stops and reports that error.
type Option func(*Options) error
  142. // WithNamespace sets the Namespace option.
  143. func WithNamespace(namespace string) Option {
  144. return func(o *Options) error {
  145. if strings.HasSuffix(namespace, ".") {
  146. o.Namespace = namespace
  147. } else {
  148. o.Namespace = namespace + "."
  149. }
  150. return nil
  151. }
  152. }
  153. // WithTags sets the Tags option.
  154. func WithTags(tags []string) Option {
  155. return func(o *Options) error {
  156. o.Tags = tags
  157. return nil
  158. }
  159. }
  160. // WithMaxMessagesPerPayload sets the MaxMessagesPerPayload option.
  161. func WithMaxMessagesPerPayload(maxMessagesPerPayload int) Option {
  162. return func(o *Options) error {
  163. o.MaxMessagesPerPayload = maxMessagesPerPayload
  164. return nil
  165. }
  166. }
  167. // WithMaxBytesPerPayload sets the MaxBytesPerPayload option.
  168. func WithMaxBytesPerPayload(MaxBytesPerPayload int) Option {
  169. return func(o *Options) error {
  170. o.MaxBytesPerPayload = MaxBytesPerPayload
  171. return nil
  172. }
  173. }
  174. // WithBufferPoolSize sets the BufferPoolSize option.
  175. func WithBufferPoolSize(bufferPoolSize int) Option {
  176. return func(o *Options) error {
  177. o.BufferPoolSize = bufferPoolSize
  178. return nil
  179. }
  180. }
  181. // WithBufferFlushInterval sets the BufferFlushInterval option.
  182. func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option {
  183. return func(o *Options) error {
  184. o.BufferFlushInterval = bufferFlushInterval
  185. return nil
  186. }
  187. }
  188. // WithBufferShardCount sets the BufferShardCount option.
  189. func WithBufferShardCount(bufferShardCount int) Option {
  190. return func(o *Options) error {
  191. if bufferShardCount < 1 {
  192. return fmt.Errorf("BufferShardCount must be a positive integer")
  193. }
  194. o.BufferShardCount = bufferShardCount
  195. return nil
  196. }
  197. }
  198. // WithSenderQueueSize sets the SenderQueueSize option.
  199. func WithSenderQueueSize(senderQueueSize int) Option {
  200. return func(o *Options) error {
  201. o.SenderQueueSize = senderQueueSize
  202. return nil
  203. }
  204. }
  205. // WithWriteTimeoutUDS sets the WriteTimeoutUDS option.
  206. func WithWriteTimeoutUDS(writeTimeoutUDS time.Duration) Option {
  207. return func(o *Options) error {
  208. o.WriteTimeoutUDS = writeTimeoutUDS
  209. return nil
  210. }
  211. }
  212. // WithoutTelemetry disables the telemetry
  213. func WithoutTelemetry() Option {
  214. return func(o *Options) error {
  215. o.Telemetry = false
  216. return nil
  217. }
  218. }
  219. // WithChannelMode will use channel to receive metrics
  220. func WithChannelMode() Option {
  221. return func(o *Options) error {
  222. o.ReceiveMode = ChannelMode
  223. return nil
  224. }
  225. }
  226. // WithMutexMode will use mutex to receive metrics
  227. func WithMutexMode() Option {
  228. return func(o *Options) error {
  229. o.ReceiveMode = MutexMode
  230. return nil
  231. }
  232. }
  233. // WithChannelModeBufferSize the channel buffer size when using "drop mode"
  234. func WithChannelModeBufferSize(bufferSize int) Option {
  235. return func(o *Options) error {
  236. o.ChannelModeBufferSize = bufferSize
  237. return nil
  238. }
  239. }
  240. // WithAggregationInterval set the aggregation interval
  241. func WithAggregationInterval(interval time.Duration) Option {
  242. return func(o *Options) error {
  243. o.AggregationFlushInterval = interval
  244. return nil
  245. }
  246. }
  247. // WithClientSideAggregation enables client side aggregation for Gauges, Counts
  248. // and Sets. Client side aggregation is a beta feature.
  249. func WithClientSideAggregation() Option {
  250. return func(o *Options) error {
  251. o.Aggregation = true
  252. return nil
  253. }
  254. }
  255. // WithoutClientSideAggregation disables client side aggregation.
  256. func WithoutClientSideAggregation() Option {
  257. return func(o *Options) error {
  258. o.Aggregation = false
  259. o.ExtendedAggregation = false
  260. return nil
  261. }
  262. }
  263. // WithExtendedClientSideAggregation enables client side aggregation for all
  264. // types. This feature is only compatible with Agent's version >=6.25.0 &&
  265. // <7.0.0 or Agent's versions >=7.25.0. Client side aggregation is a beta
  266. // feature.
  267. func WithExtendedClientSideAggregation() Option {
  268. return func(o *Options) error {
  269. o.Aggregation = true
  270. o.ExtendedAggregation = true
  271. return nil
  272. }
  273. }
  274. // WithTelemetryAddr specify a different address for telemetry metrics.
  275. func WithTelemetryAddr(addr string) Option {
  276. return func(o *Options) error {
  277. o.TelemetryAddr = addr
  278. return nil
  279. }
  280. }
  281. // WithDevMode enables client "dev" mode, sending more Telemetry metrics to
  282. // help troubleshoot client behavior.
  283. func WithDevMode() Option {
  284. return func(o *Options) error {
  285. o.DevMode = true
  286. return nil
  287. }
  288. }
  289. // WithoutDevMode disables client "dev" mode, sending more Telemetry metrics to
  290. // help troubleshoot client behavior.
  291. func WithoutDevMode() Option {
  292. return func(o *Options) error {
  293. o.DevMode = false
  294. return nil
  295. }
  296. }