spanstore.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package trace
  15. import (
  16. "sync"
  17. "time"
  18. "go.opencensus.io/internal"
  19. )
  20. const (
  21. maxBucketSize = 100000
  22. defaultBucketSize = 10
  23. )
  24. var (
  25. ssmu sync.RWMutex // protects spanStores
  26. spanStores = make(map[string]*spanStore)
  27. )
  28. // This exists purely to avoid exposing internal methods used by z-Pages externally.
  29. type internalOnly struct{}
  30. func init() {
  31. //TODO(#412): remove
  32. internal.Trace = &internalOnly{}
  33. }
  34. // ReportActiveSpans returns the active spans for the given name.
  35. func (i internalOnly) ReportActiveSpans(name string) []*SpanData {
  36. s := spanStoreForName(name)
  37. if s == nil {
  38. return nil
  39. }
  40. var out []*SpanData
  41. s.mu.Lock()
  42. defer s.mu.Unlock()
  43. for activeSpan := range s.active {
  44. if s, ok := activeSpan.(*span); ok {
  45. out = append(out, s.makeSpanData())
  46. }
  47. }
  48. return out
  49. }
  50. // ReportSpansByError returns a sample of error spans.
  51. //
  52. // If code is nonzero, only spans with that status code are returned.
  53. func (i internalOnly) ReportSpansByError(name string, code int32) []*SpanData {
  54. s := spanStoreForName(name)
  55. if s == nil {
  56. return nil
  57. }
  58. var out []*SpanData
  59. s.mu.Lock()
  60. defer s.mu.Unlock()
  61. if code != 0 {
  62. if b, ok := s.errors[code]; ok {
  63. for _, sd := range b.buffer {
  64. if sd == nil {
  65. break
  66. }
  67. out = append(out, sd)
  68. }
  69. }
  70. } else {
  71. for _, b := range s.errors {
  72. for _, sd := range b.buffer {
  73. if sd == nil {
  74. break
  75. }
  76. out = append(out, sd)
  77. }
  78. }
  79. }
  80. return out
  81. }
  82. // ConfigureBucketSizes sets the number of spans to keep per latency and error
  83. // bucket for different span names.
  84. func (i internalOnly) ConfigureBucketSizes(bcs []internal.BucketConfiguration) {
  85. for _, bc := range bcs {
  86. latencyBucketSize := bc.MaxRequestsSucceeded
  87. if latencyBucketSize < 0 {
  88. latencyBucketSize = 0
  89. }
  90. if latencyBucketSize > maxBucketSize {
  91. latencyBucketSize = maxBucketSize
  92. }
  93. errorBucketSize := bc.MaxRequestsErrors
  94. if errorBucketSize < 0 {
  95. errorBucketSize = 0
  96. }
  97. if errorBucketSize > maxBucketSize {
  98. errorBucketSize = maxBucketSize
  99. }
  100. spanStoreSetSize(bc.Name, latencyBucketSize, errorBucketSize)
  101. }
  102. }
  103. // ReportSpansPerMethod returns a summary of what spans are being stored for each span name.
  104. func (i internalOnly) ReportSpansPerMethod() map[string]internal.PerMethodSummary {
  105. out := make(map[string]internal.PerMethodSummary)
  106. ssmu.RLock()
  107. defer ssmu.RUnlock()
  108. for name, s := range spanStores {
  109. s.mu.Lock()
  110. p := internal.PerMethodSummary{
  111. Active: len(s.active),
  112. }
  113. for code, b := range s.errors {
  114. p.ErrorBuckets = append(p.ErrorBuckets, internal.ErrorBucketSummary{
  115. ErrorCode: code,
  116. Size: b.size(),
  117. })
  118. }
  119. for i, b := range s.latency {
  120. min, max := latencyBucketBounds(i)
  121. p.LatencyBuckets = append(p.LatencyBuckets, internal.LatencyBucketSummary{
  122. MinLatency: min,
  123. MaxLatency: max,
  124. Size: b.size(),
  125. })
  126. }
  127. s.mu.Unlock()
  128. out[name] = p
  129. }
  130. return out
  131. }
  132. // ReportSpansByLatency returns a sample of successful spans.
  133. //
  134. // minLatency is the minimum latency of spans to be returned.
  135. // maxLatency, if nonzero, is the maximum latency of spans to be returned.
  136. func (i internalOnly) ReportSpansByLatency(name string, minLatency, maxLatency time.Duration) []*SpanData {
  137. s := spanStoreForName(name)
  138. if s == nil {
  139. return nil
  140. }
  141. var out []*SpanData
  142. s.mu.Lock()
  143. defer s.mu.Unlock()
  144. for i, b := range s.latency {
  145. min, max := latencyBucketBounds(i)
  146. if i+1 != len(s.latency) && max <= minLatency {
  147. continue
  148. }
  149. if maxLatency != 0 && maxLatency < min {
  150. continue
  151. }
  152. for _, sd := range b.buffer {
  153. if sd == nil {
  154. break
  155. }
  156. if minLatency != 0 || maxLatency != 0 {
  157. d := sd.EndTime.Sub(sd.StartTime)
  158. if d < minLatency {
  159. continue
  160. }
  161. if maxLatency != 0 && d > maxLatency {
  162. continue
  163. }
  164. }
  165. out = append(out, sd)
  166. }
  167. }
  168. return out
  169. }
  170. // spanStore keeps track of spans stored for a particular span name.
  171. //
  172. // It contains all active spans; a sample of spans for failed requests,
  173. // categorized by error code; and a sample of spans for successful requests,
  174. // bucketed by latency.
  175. type spanStore struct {
  176. mu sync.Mutex // protects everything below.
  177. active map[SpanInterface]struct{}
  178. errors map[int32]*bucket
  179. latency []bucket
  180. maxSpansPerErrorBucket int
  181. }
  182. // newSpanStore creates a span store.
  183. func newSpanStore(name string, latencyBucketSize int, errorBucketSize int) *spanStore {
  184. s := &spanStore{
  185. active: make(map[SpanInterface]struct{}),
  186. latency: make([]bucket, len(defaultLatencies)+1),
  187. maxSpansPerErrorBucket: errorBucketSize,
  188. }
  189. for i := range s.latency {
  190. s.latency[i] = makeBucket(latencyBucketSize)
  191. }
  192. return s
  193. }
  194. // spanStoreForName returns the spanStore for the given name.
  195. //
  196. // It returns nil if it doesn't exist.
  197. func spanStoreForName(name string) *spanStore {
  198. var s *spanStore
  199. ssmu.RLock()
  200. s, _ = spanStores[name]
  201. ssmu.RUnlock()
  202. return s
  203. }
  204. // spanStoreForNameCreateIfNew returns the spanStore for the given name.
  205. //
  206. // It creates it if it didn't exist.
  207. func spanStoreForNameCreateIfNew(name string) *spanStore {
  208. ssmu.RLock()
  209. s, ok := spanStores[name]
  210. ssmu.RUnlock()
  211. if ok {
  212. return s
  213. }
  214. ssmu.Lock()
  215. defer ssmu.Unlock()
  216. s, ok = spanStores[name]
  217. if ok {
  218. return s
  219. }
  220. s = newSpanStore(name, defaultBucketSize, defaultBucketSize)
  221. spanStores[name] = s
  222. return s
  223. }
  224. // spanStoreSetSize resizes the spanStore for the given name.
  225. //
  226. // It creates it if it didn't exist.
  227. func spanStoreSetSize(name string, latencyBucketSize int, errorBucketSize int) {
  228. ssmu.RLock()
  229. s, ok := spanStores[name]
  230. ssmu.RUnlock()
  231. if ok {
  232. s.resize(latencyBucketSize, errorBucketSize)
  233. return
  234. }
  235. ssmu.Lock()
  236. defer ssmu.Unlock()
  237. s, ok = spanStores[name]
  238. if ok {
  239. s.resize(latencyBucketSize, errorBucketSize)
  240. return
  241. }
  242. s = newSpanStore(name, latencyBucketSize, errorBucketSize)
  243. spanStores[name] = s
  244. }
  245. func (s *spanStore) resize(latencyBucketSize int, errorBucketSize int) {
  246. s.mu.Lock()
  247. for i := range s.latency {
  248. s.latency[i].resize(latencyBucketSize)
  249. }
  250. for _, b := range s.errors {
  251. b.resize(errorBucketSize)
  252. }
  253. s.maxSpansPerErrorBucket = errorBucketSize
  254. s.mu.Unlock()
  255. }
  256. // add adds a span to the active bucket of the spanStore.
  257. func (s *spanStore) add(span SpanInterface) {
  258. s.mu.Lock()
  259. s.active[span] = struct{}{}
  260. s.mu.Unlock()
  261. }
  262. // finished removes a span from the active set, and adds a corresponding
  263. // SpanData to a latency or error bucket.
  264. func (s *spanStore) finished(span SpanInterface, sd *SpanData) {
  265. latency := sd.EndTime.Sub(sd.StartTime)
  266. if latency < 0 {
  267. latency = 0
  268. }
  269. code := sd.Status.Code
  270. s.mu.Lock()
  271. delete(s.active, span)
  272. if code == 0 {
  273. s.latency[latencyBucket(latency)].add(sd)
  274. } else {
  275. if s.errors == nil {
  276. s.errors = make(map[int32]*bucket)
  277. }
  278. if b := s.errors[code]; b != nil {
  279. b.add(sd)
  280. } else {
  281. b := makeBucket(s.maxSpansPerErrorBucket)
  282. s.errors[code] = &b
  283. b.add(sd)
  284. }
  285. }
  286. s.mu.Unlock()
  287. }