statsd.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658
  1. // Copyright 2013 Ooyala, Inc.
  2. /*
  3. Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd,
  4. adding tags and histograms and pushing upstream to Datadog.
  5. Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD.
  6. statsd is based on go-statsd-client.
  7. */
  8. package statsd
  9. import (
  10. "errors"
  11. "fmt"
  12. "io"
  13. "os"
  14. "strings"
  15. "sync"
  16. "sync/atomic"
  17. "time"
  18. )
/*
OptimalUDPPayloadSize defines the optimal payload size for a UDP datagram. 1432 bytes
is optimal for regular networks with an MTU of 1500, so datagrams don't get
fragmented. It's generally recommended not to fragment UDP datagrams, as losing
a single fragment will cause the entire datagram to be lost.
*/
const OptimalUDPPayloadSize = 1432

/*
MaxUDPPayloadSize defines the maximum payload size for a UDP datagram.
Its value comes from the calculation: 65535 bytes (max UDP datagram size) -
8 bytes (UDP header) - 60 bytes (max IP headers).
Any payload larger than that will see frames being cut out.
*/
const MaxUDPPayloadSize = 65467

// DefaultUDPBufferPoolSize is the default size of the buffer pool for UDP clients.
const DefaultUDPBufferPoolSize = 2048

// DefaultUDSBufferPoolSize is the default size of the buffer pool for UDS clients.
const DefaultUDSBufferPoolSize = 512

/*
DefaultMaxAgentPayloadSize is the default maximum payload size the agent
can receive. This can be adjusted by changing dogstatsd_buffer_size in the
agent configuration file datadog.yaml. This is also used as the optimal payload size
for UDS datagrams.
*/
const DefaultMaxAgentPayloadSize = 8192

/*
UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket
traffic instead of UDP.
*/
const UnixAddressPrefix = "unix://"

/*
WindowsPipeAddressPrefix holds the prefix to use to enable Windows Named Pipes
traffic instead of UDP.
*/
const WindowsPipeAddressPrefix = `\\.\pipe\`

// Environment variables consulted when no address is passed to the client,
// and the UDP port used when none is specified.
const (
	agentHostEnvVarName = "DD_AGENT_HOST"
	agentPortEnvVarName = "DD_DOGSTATSD_PORT"
	defaultUDPPort      = "8125"
)
/*
ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable
to a specific tag name. We use a slice to keep the order and simplify tests.
Every variable that is set to a non-empty value is appended to the client's
global tags as "tagName:value" when the client is created.
*/
var ddEnvTagsMapping = []struct{ envName, tagName string }{
	{"DD_ENTITY_ID", "dd.internal.entity_id"}, // Client-side entity ID injection for container tagging.
	{"DD_ENV", "env"},                         // The name of the env in which the service runs.
	{"DD_SERVICE", "service"},                 // The name of the running service.
	{"DD_VERSION", "version"},                 // The current version of the running service.
}
// metricType identifies the kind of payload carried by a metric value.
type metricType int

// Supported metric types. The numeric values come from iota, so the order of
// this list must not be changed casually.
const (
	gauge metricType = iota
	count
	histogram
	histogramAggregated // NOTE(review): presumably a histogram carrying several aggregated samples - confirm
	distribution
	distributionAggregated // NOTE(review): presumably the aggregated form of distribution - confirm
	set
	timing
	timingAggregated // NOTE(review): presumably the aggregated form of timing - confirm
	event
	serviceCheck
)
// receivingMode selects how samples are handed to the workers or aggregator.
type receivingMode int

const (
	// mutexMode is the zero value: the caller processes the metric directly
	// (see Client.send, which calls worker.processMetric in this mode).
	mutexMode receivingMode = iota
	// channelMode hands metrics over through a channel with a non-blocking
	// send; a metric is dropped (and counted) when the channel is full.
	channelMode
)
// Names identifying the transport backing a client's writer. They are
// returned by createWriter and used to pick transport-specific defaults.
const (
	writerNameUDP     string = "udp"
	writerNameUDS     string = "uds"
	writerWindowsPipe string = "pipe"
)
// metric is the internal representation of one sample, event or service
// check travelling from the Client to a worker. Which value field is
// meaningful depends on metricType.
type metric struct {
	metricType metricType
	namespace  string   // client namespace, copied from Client.namespace
	globalTags []string // client-wide tags, copied from Client.tags
	name       string
	fvalue     float64   // value for gauge, histogram, distribution and timing
	fvalues    []float64 // NOTE(review): presumably used by the *Aggregated types - confirm
	ivalue     int64     // value for count
	svalue     string    // value for set
	evalue     *Event        // payload for event
	scvalue    *ServiceCheck // payload for serviceCheck
	tags       []string      // per-call tags
	stags      string        // NOTE(review): looks like a pre-serialized tag string - confirm
	rate       float64       // sample rate passed by the caller
}
// noClientErr is a string-backed error type, which lets ErrNoClient be
// declared as a constant.
type noClientErr string

// ErrNoClient is returned if statsd reporting methods are invoked on
// a nil client.
const ErrNoClient = noClientErr("statsd client is nil")

// Error implements the error interface by returning the underlying string.
func (e noClientErr) Error() string {
	return string(e)
}
// ClientInterface is an interface that exposes the common client functions for the
// purpose of being able to provide a no-op client or even mocking. This can aid
// downstream users with their testing.
type ClientInterface interface {
	// Gauge measures the value of a metric at a particular time.
	Gauge(name string, value float64, tags []string, rate float64) error

	// Count tracks how many times something happened per second.
	Count(name string, value int64, tags []string, rate float64) error

	// Histogram tracks the statistical distribution of a set of values on each host.
	Histogram(name string, value float64, tags []string, rate float64) error

	// Distribution tracks the statistical distribution of a set of values across your infrastructure.
	Distribution(name string, value float64, tags []string, rate float64) error

	// Decr is just Count of -1.
	Decr(name string, tags []string, rate float64) error

	// Incr is just Count of 1.
	Incr(name string, tags []string, rate float64) error

	// Set counts the number of unique elements in a group.
	Set(name string, value string, tags []string, rate float64) error

	// Timing sends timing information; it is an alias for TimeInMilliseconds.
	Timing(name string, value time.Duration, tags []string, rate float64) error

	// TimeInMilliseconds sends timing information in milliseconds.
	// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
	TimeInMilliseconds(name string, value float64, tags []string, rate float64) error

	// Event sends the provided Event.
	Event(e *Event) error

	// SimpleEvent sends an event with the provided title and text.
	SimpleEvent(title, text string) error

	// ServiceCheck sends the provided ServiceCheck.
	ServiceCheck(sc *ServiceCheck) error

	// SimpleServiceCheck sends a service check with the provided name and status.
	SimpleServiceCheck(name string, status ServiceCheckStatus) error

	// Close the client connection.
	Close() error

	// Flush forces a flush of all the queued dogstatsd payloads.
	Flush() error
}
// A Client is a handle for sending messages to dogstatsd. It is safe to
// use one Client from multiple goroutines simultaneously.
type Client struct {
	// Sender handles the underlying networking protocol
	sender *sender
	// namespace to prepend to all statsd calls
	namespace string
	// tags are global tags to be added to every statsd call
	tags []string
	// flushTime is the period of the ticker driving the periodic worker flush
	flushTime time.Duration
	// telemetry holds the counters updated atomically by the reporting methods
	telemetry *statsdTelemetry
	// telemetryClient is only set when telemetry is enabled in the options
	telemetryClient *telemetryClient
	// stop is closed by Close to tell background goroutines to exit
	stop chan struct{}
	// wg tracks the background goroutines that Close waits for
	wg sync.WaitGroup
	// workers receive metrics; one is picked per metric by hashing its name
	workers []*worker
	// closerLock ensures only one goroutine at a time runs Close
	closerLock sync.Mutex
	// workersMode is how metrics are handed to the workers
	workersMode receivingMode
	// aggregatorMode is how metrics are handed to the extended aggregator
	aggregatorMode receivingMode
	// agg is set when aggregation or extended aggregation is enabled
	agg *aggregator
	// aggExtended is the same aggregator as agg, set only with extended aggregation
	aggExtended *aggregator
	// options and addrOption record the arguments given to New so that
	// CloneWithExtraOptions can build a similar client
	options    []Option
	addrOption string
}
// statsdTelemetry contains telemetry metrics about the client. Every field
// is a counter that is read and written through sync/atomic.
type statsdTelemetry struct {
	totalMetricsGauge        uint64
	totalMetricsCount        uint64
	totalMetricsHistogram    uint64
	totalMetricsDistribution uint64
	totalMetricsSet          uint64
	totalMetricsTiming       uint64
	totalEvents              uint64
	totalServiceChecks       uint64
	// totalDroppedOnReceive counts metrics dropped in channelMode because
	// the receiving channel was full.
	totalDroppedOnReceive uint64
}
  186. // Verify that Client implements the ClientInterface.
  187. // https://golang.org/doc/faq#guarantee_satisfies_interface
  188. var _ ClientInterface = &Client{}
  189. func resolveAddr(addr string) string {
  190. envPort := ""
  191. if addr == "" {
  192. addr = os.Getenv(agentHostEnvVarName)
  193. envPort = os.Getenv(agentPortEnvVarName)
  194. }
  195. if addr == "" {
  196. return ""
  197. }
  198. if !strings.HasPrefix(addr, WindowsPipeAddressPrefix) && !strings.HasPrefix(addr, UnixAddressPrefix) {
  199. if !strings.Contains(addr, ":") {
  200. if envPort != "" {
  201. addr = fmt.Sprintf("%s:%s", addr, envPort)
  202. } else {
  203. addr = fmt.Sprintf("%s:%s", addr, defaultUDPPort)
  204. }
  205. }
  206. }
  207. return addr
  208. }
  209. func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, string, error) {
  210. addr = resolveAddr(addr)
  211. if addr == "" {
  212. return nil, "", errors.New("No address passed and autodetection from environment failed")
  213. }
  214. switch {
  215. case strings.HasPrefix(addr, WindowsPipeAddressPrefix):
  216. w, err := newWindowsPipeWriter(addr, writeTimeout)
  217. return w, writerWindowsPipe, err
  218. case strings.HasPrefix(addr, UnixAddressPrefix):
  219. w, err := newUDSWriter(addr[len(UnixAddressPrefix):], writeTimeout)
  220. return w, writerNameUDS, err
  221. default:
  222. w, err := newUDPWriter(addr, writeTimeout)
  223. return w, writerNameUDP, err
  224. }
  225. }
  226. // New returns a pointer to a new Client given an addr in the format "hostname:port" for UDP,
  227. // "unix:///path/to/socket" for UDS or "\\.\pipe\path\to\pipe" for Windows Named Pipes.
  228. func New(addr string, options ...Option) (*Client, error) {
  229. o, err := resolveOptions(options)
  230. if err != nil {
  231. return nil, err
  232. }
  233. w, writerType, err := createWriter(addr, o.writeTimeout)
  234. if err != nil {
  235. return nil, err
  236. }
  237. client, err := newWithWriter(w, o, writerType)
  238. if err == nil {
  239. client.options = append(client.options, options...)
  240. client.addrOption = addr
  241. }
  242. return client, err
  243. }
  244. // NewWithWriter creates a new Client with given writer. Writer is a
  245. // io.WriteCloser
  246. func NewWithWriter(w io.WriteCloser, options ...Option) (*Client, error) {
  247. o, err := resolveOptions(options)
  248. if err != nil {
  249. return nil, err
  250. }
  251. return newWithWriter(w, o, "custom")
  252. }
  253. // CloneWithExtraOptions create a new Client with extra options
  254. func CloneWithExtraOptions(c *Client, options ...Option) (*Client, error) {
  255. if c == nil {
  256. return nil, ErrNoClient
  257. }
  258. if c.addrOption == "" {
  259. return nil, fmt.Errorf("can't clone client with no addrOption")
  260. }
  261. opt := append(c.options, options...)
  262. return New(c.addrOption, opt...)
  263. }
  264. func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, error) {
  265. c := Client{
  266. namespace: o.namespace,
  267. tags: o.tags,
  268. telemetry: &statsdTelemetry{},
  269. }
  270. // Inject values of DD_* environment variables as global tags.
  271. for _, mapping := range ddEnvTagsMapping {
  272. if value := os.Getenv(mapping.envName); value != "" {
  273. c.tags = append(c.tags, fmt.Sprintf("%s:%s", mapping.tagName, value))
  274. }
  275. }
  276. if o.maxBytesPerPayload == 0 {
  277. if writerName == writerNameUDS {
  278. o.maxBytesPerPayload = DefaultMaxAgentPayloadSize
  279. } else {
  280. o.maxBytesPerPayload = OptimalUDPPayloadSize
  281. }
  282. }
  283. if o.bufferPoolSize == 0 {
  284. if writerName == writerNameUDS {
  285. o.bufferPoolSize = DefaultUDSBufferPoolSize
  286. } else {
  287. o.bufferPoolSize = DefaultUDPBufferPoolSize
  288. }
  289. }
  290. if o.senderQueueSize == 0 {
  291. if writerName == writerNameUDS {
  292. o.senderQueueSize = DefaultUDSBufferPoolSize
  293. } else {
  294. o.senderQueueSize = DefaultUDPBufferPoolSize
  295. }
  296. }
  297. bufferPool := newBufferPool(o.bufferPoolSize, o.maxBytesPerPayload, o.maxMessagesPerPayload)
  298. c.sender = newSender(w, o.senderQueueSize, bufferPool)
  299. c.aggregatorMode = o.receiveMode
  300. c.workersMode = o.receiveMode
  301. // channelMode mode at the worker level is not enabled when
  302. // ExtendedAggregation is since the user app will not directly
  303. // use the worker (the aggregator sit between the app and the
  304. // workers).
  305. if o.extendedAggregation {
  306. c.workersMode = mutexMode
  307. }
  308. if o.aggregation || o.extendedAggregation {
  309. c.agg = newAggregator(&c)
  310. c.agg.start(o.aggregationFlushInterval)
  311. if o.extendedAggregation {
  312. c.aggExtended = c.agg
  313. if c.aggregatorMode == channelMode {
  314. c.agg.startReceivingMetric(o.channelModeBufferSize, o.workersCount)
  315. }
  316. }
  317. }
  318. for i := 0; i < o.workersCount; i++ {
  319. w := newWorker(bufferPool, c.sender)
  320. c.workers = append(c.workers, w)
  321. if c.workersMode == channelMode {
  322. w.startReceivingMetric(o.channelModeBufferSize)
  323. }
  324. }
  325. c.flushTime = o.bufferFlushInterval
  326. c.stop = make(chan struct{}, 1)
  327. c.wg.Add(1)
  328. go func() {
  329. defer c.wg.Done()
  330. c.watch()
  331. }()
  332. if o.telemetry {
  333. if o.telemetryAddr == "" {
  334. c.telemetryClient = newTelemetryClient(&c, writerName, c.agg != nil)
  335. } else {
  336. var err error
  337. c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, writerName, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout)
  338. if err != nil {
  339. return nil, err
  340. }
  341. }
  342. c.telemetryClient.run(&c.wg, c.stop)
  343. }
  344. return &c, nil
  345. }
  346. func (c *Client) watch() {
  347. ticker := time.NewTicker(c.flushTime)
  348. for {
  349. select {
  350. case <-ticker.C:
  351. for _, w := range c.workers {
  352. w.flush()
  353. }
  354. case <-c.stop:
  355. ticker.Stop()
  356. return
  357. }
  358. }
  359. }
  360. // Flush forces a flush of all the queued dogstatsd payloads This method is
  361. // blocking and will not return until everything is sent through the network.
  362. // In mutexMode, this will also block sampling new data to the client while the
  363. // workers and sender are flushed.
  364. func (c *Client) Flush() error {
  365. if c == nil {
  366. return ErrNoClient
  367. }
  368. if c.agg != nil {
  369. c.agg.flush()
  370. }
  371. for _, w := range c.workers {
  372. w.pause()
  373. defer w.unpause()
  374. w.flushUnsafe()
  375. }
  376. // Now that the worker are pause the sender can flush the queue between
  377. // worker and senders
  378. c.sender.flush()
  379. return nil
  380. }
  381. func (c *Client) flushTelemetryMetrics(t *Telemetry) {
  382. t.TotalMetricsGauge = atomic.LoadUint64(&c.telemetry.totalMetricsGauge)
  383. t.TotalMetricsCount = atomic.LoadUint64(&c.telemetry.totalMetricsCount)
  384. t.TotalMetricsSet = atomic.LoadUint64(&c.telemetry.totalMetricsSet)
  385. t.TotalMetricsHistogram = atomic.LoadUint64(&c.telemetry.totalMetricsHistogram)
  386. t.TotalMetricsDistribution = atomic.LoadUint64(&c.telemetry.totalMetricsDistribution)
  387. t.TotalMetricsTiming = atomic.LoadUint64(&c.telemetry.totalMetricsTiming)
  388. t.TotalEvents = atomic.LoadUint64(&c.telemetry.totalEvents)
  389. t.TotalServiceChecks = atomic.LoadUint64(&c.telemetry.totalServiceChecks)
  390. t.TotalDroppedOnReceive = atomic.LoadUint64(&c.telemetry.totalDroppedOnReceive)
  391. }
  392. // GetTelemetry return the telemetry metrics for the client since it started.
  393. func (c *Client) GetTelemetry() Telemetry {
  394. return c.telemetryClient.getTelemetry()
  395. }
  396. func (c *Client) send(m metric) error {
  397. h := hashString32(m.name)
  398. worker := c.workers[h%uint32(len(c.workers))]
  399. if c.workersMode == channelMode {
  400. select {
  401. case worker.inputMetrics <- m:
  402. default:
  403. atomic.AddUint64(&c.telemetry.totalDroppedOnReceive, 1)
  404. }
  405. return nil
  406. }
  407. return worker.processMetric(m)
  408. }
  409. // sendBlocking is used by the aggregator to inject aggregated metrics.
  410. func (c *Client) sendBlocking(m metric) error {
  411. m.globalTags = c.tags
  412. m.namespace = c.namespace
  413. h := hashString32(m.name)
  414. worker := c.workers[h%uint32(len(c.workers))]
  415. return worker.processMetric(m)
  416. }
  417. func (c *Client) sendToAggregator(mType metricType, name string, value float64, tags []string, rate float64, f bufferedMetricSampleFunc) error {
  418. if c.aggregatorMode == channelMode {
  419. select {
  420. case c.aggExtended.inputMetrics <- metric{metricType: mType, name: name, fvalue: value, tags: tags, rate: rate}:
  421. default:
  422. atomic.AddUint64(&c.telemetry.totalDroppedOnReceive, 1)
  423. }
  424. return nil
  425. }
  426. return f(name, value, tags, rate)
  427. }
  428. // Gauge measures the value of a metric at a particular time.
  429. func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error {
  430. if c == nil {
  431. return ErrNoClient
  432. }
  433. atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1)
  434. if c.agg != nil {
  435. return c.agg.gauge(name, value, tags)
  436. }
  437. return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
  438. }
  439. // Count tracks how many times something happened per second.
  440. func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
  441. if c == nil {
  442. return ErrNoClient
  443. }
  444. atomic.AddUint64(&c.telemetry.totalMetricsCount, 1)
  445. if c.agg != nil {
  446. return c.agg.count(name, value, tags)
  447. }
  448. return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
  449. }
  450. // Histogram tracks the statistical distribution of a set of values on each host.
  451. func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
  452. if c == nil {
  453. return ErrNoClient
  454. }
  455. atomic.AddUint64(&c.telemetry.totalMetricsHistogram, 1)
  456. if c.aggExtended != nil {
  457. return c.sendToAggregator(histogram, name, value, tags, rate, c.aggExtended.histogram)
  458. }
  459. return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
  460. }
  461. // Distribution tracks the statistical distribution of a set of values across your infrastructure.
  462. func (c *Client) Distribution(name string, value float64, tags []string, rate float64) error {
  463. if c == nil {
  464. return ErrNoClient
  465. }
  466. atomic.AddUint64(&c.telemetry.totalMetricsDistribution, 1)
  467. if c.aggExtended != nil {
  468. return c.sendToAggregator(distribution, name, value, tags, rate, c.aggExtended.distribution)
  469. }
  470. return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
  471. }
  472. // Decr is just Count of -1
  473. func (c *Client) Decr(name string, tags []string, rate float64) error {
  474. return c.Count(name, -1, tags, rate)
  475. }
  476. // Incr is just Count of 1
  477. func (c *Client) Incr(name string, tags []string, rate float64) error {
  478. return c.Count(name, 1, tags, rate)
  479. }
  480. // Set counts the number of unique elements in a group.
  481. func (c *Client) Set(name string, value string, tags []string, rate float64) error {
  482. if c == nil {
  483. return ErrNoClient
  484. }
  485. atomic.AddUint64(&c.telemetry.totalMetricsSet, 1)
  486. if c.agg != nil {
  487. return c.agg.set(name, value, tags)
  488. }
  489. return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
  490. }
  491. // Timing sends timing information, it is an alias for TimeInMilliseconds
  492. func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error {
  493. return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate)
  494. }
  495. // TimeInMilliseconds sends timing information in milliseconds.
  496. // It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
  497. func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error {
  498. if c == nil {
  499. return ErrNoClient
  500. }
  501. atomic.AddUint64(&c.telemetry.totalMetricsTiming, 1)
  502. if c.aggExtended != nil {
  503. return c.sendToAggregator(timing, name, value, tags, rate, c.aggExtended.timing)
  504. }
  505. return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace})
  506. }
  507. // Event sends the provided Event.
  508. func (c *Client) Event(e *Event) error {
  509. if c == nil {
  510. return ErrNoClient
  511. }
  512. atomic.AddUint64(&c.telemetry.totalEvents, 1)
  513. return c.send(metric{metricType: event, evalue: e, rate: 1, globalTags: c.tags, namespace: c.namespace})
  514. }
  515. // SimpleEvent sends an event with the provided title and text.
  516. func (c *Client) SimpleEvent(title, text string) error {
  517. e := NewEvent(title, text)
  518. return c.Event(e)
  519. }
  520. // ServiceCheck sends the provided ServiceCheck.
  521. func (c *Client) ServiceCheck(sc *ServiceCheck) error {
  522. if c == nil {
  523. return ErrNoClient
  524. }
  525. atomic.AddUint64(&c.telemetry.totalServiceChecks, 1)
  526. return c.send(metric{metricType: serviceCheck, scvalue: sc, rate: 1, globalTags: c.tags, namespace: c.namespace})
  527. }
  528. // SimpleServiceCheck sends an serviceCheck with the provided name and status.
  529. func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error {
  530. sc := NewServiceCheck(name, status)
  531. return c.ServiceCheck(sc)
  532. }
  533. // Close the client connection.
  534. func (c *Client) Close() error {
  535. if c == nil {
  536. return ErrNoClient
  537. }
  538. // Acquire closer lock to ensure only one thread can close the stop channel
  539. c.closerLock.Lock()
  540. defer c.closerLock.Unlock()
  541. // Notify all other threads that they should stop
  542. select {
  543. case <-c.stop:
  544. return nil
  545. default:
  546. }
  547. close(c.stop)
  548. if c.workersMode == channelMode {
  549. for _, w := range c.workers {
  550. w.stopReceivingMetric()
  551. }
  552. }
  553. // flush the aggregator first
  554. if c.agg != nil {
  555. if c.aggExtended != nil && c.aggregatorMode == channelMode {
  556. c.agg.stopReceivingMetric()
  557. }
  558. c.agg.stop()
  559. }
  560. // Wait for the threads to stop
  561. c.wg.Wait()
  562. c.Flush()
  563. return c.sender.close()
  564. }