worker.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package view
  16. import (
  17. "fmt"
  18. "sync"
  19. "time"
  20. "go.opencensus.io/resource"
  21. "go.opencensus.io/metric/metricdata"
  22. "go.opencensus.io/metric/metricproducer"
  23. "go.opencensus.io/stats"
  24. "go.opencensus.io/stats/internal"
  25. "go.opencensus.io/tag"
  26. )
  27. func init() {
  28. defaultWorker = NewMeter().(*worker)
  29. go defaultWorker.start()
  30. internal.DefaultRecorder = record
  31. internal.MeasurementRecorder = recordMeasurement
  32. }
  33. type measureRef struct {
  34. measure string
  35. views map[*viewInternal]struct{}
  36. }
  37. type worker struct {
  38. measures map[string]*measureRef
  39. views map[string]*viewInternal
  40. viewStartTimes map[*viewInternal]time.Time
  41. timer *time.Ticker
  42. c chan command
  43. quit, done chan bool
  44. mu sync.RWMutex
  45. r *resource.Resource
  46. exportersMu sync.RWMutex
  47. exporters map[Exporter]struct{}
  48. }
  49. // Meter defines an interface which allows a single process to maintain
  50. // multiple sets of metrics exports (intended for the advanced case where a
  51. // single process wants to report metrics about multiple objects, such as
  52. // multiple databases or HTTP services).
  53. //
  54. // Note that this is an advanced use case, and the static functions in this
  55. // module should cover the common use cases.
  56. type Meter interface {
  57. stats.Recorder
  58. // Find returns a registered view associated with this name.
  59. // If no registered view is found, nil is returned.
  60. Find(name string) *View
  61. // Register begins collecting data for the given views.
  62. // Once a view is registered, it reports data to the registered exporters.
  63. Register(views ...*View) error
  64. // Unregister the given views. Data will not longer be exported for these views
  65. // after Unregister returns.
  66. // It is not necessary to unregister from views you expect to collect for the
  67. // duration of your program execution.
  68. Unregister(views ...*View)
  69. // SetReportingPeriod sets the interval between reporting aggregated views in
  70. // the program. If duration is less than or equal to zero, it enables the
  71. // default behavior.
  72. //
  73. // Note: each exporter makes different promises about what the lowest supported
  74. // duration is. For example, the Stackdriver exporter recommends a value no
  75. // lower than 1 minute. Consult each exporter per your needs.
  76. SetReportingPeriod(time.Duration)
  77. // RegisterExporter registers an exporter.
  78. // Collected data will be reported via all the
  79. // registered exporters. Once you no longer
  80. // want data to be exported, invoke UnregisterExporter
  81. // with the previously registered exporter.
  82. //
  83. // Binaries can register exporters, libraries shouldn't register exporters.
  84. RegisterExporter(Exporter)
  85. // UnregisterExporter unregisters an exporter.
  86. UnregisterExporter(Exporter)
  87. // SetResource may be used to set the Resource associated with this registry.
  88. // This is intended to be used in cases where a single process exports metrics
  89. // for multiple Resources, typically in a multi-tenant situation.
  90. SetResource(*resource.Resource)
  91. // Start causes the Meter to start processing Record calls and aggregating
  92. // statistics as well as exporting data.
  93. Start()
  94. // Stop causes the Meter to stop processing calls and terminate data export.
  95. Stop()
  96. // RetrieveData gets a snapshot of the data collected for the the view registered
  97. // with the given name. It is intended for testing only.
  98. RetrieveData(viewName string) ([]*Row, error)
  99. }
  100. var _ Meter = (*worker)(nil)
  101. var defaultWorker *worker
  102. var defaultReportingDuration = 10 * time.Second
  103. // Find returns a registered view associated with this name.
  104. // If no registered view is found, nil is returned.
  105. func Find(name string) (v *View) {
  106. return defaultWorker.Find(name)
  107. }
  108. // Find returns a registered view associated with this name.
  109. // If no registered view is found, nil is returned.
  110. func (w *worker) Find(name string) (v *View) {
  111. req := &getViewByNameReq{
  112. name: name,
  113. c: make(chan *getViewByNameResp),
  114. }
  115. w.c <- req
  116. resp := <-req.c
  117. return resp.v
  118. }
  119. // Register begins collecting data for the given views.
  120. // Once a view is registered, it reports data to the registered exporters.
  121. func Register(views ...*View) error {
  122. return defaultWorker.Register(views...)
  123. }
  124. // Register begins collecting data for the given views.
  125. // Once a view is registered, it reports data to the registered exporters.
  126. func (w *worker) Register(views ...*View) error {
  127. req := &registerViewReq{
  128. views: views,
  129. err: make(chan error),
  130. }
  131. w.c <- req
  132. return <-req.err
  133. }
  134. // Unregister the given views. Data will not longer be exported for these views
  135. // after Unregister returns.
  136. // It is not necessary to unregister from views you expect to collect for the
  137. // duration of your program execution.
  138. func Unregister(views ...*View) {
  139. defaultWorker.Unregister(views...)
  140. }
  141. // Unregister the given views. Data will not longer be exported for these views
  142. // after Unregister returns.
  143. // It is not necessary to unregister from views you expect to collect for the
  144. // duration of your program execution.
  145. func (w *worker) Unregister(views ...*View) {
  146. names := make([]string, len(views))
  147. for i := range views {
  148. names[i] = views[i].Name
  149. }
  150. req := &unregisterFromViewReq{
  151. views: names,
  152. done: make(chan struct{}),
  153. }
  154. w.c <- req
  155. <-req.done
  156. }
  157. // RetrieveData gets a snapshot of the data collected for the the view registered
  158. // with the given name. It is intended for testing only.
  159. func RetrieveData(viewName string) ([]*Row, error) {
  160. return defaultWorker.RetrieveData(viewName)
  161. }
  162. // RetrieveData gets a snapshot of the data collected for the the view registered
  163. // with the given name. It is intended for testing only.
  164. func (w *worker) RetrieveData(viewName string) ([]*Row, error) {
  165. req := &retrieveDataReq{
  166. now: time.Now(),
  167. v: viewName,
  168. c: make(chan *retrieveDataResp),
  169. }
  170. w.c <- req
  171. resp := <-req.c
  172. return resp.rows, resp.err
  173. }
  174. func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
  175. defaultWorker.Record(tags, ms, attachments)
  176. }
  177. func recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
  178. defaultWorker.recordMeasurement(tags, ms, attachments)
  179. }
  180. // Record records a set of measurements ms associated with the given tags and attachments.
  181. func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
  182. w.recordMeasurement(tags, ms.([]stats.Measurement), attachments)
  183. }
  184. // recordMeasurement records a set of measurements ms associated with the given tags and attachments.
  185. // This is the same as Record but without an interface{} type to avoid allocations
  186. func (w *worker) recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
  187. req := &recordReq{
  188. tm: tags,
  189. ms: ms,
  190. attachments: attachments,
  191. t: time.Now(),
  192. }
  193. w.c <- req
  194. }
  195. // SetReportingPeriod sets the interval between reporting aggregated views in
  196. // the program. If duration is less than or equal to zero, it enables the
  197. // default behavior.
  198. //
  199. // Note: each exporter makes different promises about what the lowest supported
  200. // duration is. For example, the Stackdriver exporter recommends a value no
  201. // lower than 1 minute. Consult each exporter per your needs.
  202. func SetReportingPeriod(d time.Duration) {
  203. defaultWorker.SetReportingPeriod(d)
  204. }
  205. // Stop stops the default worker.
  206. func Stop() {
  207. defaultWorker.Stop()
  208. }
  209. // SetReportingPeriod sets the interval between reporting aggregated views in
  210. // the program. If duration is less than or equal to zero, it enables the
  211. // default behavior.
  212. //
  213. // Note: each exporter makes different promises about what the lowest supported
  214. // duration is. For example, the Stackdriver exporter recommends a value no
  215. // lower than 1 minute. Consult each exporter per your needs.
  216. func (w *worker) SetReportingPeriod(d time.Duration) {
  217. // TODO(acetechnologist): ensure that the duration d is more than a certain
  218. // value. e.g. 1s
  219. req := &setReportingPeriodReq{
  220. d: d,
  221. c: make(chan bool),
  222. }
  223. w.c <- req
  224. <-req.c // don't return until the timer is set to the new duration.
  225. }
  226. // NewMeter constructs a Meter instance. You should only need to use this if
  227. // you need to separate out Measurement recordings and View aggregations within
  228. // a single process.
  229. func NewMeter() Meter {
  230. return &worker{
  231. measures: make(map[string]*measureRef),
  232. views: make(map[string]*viewInternal),
  233. viewStartTimes: make(map[*viewInternal]time.Time),
  234. timer: time.NewTicker(defaultReportingDuration),
  235. c: make(chan command, 1024),
  236. quit: make(chan bool),
  237. done: make(chan bool),
  238. exporters: make(map[Exporter]struct{}),
  239. }
  240. }
  241. // SetResource associates all data collected by this Meter with the specified
  242. // resource. This resource is reported when using metricexport.ReadAndExport;
  243. // it is not provided when used with ExportView/RegisterExporter, because that
  244. // interface does not provide a means for reporting the Resource.
  245. func (w *worker) SetResource(r *resource.Resource) {
  246. w.r = r
  247. }
  248. func (w *worker) Start() {
  249. go w.start()
  250. }
  251. func (w *worker) start() {
  252. prodMgr := metricproducer.GlobalManager()
  253. prodMgr.AddProducer(w)
  254. for {
  255. select {
  256. case cmd := <-w.c:
  257. cmd.handleCommand(w)
  258. case <-w.timer.C:
  259. w.reportUsage()
  260. case <-w.quit:
  261. w.timer.Stop()
  262. close(w.c)
  263. close(w.done)
  264. return
  265. }
  266. }
  267. }
  268. func (w *worker) Stop() {
  269. prodMgr := metricproducer.GlobalManager()
  270. prodMgr.DeleteProducer(w)
  271. select {
  272. case <-w.quit:
  273. default:
  274. close(w.quit)
  275. }
  276. <-w.done
  277. }
  278. func (w *worker) getMeasureRef(name string) *measureRef {
  279. if mr, ok := w.measures[name]; ok {
  280. return mr
  281. }
  282. mr := &measureRef{
  283. measure: name,
  284. views: make(map[*viewInternal]struct{}),
  285. }
  286. w.measures[name] = mr
  287. return mr
  288. }
  289. func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
  290. w.mu.Lock()
  291. defer w.mu.Unlock()
  292. vi, err := newViewInternal(v)
  293. if err != nil {
  294. return nil, err
  295. }
  296. if x, ok := w.views[vi.view.Name]; ok {
  297. if !x.view.same(vi.view) {
  298. return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
  299. }
  300. // the view is already registered so there is nothing to do and the
  301. // command is considered successful.
  302. return x, nil
  303. }
  304. w.views[vi.view.Name] = vi
  305. w.viewStartTimes[vi] = time.Now()
  306. ref := w.getMeasureRef(vi.view.Measure.Name())
  307. ref.views[vi] = struct{}{}
  308. return vi, nil
  309. }
  310. func (w *worker) unregisterView(v *viewInternal) {
  311. w.mu.Lock()
  312. defer w.mu.Unlock()
  313. delete(w.views, v.view.Name)
  314. delete(w.viewStartTimes, v)
  315. if measure := w.measures[v.view.Measure.Name()]; measure != nil {
  316. delete(measure.views, v)
  317. }
  318. }
  319. func (w *worker) reportView(v *viewInternal) {
  320. if !v.isSubscribed() {
  321. return
  322. }
  323. rows := v.collectedRows()
  324. viewData := &Data{
  325. View: v.view,
  326. Start: w.viewStartTimes[v],
  327. End: time.Now(),
  328. Rows: rows,
  329. }
  330. w.exportersMu.Lock()
  331. defer w.exportersMu.Unlock()
  332. for e := range w.exporters {
  333. e.ExportView(viewData)
  334. }
  335. }
  336. func (w *worker) reportUsage() {
  337. w.mu.Lock()
  338. defer w.mu.Unlock()
  339. for _, v := range w.views {
  340. w.reportView(v)
  341. }
  342. }
  343. func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
  344. if !v.isSubscribed() {
  345. return nil
  346. }
  347. return viewToMetric(v, w.r, now)
  348. }
  349. // Read reads all view data and returns them as metrics.
  350. // It is typically invoked by metric reader to export stats in metric format.
  351. func (w *worker) Read() []*metricdata.Metric {
  352. w.mu.Lock()
  353. defer w.mu.Unlock()
  354. now := time.Now()
  355. metrics := make([]*metricdata.Metric, 0, len(w.views))
  356. for _, v := range w.views {
  357. metric := w.toMetric(v, now)
  358. if metric != nil {
  359. metrics = append(metrics, metric)
  360. }
  361. }
  362. return metrics
  363. }
  364. func (w *worker) RegisterExporter(e Exporter) {
  365. w.exportersMu.Lock()
  366. defer w.exportersMu.Unlock()
  367. w.exporters[e] = struct{}{}
  368. }
  369. func (w *worker) UnregisterExporter(e Exporter) {
  370. w.exportersMu.Lock()
  371. defer w.exportersMu.Unlock()
  372. delete(w.exporters, e)
  373. }