buffer.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package statsd
  2. import (
  3. "strconv"
  4. )
  5. type bufferFullError string
  6. func (e bufferFullError) Error() string { return string(e) }
  7. const errBufferFull = bufferFullError("statsd buffer is full")
  8. type partialWriteError string
  9. func (e partialWriteError) Error() string { return string(e) }
  10. const errPartialWrite = partialWriteError("value partially written")
  11. const metricOverhead = 512
  12. // statsdBuffer is a buffer containing statsd messages
  13. // this struct methods are NOT safe for concurent use
  14. type statsdBuffer struct {
  15. buffer []byte
  16. maxSize int
  17. maxElements int
  18. elementCount int
  19. }
  20. func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer {
  21. return &statsdBuffer{
  22. buffer: make([]byte, 0, maxSize+metricOverhead), // pre-allocate the needed size + metricOverhead to avoid having Go re-allocate on it's own if an element does not fit
  23. maxSize: maxSize,
  24. maxElements: maxElements,
  25. }
  26. }
  27. func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
  28. if b.elementCount >= b.maxElements {
  29. return errBufferFull
  30. }
  31. originalBuffer := b.buffer
  32. b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate)
  33. b.writeSeparator()
  34. return b.validateNewElement(originalBuffer)
  35. }
  36. func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error {
  37. if b.elementCount >= b.maxElements {
  38. return errBufferFull
  39. }
  40. originalBuffer := b.buffer
  41. b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate)
  42. b.writeSeparator()
  43. return b.validateNewElement(originalBuffer)
  44. }
  45. func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
  46. if b.elementCount >= b.maxElements {
  47. return errBufferFull
  48. }
  49. originalBuffer := b.buffer
  50. b.buffer = appendHistogram(b.buffer, namespace, globalTags, name, value, tags, rate)
  51. b.writeSeparator()
  52. return b.validateNewElement(originalBuffer)
  53. }
  54. // writeAggregated serialized as many values as possible in the current buffer and return the position in values where it stopped.
  55. func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int) (int, error) {
  56. if b.elementCount >= b.maxElements {
  57. return 0, errBufferFull
  58. }
  59. originalBuffer := b.buffer
  60. b.buffer = appendHeader(b.buffer, namespace, name)
  61. // buffer already full
  62. if len(b.buffer)+tagSize > b.maxSize {
  63. b.buffer = originalBuffer
  64. return 0, errBufferFull
  65. }
  66. // We add as many value as possible
  67. var position int
  68. for idx, v := range values {
  69. previousBuffer := b.buffer
  70. if idx != 0 {
  71. b.buffer = append(b.buffer, ':')
  72. }
  73. b.buffer = strconv.AppendFloat(b.buffer, v, 'f', precision, 64)
  74. // Should we stop serializing and switch to another buffer
  75. if len(b.buffer)+tagSize > b.maxSize {
  76. b.buffer = previousBuffer
  77. break
  78. }
  79. position = idx + 1
  80. }
  81. // we could not add a single value
  82. if position == 0 {
  83. b.buffer = originalBuffer
  84. return 0, errBufferFull
  85. }
  86. b.buffer = append(b.buffer, '|')
  87. b.buffer = append(b.buffer, metricSymbol...)
  88. b.buffer = appendTagsAggregated(b.buffer, globalTags, tags)
  89. b.writeSeparator()
  90. b.elementCount++
  91. if position != len(values) {
  92. return position, errPartialWrite
  93. }
  94. return position, nil
  95. }
  96. func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
  97. if b.elementCount >= b.maxElements {
  98. return errBufferFull
  99. }
  100. originalBuffer := b.buffer
  101. b.buffer = appendDistribution(b.buffer, namespace, globalTags, name, value, tags, rate)
  102. b.writeSeparator()
  103. return b.validateNewElement(originalBuffer)
  104. }
  105. func (b *statsdBuffer) writeSet(namespace string, globalTags []string, name string, value string, tags []string, rate float64) error {
  106. if b.elementCount >= b.maxElements {
  107. return errBufferFull
  108. }
  109. originalBuffer := b.buffer
  110. b.buffer = appendSet(b.buffer, namespace, globalTags, name, value, tags, rate)
  111. b.writeSeparator()
  112. return b.validateNewElement(originalBuffer)
  113. }
  114. func (b *statsdBuffer) writeTiming(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
  115. if b.elementCount >= b.maxElements {
  116. return errBufferFull
  117. }
  118. originalBuffer := b.buffer
  119. b.buffer = appendTiming(b.buffer, namespace, globalTags, name, value, tags, rate)
  120. b.writeSeparator()
  121. return b.validateNewElement(originalBuffer)
  122. }
  123. func (b *statsdBuffer) writeEvent(event Event, globalTags []string) error {
  124. if b.elementCount >= b.maxElements {
  125. return errBufferFull
  126. }
  127. originalBuffer := b.buffer
  128. b.buffer = appendEvent(b.buffer, event, globalTags)
  129. b.writeSeparator()
  130. return b.validateNewElement(originalBuffer)
  131. }
  132. func (b *statsdBuffer) writeServiceCheck(serviceCheck ServiceCheck, globalTags []string) error {
  133. if b.elementCount >= b.maxElements {
  134. return errBufferFull
  135. }
  136. originalBuffer := b.buffer
  137. b.buffer = appendServiceCheck(b.buffer, serviceCheck, globalTags)
  138. b.writeSeparator()
  139. return b.validateNewElement(originalBuffer)
  140. }
  141. func (b *statsdBuffer) validateNewElement(originalBuffer []byte) error {
  142. if len(b.buffer) > b.maxSize {
  143. b.buffer = originalBuffer
  144. return errBufferFull
  145. }
  146. b.elementCount++
  147. return nil
  148. }
  149. func (b *statsdBuffer) writeSeparator() {
  150. b.buffer = append(b.buffer, '\n')
  151. }
  152. func (b *statsdBuffer) reset() {
  153. b.buffer = b.buffer[:0]
  154. b.elementCount = 0
  155. }
  156. func (b *statsdBuffer) bytes() []byte {
  157. return b.buffer
  158. }