statsd.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687
  1. // Copyright 2013 Ooyala, Inc.
  2. /*
  3. Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd,
  4. adding tags and histograms and pushing upstream to Datadog.
  5. Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD.
  6. statsd is based on go-statsd-client.
  7. */
  8. package statsd
  9. import (
  10. "errors"
  11. "fmt"
  12. "os"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. )
  18. /*
  19. OptimalUDPPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes
  20. is optimal for regular networks with an MTU of 1500 so datagrams don't get
  21. fragmented. It's generally recommended not to fragment UDP datagrams as losing
  22. a single fragment will cause the entire datagram to be lost.
  23. */
  24. const OptimalUDPPayloadSize = 1432
  25. /*
  26. MaxUDPPayloadSize defines the maximum payload size for a UDP datagram.
  27. Its value comes from the calculation: 65535 bytes Max UDP datagram size -
  28. 8byte UDP header - 60byte max IP headers
  29. any number greater than that will see frames being cut out.
  30. */
  31. const MaxUDPPayloadSize = 65467
  32. // DefaultUDPBufferPoolSize is the default size of the buffer pool for UDP clients.
  33. const DefaultUDPBufferPoolSize = 2048
  34. // DefaultUDSBufferPoolSize is the default size of the buffer pool for UDS clients.
  35. const DefaultUDSBufferPoolSize = 512
  36. /*
  37. DefaultMaxAgentPayloadSize is the default maximum payload size the agent
  38. can receive. This can be adjusted by changing dogstatsd_buffer_size in the
  39. agent configuration file datadog.yaml. This is also used as the optimal payload size
  40. for UDS datagrams.
  41. */
  42. const DefaultMaxAgentPayloadSize = 8192
  43. /*
  44. UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket
  45. traffic instead of UDP.
  46. */
  47. const UnixAddressPrefix = "unix://"
  48. /*
  49. WindowsPipeAddressPrefix holds the prefix to use to enable Windows Named Pipes
  50. traffic instead of UDP.
  51. */
  52. const WindowsPipeAddressPrefix = `\\.\pipe\`
  53. const (
  54. agentHostEnvVarName = "DD_AGENT_HOST"
  55. agentPortEnvVarName = "DD_DOGSTATSD_PORT"
  56. defaultUDPPort = "8125"
  57. )
  58. /*
  59. ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable
  60. to a specific tag name. We use a slice to keep the order and simplify tests.
  61. */
  62. var ddEnvTagsMapping = []struct{ envName, tagName string }{
  63. {"DD_ENTITY_ID", "dd.internal.entity_id"}, // Client-side entity ID injection for container tagging.
  64. {"DD_ENV", "env"}, // The name of the env in which the service runs.
  65. {"DD_SERVICE", "service"}, // The name of the running service.
  66. {"DD_VERSION", "version"}, // The current version of the running service.
  67. }
  68. type metricType int
  69. const (
  70. gauge metricType = iota
  71. count
  72. histogram
  73. histogramAggregated
  74. distribution
  75. distributionAggregated
  76. set
  77. timing
  78. timingAggregated
  79. event
  80. serviceCheck
  81. )
  82. type ReceivingMode int
  83. const (
  84. MutexMode ReceivingMode = iota
  85. ChannelMode
  86. )
  87. const (
  88. WriterNameUDP string = "udp"
  89. WriterNameUDS string = "uds"
  90. WriterWindowsPipe string = "pipe"
  91. )
  92. type metric struct {
  93. metricType metricType
  94. namespace string
  95. globalTags []string
  96. name string
  97. fvalue float64
  98. fvalues []float64
  99. ivalue int64
  100. svalue string
  101. evalue *Event
  102. scvalue *ServiceCheck
  103. tags []string
  104. stags string
  105. rate float64
  106. }
  107. type noClientErr string
  108. // ErrNoClient is returned if statsd reporting methods are invoked on
  109. // a nil client.
  110. const ErrNoClient = noClientErr("statsd client is nil")
  111. func (e noClientErr) Error() string {
  112. return string(e)
  113. }
  114. // ClientInterface is an interface that exposes the common client functions for the
  115. // purpose of being able to provide a no-op client or even mocking. This can aid
  116. // downstream users' with their testing.
  117. type ClientInterface interface {
  118. // Gauge measures the value of a metric at a particular time.
  119. Gauge(name string, value float64, tags []string, rate float64) error
  120. // Count tracks how many times something happened per second.
  121. Count(name string, value int64, tags []string, rate float64) error
  122. // Histogram tracks the statistical distribution of a set of values on each host.
  123. Histogram(name string, value float64, tags []string, rate float64) error
  124. // Distribution tracks the statistical distribution of a set of values across your infrastructure.
  125. Distribution(name string, value float64, tags []string, rate float64) error
  126. // Decr is just Count of -1
  127. Decr(name string, tags []string, rate float64) error
  128. // Incr is just Count of 1
  129. Incr(name string, tags []string, rate float64) error
  130. // Set counts the number of unique elements in a group.
  131. Set(name string, value string, tags []string, rate float64) error
  132. // Timing sends timing information, it is an alias for TimeInMilliseconds
  133. Timing(name string, value time.Duration, tags []string, rate float64) error
  134. // TimeInMilliseconds sends timing information in milliseconds.
  135. // It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
  136. TimeInMilliseconds(name string, value float64, tags []string, rate float64) error
  137. // Event sends the provided Event.
  138. Event(e *Event) error
  139. // SimpleEvent sends an event with the provided title and text.
  140. SimpleEvent(title, text string) error
  141. // ServiceCheck sends the provided ServiceCheck.
  142. ServiceCheck(sc *ServiceCheck) error
  143. // SimpleServiceCheck sends an serviceCheck with the provided name and status.
  144. SimpleServiceCheck(name string, status ServiceCheckStatus) error
  145. // Close the client connection.
  146. Close() error
  147. // Flush forces a flush of all the queued dogstatsd payloads.
  148. Flush() error
  149. // SetWriteTimeout allows the user to set a custom write timeout.
  150. SetWriteTimeout(d time.Duration) error
  151. }
  152. // A Client is a handle for sending messages to dogstatsd. It is safe to
  153. // use one Client from multiple goroutines simultaneously.
  154. type Client struct {
  155. // Sender handles the underlying networking protocol
  156. sender *sender
  157. // Namespace to prepend to all statsd calls
  158. Namespace string
  159. // Tags are global tags to be added to every statsd call
  160. Tags []string
  161. // skipErrors turns off error passing and allows UDS to emulate UDP behaviour
  162. SkipErrors bool
  163. flushTime time.Duration
  164. metrics *ClientMetrics
  165. telemetry *telemetryClient
  166. stop chan struct{}
  167. wg sync.WaitGroup
  168. workers []*worker
  169. closerLock sync.Mutex
  170. workersMode ReceivingMode
  171. aggregatorMode ReceivingMode
  172. agg *aggregator
  173. aggExtended *aggregator
  174. options []Option
  175. addrOption string
  176. }
  177. // ClientMetrics contains metrics about the client
  178. type ClientMetrics struct {
  179. TotalMetrics uint64
  180. TotalMetricsGauge uint64
  181. TotalMetricsCount uint64
  182. TotalMetricsHistogram uint64
  183. TotalMetricsDistribution uint64
  184. TotalMetricsSet uint64
  185. TotalMetricsTiming uint64
  186. TotalEvents uint64
  187. TotalServiceChecks uint64
  188. TotalDroppedOnReceive uint64
  189. }
  190. // Verify that Client implements the ClientInterface.
  191. // https://golang.org/doc/faq#guarantee_satisfies_interface
  192. var _ ClientInterface = &Client{}
  193. func resolveAddr(addr string) string {
  194. envPort := ""
  195. if addr == "" {
  196. addr = os.Getenv(agentHostEnvVarName)
  197. envPort = os.Getenv(agentPortEnvVarName)
  198. }
  199. if addr == "" {
  200. return ""
  201. }
  202. if !strings.HasPrefix(addr, WindowsPipeAddressPrefix) && !strings.HasPrefix(addr, UnixAddressPrefix) {
  203. if !strings.Contains(addr, ":") {
  204. if envPort != "" {
  205. addr = fmt.Sprintf("%s:%s", addr, envPort)
  206. } else {
  207. addr = fmt.Sprintf("%s:%s", addr, defaultUDPPort)
  208. }
  209. }
  210. }
  211. return addr
  212. }
  213. func createWriter(addr string) (statsdWriter, string, error) {
  214. addr = resolveAddr(addr)
  215. if addr == "" {
  216. return nil, "", errors.New("No address passed and autodetection from environment failed")
  217. }
  218. switch {
  219. case strings.HasPrefix(addr, WindowsPipeAddressPrefix):
  220. w, err := newWindowsPipeWriter(addr)
  221. return w, WriterWindowsPipe, err
  222. case strings.HasPrefix(addr, UnixAddressPrefix):
  223. w, err := newUDSWriter(addr[len(UnixAddressPrefix):])
  224. return w, WriterNameUDS, err
  225. default:
  226. w, err := newUDPWriter(addr)
  227. return w, WriterNameUDP, err
  228. }
  229. }
  230. // New returns a pointer to a new Client given an addr in the format "hostname:port" for UDP,
  231. // "unix:///path/to/socket" for UDS or "\\.\pipe\path\to\pipe" for Windows Named Pipes.
  232. func New(addr string, options ...Option) (*Client, error) {
  233. o, err := resolveOptions(options)
  234. if err != nil {
  235. return nil, err
  236. }
  237. w, writerType, err := createWriter(addr)
  238. if err != nil {
  239. return nil, err
  240. }
  241. client, err := newWithWriter(w, o, writerType)
  242. if err == nil {
  243. client.options = append(client.options, options...)
  244. client.addrOption = addr
  245. }
  246. return client, err
  247. }
  248. // NewWithWriter creates a new Client with given writer. Writer is a
  249. // io.WriteCloser + SetWriteTimeout(time.Duration) error
  250. func NewWithWriter(w statsdWriter, options ...Option) (*Client, error) {
  251. o, err := resolveOptions(options)
  252. if err != nil {
  253. return nil, err
  254. }
  255. return newWithWriter(w, o, "custom")
  256. }
  257. // CloneWithExtraOptions create a new Client with extra options
  258. func CloneWithExtraOptions(c *Client, options ...Option) (*Client, error) {
  259. if c == nil {
  260. return nil, ErrNoClient
  261. }
  262. if c.addrOption == "" {
  263. return nil, fmt.Errorf("can't clone client with no addrOption")
  264. }
  265. opt := append(c.options, options...)
  266. return New(c.addrOption, opt...)
  267. }
  268. func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, error) {
  269. w.SetWriteTimeout(o.WriteTimeoutUDS)
  270. c := Client{
  271. Namespace: o.Namespace,
  272. Tags: o.Tags,
  273. metrics: &ClientMetrics{},
  274. }
  275. // Inject values of DD_* environment variables as global tags.
  276. for _, mapping := range ddEnvTagsMapping {
  277. if value := os.Getenv(mapping.envName); value != "" {
  278. c.Tags = append(c.Tags, fmt.Sprintf("%s:%s", mapping.tagName, value))
  279. }
  280. }
  281. if o.MaxBytesPerPayload == 0 {
  282. if writerName == WriterNameUDS {
  283. o.MaxBytesPerPayload = DefaultMaxAgentPayloadSize
  284. } else {
  285. o.MaxBytesPerPayload = OptimalUDPPayloadSize
  286. }
  287. }
  288. if o.BufferPoolSize == 0 {
  289. if writerName == WriterNameUDS {
  290. o.BufferPoolSize = DefaultUDSBufferPoolSize
  291. } else {
  292. o.BufferPoolSize = DefaultUDPBufferPoolSize
  293. }
  294. }
  295. if o.SenderQueueSize == 0 {
  296. if writerName == WriterNameUDS {
  297. o.SenderQueueSize = DefaultUDSBufferPoolSize
  298. } else {
  299. o.SenderQueueSize = DefaultUDPBufferPoolSize
  300. }
  301. }
  302. bufferPool := newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload)
  303. c.sender = newSender(w, o.SenderQueueSize, bufferPool)
  304. c.aggregatorMode = o.ReceiveMode
  305. c.workersMode = o.ReceiveMode
  306. // ChannelMode mode at the worker level is not enabled when
  307. // ExtendedAggregation is since the user app will not directly
  308. // use the worker (the aggregator sit between the app and the
  309. // workers).
  310. if o.ExtendedAggregation {
  311. c.workersMode = MutexMode
  312. }
  313. if o.Aggregation || o.ExtendedAggregation {
  314. c.agg = newAggregator(&c)
  315. c.agg.start(o.AggregationFlushInterval)
  316. if o.ExtendedAggregation {
  317. c.aggExtended = c.agg
  318. if c.aggregatorMode == ChannelMode {
  319. c.agg.startReceivingMetric(o.ChannelModeBufferSize, o.BufferShardCount)
  320. }
  321. }
  322. }
  323. for i := 0; i < o.BufferShardCount; i++ {
  324. w := newWorker(bufferPool, c.sender)
  325. c.workers = append(c.workers, w)
  326. if c.workersMode == ChannelMode {
  327. w.startReceivingMetric(o.ChannelModeBufferSize)
  328. }
  329. }
  330. c.flushTime = o.BufferFlushInterval
  331. c.stop = make(chan struct{}, 1)
  332. c.wg.Add(1)
  333. go func() {
  334. defer c.wg.Done()
  335. c.watch()
  336. }()
  337. if o.Telemetry {
  338. if o.TelemetryAddr == "" {
  339. c.telemetry = newTelemetryClient(&c, writerName, o.DevMode)
  340. } else {
  341. var err error
  342. c.telemetry, err = newTelemetryClientWithCustomAddr(&c, writerName, o.DevMode, o.TelemetryAddr, bufferPool)
  343. if err != nil {
  344. return nil, err
  345. }
  346. }
  347. c.telemetry.run(&c.wg, c.stop)
  348. }
  349. return &c, nil
  350. }
  351. // NewBuffered returns a Client that buffers its output and sends it in chunks.
  352. // Buflen is the length of the buffer in number of commands.
  353. //
  354. // When addr is empty, the client will default to a UDP client and use the DD_AGENT_HOST
  355. // and (optionally) the DD_DOGSTATSD_PORT environment variables to build the target address.
  356. func NewBuffered(addr string, buflen int) (*Client, error) {
  357. return New(addr, WithMaxMessagesPerPayload(buflen))
  358. }
  359. // SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP
  360. // or Windows Pipes.
  361. func (c *Client) SetWriteTimeout(d time.Duration) error {
  362. if c == nil {
  363. return ErrNoClient
  364. }
  365. return c.sender.transport.SetWriteTimeout(d)
  366. }
  367. func (c *Client) watch() {
  368. ticker := time.NewTicker(c.flushTime)
  369. for {
  370. select {
  371. case <-ticker.C:
  372. for _, w := range c.workers {
  373. w.flush()
  374. }
  375. case <-c.stop:
  376. ticker.Stop()
  377. return
  378. }
  379. }
  380. }
  381. // Flush forces a flush of all the queued dogstatsd payloads This method is
  382. // blocking and will not return until everything is sent through the network.
  383. // In MutexMode, this will also block sampling new data to the client while the
  384. // workers and sender are flushed.
  385. func (c *Client) Flush() error {
  386. if c == nil {
  387. return ErrNoClient
  388. }
  389. if c.agg != nil {
  390. c.agg.flush()
  391. }
  392. for _, w := range c.workers {
  393. w.pause()
  394. defer w.unpause()
  395. w.flushUnsafe()
  396. }
  397. // Now that the worker are pause the sender can flush the queue between
  398. // worker and senders
  399. c.sender.flush()
  400. return nil
  401. }
  402. func (c *Client) FlushTelemetryMetrics() ClientMetrics {
  403. cm := ClientMetrics{
  404. TotalMetricsGauge: atomic.SwapUint64(&c.metrics.TotalMetricsGauge, 0),
  405. TotalMetricsCount: atomic.SwapUint64(&c.metrics.TotalMetricsCount, 0),
  406. TotalMetricsSet: atomic.SwapUint64(&c.metrics.TotalMetricsSet, 0),
  407. TotalMetricsHistogram: atomic.SwapUint64(&c.metrics.TotalMetricsHistogram, 0),
  408. TotalMetricsDistribution: atomic.SwapUint64(&c.metrics.TotalMetricsDistribution, 0),
  409. TotalMetricsTiming: atomic.SwapUint64(&c.metrics.TotalMetricsTiming, 0),
  410. TotalEvents: atomic.SwapUint64(&c.metrics.TotalEvents, 0),
  411. TotalServiceChecks: atomic.SwapUint64(&c.metrics.TotalServiceChecks, 0),
  412. TotalDroppedOnReceive: atomic.SwapUint64(&c.metrics.TotalDroppedOnReceive, 0),
  413. }
  414. cm.TotalMetrics = cm.TotalMetricsGauge + cm.TotalMetricsCount +
  415. cm.TotalMetricsSet + cm.TotalMetricsHistogram +
  416. cm.TotalMetricsDistribution + cm.TotalMetricsTiming
  417. return cm
  418. }
  419. func (c *Client) send(m metric) error {
  420. h := hashString32(m.name)
  421. worker := c.workers[h%uint32(len(c.workers))]
  422. if c.workersMode == ChannelMode {
  423. select {
  424. case worker.inputMetrics <- m:
  425. default:
  426. atomic.AddUint64(&c.metrics.TotalDroppedOnReceive, 1)
  427. }
  428. return nil
  429. }
  430. return worker.processMetric(m)
  431. }
  432. // sendBlocking is used by the aggregator to inject aggregated metrics.
  433. func (c *Client) sendBlocking(m metric) error {
  434. m.globalTags = c.Tags
  435. m.namespace = c.Namespace
  436. h := hashString32(m.name)
  437. worker := c.workers[h%uint32(len(c.workers))]
  438. return worker.processMetric(m)
  439. }
  440. func (c *Client) sendToAggregator(mType metricType, name string, value float64, tags []string, rate float64, f bufferedMetricSampleFunc) error {
  441. if c.aggregatorMode == ChannelMode {
  442. select {
  443. case c.aggExtended.inputMetrics <- metric{metricType: mType, name: name, fvalue: value, tags: tags, rate: rate}:
  444. default:
  445. atomic.AddUint64(&c.metrics.TotalDroppedOnReceive, 1)
  446. }
  447. return nil
  448. }
  449. return f(name, value, tags, rate)
  450. }
  451. // Gauge measures the value of a metric at a particular time.
  452. func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error {
  453. if c == nil {
  454. return ErrNoClient
  455. }
  456. atomic.AddUint64(&c.metrics.TotalMetricsGauge, 1)
  457. if c.agg != nil {
  458. return c.agg.gauge(name, value, tags)
  459. }
  460. return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.Tags, namespace: c.Namespace})
  461. }
  462. // Count tracks how many times something happened per second.
  463. func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
  464. if c == nil {
  465. return ErrNoClient
  466. }
  467. atomic.AddUint64(&c.metrics.TotalMetricsCount, 1)
  468. if c.agg != nil {
  469. return c.agg.count(name, value, tags)
  470. }
  471. return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.Tags, namespace: c.Namespace})
  472. }
  473. // Histogram tracks the statistical distribution of a set of values on each host.
  474. func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
  475. if c == nil {
  476. return ErrNoClient
  477. }
  478. atomic.AddUint64(&c.metrics.TotalMetricsHistogram, 1)
  479. if c.aggExtended != nil {
  480. return c.sendToAggregator(histogram, name, value, tags, rate, c.aggExtended.histogram)
  481. }
  482. return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.Tags, namespace: c.Namespace})
  483. }
  484. // Distribution tracks the statistical distribution of a set of values across your infrastructure.
  485. func (c *Client) Distribution(name string, value float64, tags []string, rate float64) error {
  486. if c == nil {
  487. return ErrNoClient
  488. }
  489. atomic.AddUint64(&c.metrics.TotalMetricsDistribution, 1)
  490. if c.aggExtended != nil {
  491. return c.sendToAggregator(distribution, name, value, tags, rate, c.aggExtended.distribution)
  492. }
  493. return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.Tags, namespace: c.Namespace})
  494. }
  495. // Decr is just Count of -1
  496. func (c *Client) Decr(name string, tags []string, rate float64) error {
  497. return c.Count(name, -1, tags, rate)
  498. }
  499. // Incr is just Count of 1
  500. func (c *Client) Incr(name string, tags []string, rate float64) error {
  501. return c.Count(name, 1, tags, rate)
  502. }
  503. // Set counts the number of unique elements in a group.
  504. func (c *Client) Set(name string, value string, tags []string, rate float64) error {
  505. if c == nil {
  506. return ErrNoClient
  507. }
  508. atomic.AddUint64(&c.metrics.TotalMetricsSet, 1)
  509. if c.agg != nil {
  510. return c.agg.set(name, value, tags)
  511. }
  512. return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate, globalTags: c.Tags, namespace: c.Namespace})
  513. }
  514. // Timing sends timing information, it is an alias for TimeInMilliseconds
  515. func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error {
  516. return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate)
  517. }
  518. // TimeInMilliseconds sends timing information in milliseconds.
  519. // It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
  520. func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error {
  521. if c == nil {
  522. return ErrNoClient
  523. }
  524. atomic.AddUint64(&c.metrics.TotalMetricsTiming, 1)
  525. if c.aggExtended != nil {
  526. return c.sendToAggregator(timing, name, value, tags, rate, c.aggExtended.timing)
  527. }
  528. return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.Tags, namespace: c.Namespace})
  529. }
  530. // Event sends the provided Event.
  531. func (c *Client) Event(e *Event) error {
  532. if c == nil {
  533. return ErrNoClient
  534. }
  535. atomic.AddUint64(&c.metrics.TotalEvents, 1)
  536. return c.send(metric{metricType: event, evalue: e, rate: 1, globalTags: c.Tags, namespace: c.Namespace})
  537. }
  538. // SimpleEvent sends an event with the provided title and text.
  539. func (c *Client) SimpleEvent(title, text string) error {
  540. e := NewEvent(title, text)
  541. return c.Event(e)
  542. }
  543. // ServiceCheck sends the provided ServiceCheck.
  544. func (c *Client) ServiceCheck(sc *ServiceCheck) error {
  545. if c == nil {
  546. return ErrNoClient
  547. }
  548. atomic.AddUint64(&c.metrics.TotalServiceChecks, 1)
  549. return c.send(metric{metricType: serviceCheck, scvalue: sc, rate: 1, globalTags: c.Tags, namespace: c.Namespace})
  550. }
  551. // SimpleServiceCheck sends an serviceCheck with the provided name and status.
  552. func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error {
  553. sc := NewServiceCheck(name, status)
  554. return c.ServiceCheck(sc)
  555. }
  556. // Close the client connection.
  557. func (c *Client) Close() error {
  558. if c == nil {
  559. return ErrNoClient
  560. }
  561. // Acquire closer lock to ensure only one thread can close the stop channel
  562. c.closerLock.Lock()
  563. defer c.closerLock.Unlock()
  564. // Notify all other threads that they should stop
  565. select {
  566. case <-c.stop:
  567. return nil
  568. default:
  569. }
  570. close(c.stop)
  571. if c.workersMode == ChannelMode {
  572. for _, w := range c.workers {
  573. w.stopReceivingMetric()
  574. }
  575. }
  576. // flush the aggregator first
  577. if c.agg != nil {
  578. if c.aggExtended != nil && c.aggregatorMode == ChannelMode {
  579. c.agg.stopReceivingMetric()
  580. }
  581. c.agg.stop()
  582. }
  583. // Wait for the threads to stop
  584. c.wg.Wait()
  585. c.Flush()
  586. return c.sender.close()
  587. }