  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package view
  16. import (
  17. "math"
  18. "time"
  19. "go.opencensus.io/metric/metricdata"
  20. )
  21. // AggregationData represents an aggregated value from a collection.
  22. // They are reported on the view data during exporting.
  23. // Mosts users won't directly access aggregration data.
  24. type AggregationData interface {
  25. isAggregationData() bool
  26. addSample(v float64, attachments map[string]interface{}, t time.Time)
  27. clone() AggregationData
  28. equal(other AggregationData) bool
  29. toPoint(t metricdata.Type, time time.Time) metricdata.Point
  30. StartTime() time.Time
  31. }
  32. const epsilon = 1e-9
// CountData is the aggregated data for the Count aggregation.
// A count aggregation processes data and counts the recordings.
//
// Most users won't directly access count data.
type CountData struct {
	Start time.Time // when aggregation began
	Value int64     // number of samples recorded
}
  41. func (a *CountData) isAggregationData() bool { return true }
  42. func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) {
  43. a.Value = a.Value + 1
  44. }
  45. func (a *CountData) clone() AggregationData {
  46. return &CountData{Value: a.Value, Start: a.Start}
  47. }
  48. func (a *CountData) equal(other AggregationData) bool {
  49. a2, ok := other.(*CountData)
  50. if !ok {
  51. return false
  52. }
  53. return a.Start.Equal(a2.Start) && a.Value == a2.Value
  54. }
  55. func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
  56. switch metricType {
  57. case metricdata.TypeCumulativeInt64:
  58. return metricdata.NewInt64Point(t, a.Value)
  59. default:
  60. panic("unsupported metricdata.Type")
  61. }
  62. }
// StartTime returns the start time of the data being aggregated by CountData.
func (a *CountData) StartTime() time.Time {
	return a.Start
}
// SumData is the aggregated data for the Sum aggregation.
// A sum aggregation processes data and sums up the recordings.
//
// Most users won't directly access sum data.
type SumData struct {
	Start time.Time // when aggregation began
	Value float64   // running sum of all recorded values
}
  75. func (a *SumData) isAggregationData() bool { return true }
  76. func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
  77. a.Value += v
  78. }
  79. func (a *SumData) clone() AggregationData {
  80. return &SumData{Value: a.Value, Start: a.Start}
  81. }
  82. func (a *SumData) equal(other AggregationData) bool {
  83. a2, ok := other.(*SumData)
  84. if !ok {
  85. return false
  86. }
  87. return a.Start.Equal(a2.Start) && math.Pow(a.Value-a2.Value, 2) < epsilon
  88. }
  89. func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
  90. switch metricType {
  91. case metricdata.TypeCumulativeInt64:
  92. return metricdata.NewInt64Point(t, int64(a.Value))
  93. case metricdata.TypeCumulativeFloat64:
  94. return metricdata.NewFloat64Point(t, a.Value)
  95. default:
  96. panic("unsupported metricdata.Type")
  97. }
  98. }
// StartTime returns the start time of the data being aggregated by SumData.
func (a *SumData) StartTime() time.Time {
	return a.Start
}
// DistributionData is the aggregated data for the
// Distribution aggregation.
//
// Most users won't directly access distribution data.
//
// For a distribution with N bounds, the associated DistributionData will have
// N+1 buckets.
type DistributionData struct {
	Count           int64   // number of data points aggregated
	Min             float64 // minimum value in the distribution
	Max             float64 // max value in the distribution
	Mean            float64 // mean of the distribution
	SumOfSquaredDev float64 // sum of the squared deviation from the mean
	CountPerBucket  []int64 // number of occurrences per bucket
	// ExemplarsPerBucket is slice the same length as CountPerBucket containing
	// an exemplar for the associated bucket, or nil.
	ExemplarsPerBucket []*metricdata.Exemplar
	bounds             []float64 // histogram distribution of the values
	Start              time.Time // when aggregation began
}
  123. func newDistributionData(agg *Aggregation, t time.Time) *DistributionData {
  124. bucketCount := len(agg.Buckets) + 1
  125. return &DistributionData{
  126. CountPerBucket: make([]int64, bucketCount),
  127. ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
  128. bounds: agg.Buckets,
  129. Min: math.MaxFloat64,
  130. Max: math.SmallestNonzeroFloat64,
  131. Start: t,
  132. }
  133. }
  134. // Sum returns the sum of all samples collected.
  135. func (a *DistributionData) Sum() float64 { return a.Mean * float64(a.Count) }
  136. func (a *DistributionData) variance() float64 {
  137. if a.Count <= 1 {
  138. return 0
  139. }
  140. return a.SumOfSquaredDev / float64(a.Count-1)
  141. }
  142. func (a *DistributionData) isAggregationData() bool { return true }
// TODO(songy23): support exemplar attachments.
// addSample folds value v into the distribution: it updates min/max, the
// per-bucket counts (possibly recording an exemplar), and the running mean
// and sum of squared deviations via Welford's online update, which keeps
// the variance numerically stable without storing individual samples.
func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
	if v < a.Min {
		a.Min = v
	}
	if v > a.Max {
		a.Max = v
	}
	a.Count++
	a.addToBucket(v, attachments, t)
	// First sample: the mean is the sample itself and the squared
	// deviation stays zero.
	if a.Count == 1 {
		a.Mean = v
		return
	}
	// Welford: use the deviation from BOTH the old and new mean; the
	// order of these updates matters.
	oldMean := a.Mean
	a.Mean = a.Mean + (v-a.Mean)/float64(a.Count)
	a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
}
  161. func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
  162. var count *int64
  163. var i int
  164. var b float64
  165. for i, b = range a.bounds {
  166. if v < b {
  167. count = &a.CountPerBucket[i]
  168. break
  169. }
  170. }
  171. if count == nil { // Last bucket.
  172. i = len(a.bounds)
  173. count = &a.CountPerBucket[i]
  174. }
  175. *count++
  176. if exemplar := getExemplar(v, attachments, t); exemplar != nil {
  177. a.ExemplarsPerBucket[i] = exemplar
  178. }
  179. }
  180. func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
  181. if len(attachments) == 0 {
  182. return nil
  183. }
  184. return &metricdata.Exemplar{
  185. Value: v,
  186. Timestamp: t,
  187. Attachments: attachments,
  188. }
  189. }
  190. func (a *DistributionData) clone() AggregationData {
  191. c := *a
  192. c.CountPerBucket = append([]int64(nil), a.CountPerBucket...)
  193. c.ExemplarsPerBucket = append([]*metricdata.Exemplar(nil), a.ExemplarsPerBucket...)
  194. return &c
  195. }
  196. func (a *DistributionData) equal(other AggregationData) bool {
  197. a2, ok := other.(*DistributionData)
  198. if !ok {
  199. return false
  200. }
  201. if a2 == nil {
  202. return false
  203. }
  204. if len(a.CountPerBucket) != len(a2.CountPerBucket) {
  205. return false
  206. }
  207. for i := range a.CountPerBucket {
  208. if a.CountPerBucket[i] != a2.CountPerBucket[i] {
  209. return false
  210. }
  211. }
  212. return a.Start.Equal(a2.Start) &&
  213. a.Count == a2.Count &&
  214. a.Min == a2.Min &&
  215. a.Max == a2.Max &&
  216. math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
  217. }
  218. func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
  219. switch metricType {
  220. case metricdata.TypeCumulativeDistribution:
  221. buckets := []metricdata.Bucket{}
  222. for i := 0; i < len(a.CountPerBucket); i++ {
  223. buckets = append(buckets, metricdata.Bucket{
  224. Count: a.CountPerBucket[i],
  225. Exemplar: a.ExemplarsPerBucket[i],
  226. })
  227. }
  228. bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds}
  229. val := &metricdata.Distribution{
  230. Count: a.Count,
  231. Sum: a.Sum(),
  232. SumOfSquaredDeviation: a.SumOfSquaredDev,
  233. BucketOptions: bucketOptions,
  234. Buckets: buckets,
  235. }
  236. return metricdata.NewDistributionPoint(t, val)
  237. default:
  238. // TODO: [rghetia] when we have a use case for TypeGaugeDistribution.
  239. panic("unsupported metricdata.Type")
  240. }
  241. }
// StartTime returns the start time of the data being aggregated by DistributionData.
func (a *DistributionData) StartTime() time.Time {
	return a.Start
}
// LastValueData returns the last value recorded for LastValue aggregation.
type LastValueData struct {
	Value float64 // most recently recorded value
}
  250. func (l *LastValueData) isAggregationData() bool {
  251. return true
  252. }
// addSample overwrites the stored value with v; attachments and the
// sample time are ignored.
func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
	l.Value = v
}
  256. func (l *LastValueData) clone() AggregationData {
  257. return &LastValueData{l.Value}
  258. }
  259. func (l *LastValueData) equal(other AggregationData) bool {
  260. a2, ok := other.(*LastValueData)
  261. if !ok {
  262. return false
  263. }
  264. return l.Value == a2.Value
  265. }
  266. func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
  267. switch metricType {
  268. case metricdata.TypeGaugeInt64:
  269. return metricdata.NewInt64Point(t, int64(l.Value))
  270. case metricdata.TypeGaugeFloat64:
  271. return metricdata.NewFloat64Point(t, l.Value)
  272. default:
  273. panic("unsupported metricdata.Type")
  274. }
  275. }
// StartTime returns an empty time value as start time is not recorded when using last value
// aggregation.
func (l *LastValueData) StartTime() time.Time {
	return time.Time{}
}
  281. // ClearStart clears the Start field from data if present. Useful for testing in cases where the
  282. // start time will be nondeterministic.
  283. func ClearStart(data AggregationData) {
  284. switch data := data.(type) {
  285. case *CountData:
  286. data.Start = time.Time{}
  287. case *SumData:
  288. data.Start = time.Time{}
  289. case *DistributionData:
  290. data.Start = time.Time{}
  291. }
  292. }