| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- package statsd
- import (
- "math/rand"
- "sync"
- "time"
- )
- type worker struct {
- pool *bufferPool
- buffer *statsdBuffer
- sender *sender
- random *rand.Rand
- randomLock sync.Mutex
- sync.Mutex
- inputMetrics chan metric
- stop chan struct{}
- }
- func newWorker(pool *bufferPool, sender *sender) *worker {
- // Each worker uses its own random source and random lock to prevent
- // workers in separate goroutines from contending for the lock on the
- // "math/rand" package-global random source (e.g. calls like
- // "rand.Float64()" must acquire a shared lock to get the next
- // pseudorandom number).
- // Note that calling "time.Now().UnixNano()" repeatedly quickly may return
- // very similar values. That's fine for seeding the worker-specific random
- // source because we just need an evenly distributed stream of float values.
- // Do not use this random source for cryptographic randomness.
- random := rand.New(rand.NewSource(time.Now().UnixNano()))
- return &worker{
- pool: pool,
- sender: sender,
- buffer: pool.borrowBuffer(),
- random: random,
- stop: make(chan struct{}),
- }
- }
- func (w *worker) startReceivingMetric(bufferSize int) {
- w.inputMetrics = make(chan metric, bufferSize)
- go w.pullMetric()
- }
- func (w *worker) stopReceivingMetric() {
- w.stop <- struct{}{}
- }
- func (w *worker) pullMetric() {
- for {
- select {
- case m := <-w.inputMetrics:
- w.processMetric(m)
- case <-w.stop:
- return
- }
- }
- }
- func (w *worker) processMetric(m metric) error {
- if !shouldSample(m.rate, w.random, &w.randomLock) {
- return nil
- }
- w.Lock()
- var err error
- if err = w.writeMetricUnsafe(m); err == errBufferFull {
- w.flushUnsafe()
- err = w.writeMetricUnsafe(m)
- }
- w.Unlock()
- return err
- }
- func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, precision int) error {
- globalPos := 0
- // first check how much data we can write to the buffer:
- // +3 + len(metricSymbol) because the message will include '|<metricSymbol>|#' before the tags
- // +1 for the potential line break at the start of the metric
- tagsSize := len(m.stags) + 4 + len(metricSymbol)
- for _, t := range m.globalTags {
- tagsSize += len(t) + 1
- }
- for {
- pos, err := w.buffer.writeAggregated(metricSymbol, m.namespace, m.globalTags, m.name, m.fvalues[globalPos:], m.stags, tagsSize, precision)
- if err == errPartialWrite {
- // We successfully wrote part of the histogram metrics.
- // We flush the current buffer and finish the histogram
- // in a new one.
- w.flushUnsafe()
- globalPos += pos
- } else {
- return err
- }
- }
- }
- func (w *worker) writeMetricUnsafe(m metric) error {
- switch m.metricType {
- case gauge:
- return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
- case count:
- return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate)
- case histogram:
- return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
- case distribution:
- return w.buffer.writeDistribution(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
- case set:
- return w.buffer.writeSet(m.namespace, m.globalTags, m.name, m.svalue, m.tags, m.rate)
- case timing:
- return w.buffer.writeTiming(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
- case event:
- return w.buffer.writeEvent(*m.evalue, m.globalTags)
- case serviceCheck:
- return w.buffer.writeServiceCheck(*m.scvalue, m.globalTags)
- case histogramAggregated:
- return w.writeAggregatedMetricUnsafe(m, histogramSymbol, -1)
- case distributionAggregated:
- return w.writeAggregatedMetricUnsafe(m, distributionSymbol, -1)
- case timingAggregated:
- return w.writeAggregatedMetricUnsafe(m, timingSymbol, 6)
- default:
- return nil
- }
- }
- func (w *worker) flush() {
- w.Lock()
- w.flushUnsafe()
- w.Unlock()
- }
- func (w *worker) pause() {
- w.Lock()
- }
- func (w *worker) unpause() {
- w.Unlock()
- }
- // flush the current buffer. Lock must be held by caller.
- // flushed buffer written to the network asynchronously.
- func (w *worker) flushUnsafe() {
- if len(w.buffer.bytes()) > 0 {
- w.sender.send(w.buffer)
- w.buffer = w.pool.borrowBuffer()
- }
- }
|