| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- // Copyright 2017, OpenCensus Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //
- package view
- import (
- "fmt"
- "sync"
- "time"
- "go.opencensus.io/resource"
- "go.opencensus.io/metric/metricdata"
- "go.opencensus.io/metric/metricproducer"
- "go.opencensus.io/stats"
- "go.opencensus.io/stats/internal"
- "go.opencensus.io/tag"
- )
- func init() {
- defaultWorker = NewMeter().(*worker)
- go defaultWorker.start()
- internal.DefaultRecorder = record
- internal.MeasurementRecorder = recordMeasurement
- }
- type measureRef struct {
- measure string
- views map[*viewInternal]struct{}
- }
- type worker struct {
- measures map[string]*measureRef
- views map[string]*viewInternal
- viewStartTimes map[*viewInternal]time.Time
- timer *time.Ticker
- c chan command
- quit, done chan bool
- mu sync.RWMutex
- r *resource.Resource
- exportersMu sync.RWMutex
- exporters map[Exporter]struct{}
- }
- // Meter defines an interface which allows a single process to maintain
- // multiple sets of metrics exports (intended for the advanced case where a
- // single process wants to report metrics about multiple objects, such as
- // multiple databases or HTTP services).
- //
- // Note that this is an advanced use case, and the static functions in this
- // module should cover the common use cases.
- type Meter interface {
- stats.Recorder
- // Find returns a registered view associated with this name.
- // If no registered view is found, nil is returned.
- Find(name string) *View
- // Register begins collecting data for the given views.
- // Once a view is registered, it reports data to the registered exporters.
- Register(views ...*View) error
- // Unregister the given views. Data will not longer be exported for these views
- // after Unregister returns.
- // It is not necessary to unregister from views you expect to collect for the
- // duration of your program execution.
- Unregister(views ...*View)
- // SetReportingPeriod sets the interval between reporting aggregated views in
- // the program. If duration is less than or equal to zero, it enables the
- // default behavior.
- //
- // Note: each exporter makes different promises about what the lowest supported
- // duration is. For example, the Stackdriver exporter recommends a value no
- // lower than 1 minute. Consult each exporter per your needs.
- SetReportingPeriod(time.Duration)
- // RegisterExporter registers an exporter.
- // Collected data will be reported via all the
- // registered exporters. Once you no longer
- // want data to be exported, invoke UnregisterExporter
- // with the previously registered exporter.
- //
- // Binaries can register exporters, libraries shouldn't register exporters.
- RegisterExporter(Exporter)
- // UnregisterExporter unregisters an exporter.
- UnregisterExporter(Exporter)
- // SetResource may be used to set the Resource associated with this registry.
- // This is intended to be used in cases where a single process exports metrics
- // for multiple Resources, typically in a multi-tenant situation.
- SetResource(*resource.Resource)
- // Start causes the Meter to start processing Record calls and aggregating
- // statistics as well as exporting data.
- Start()
- // Stop causes the Meter to stop processing calls and terminate data export.
- Stop()
- // RetrieveData gets a snapshot of the data collected for the the view registered
- // with the given name. It is intended for testing only.
- RetrieveData(viewName string) ([]*Row, error)
- }
- var _ Meter = (*worker)(nil)
- var defaultWorker *worker
- var defaultReportingDuration = 10 * time.Second
- // Find returns a registered view associated with this name.
- // If no registered view is found, nil is returned.
- func Find(name string) (v *View) {
- return defaultWorker.Find(name)
- }
- // Find returns a registered view associated with this name.
- // If no registered view is found, nil is returned.
- func (w *worker) Find(name string) (v *View) {
- req := &getViewByNameReq{
- name: name,
- c: make(chan *getViewByNameResp),
- }
- w.c <- req
- resp := <-req.c
- return resp.v
- }
- // Register begins collecting data for the given views.
- // Once a view is registered, it reports data to the registered exporters.
- func Register(views ...*View) error {
- return defaultWorker.Register(views...)
- }
- // Register begins collecting data for the given views.
- // Once a view is registered, it reports data to the registered exporters.
- func (w *worker) Register(views ...*View) error {
- req := ®isterViewReq{
- views: views,
- err: make(chan error),
- }
- w.c <- req
- return <-req.err
- }
- // Unregister the given views. Data will not longer be exported for these views
- // after Unregister returns.
- // It is not necessary to unregister from views you expect to collect for the
- // duration of your program execution.
- func Unregister(views ...*View) {
- defaultWorker.Unregister(views...)
- }
- // Unregister the given views. Data will not longer be exported for these views
- // after Unregister returns.
- // It is not necessary to unregister from views you expect to collect for the
- // duration of your program execution.
- func (w *worker) Unregister(views ...*View) {
- names := make([]string, len(views))
- for i := range views {
- names[i] = views[i].Name
- }
- req := &unregisterFromViewReq{
- views: names,
- done: make(chan struct{}),
- }
- w.c <- req
- <-req.done
- }
- // RetrieveData gets a snapshot of the data collected for the the view registered
- // with the given name. It is intended for testing only.
- func RetrieveData(viewName string) ([]*Row, error) {
- return defaultWorker.RetrieveData(viewName)
- }
- // RetrieveData gets a snapshot of the data collected for the the view registered
- // with the given name. It is intended for testing only.
- func (w *worker) RetrieveData(viewName string) ([]*Row, error) {
- req := &retrieveDataReq{
- now: time.Now(),
- v: viewName,
- c: make(chan *retrieveDataResp),
- }
- w.c <- req
- resp := <-req.c
- return resp.rows, resp.err
- }
- func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
- defaultWorker.Record(tags, ms, attachments)
- }
- func recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
- defaultWorker.recordMeasurement(tags, ms, attachments)
- }
- // Record records a set of measurements ms associated with the given tags and attachments.
- func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
- w.recordMeasurement(tags, ms.([]stats.Measurement), attachments)
- }
- // recordMeasurement records a set of measurements ms associated with the given tags and attachments.
- // This is the same as Record but without an interface{} type to avoid allocations
- func (w *worker) recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
- req := &recordReq{
- tm: tags,
- ms: ms,
- attachments: attachments,
- t: time.Now(),
- }
- w.c <- req
- }
- // SetReportingPeriod sets the interval between reporting aggregated views in
- // the program. If duration is less than or equal to zero, it enables the
- // default behavior.
- //
- // Note: each exporter makes different promises about what the lowest supported
- // duration is. For example, the Stackdriver exporter recommends a value no
- // lower than 1 minute. Consult each exporter per your needs.
- func SetReportingPeriod(d time.Duration) {
- defaultWorker.SetReportingPeriod(d)
- }
- // Stop stops the default worker.
- func Stop() {
- defaultWorker.Stop()
- }
- // SetReportingPeriod sets the interval between reporting aggregated views in
- // the program. If duration is less than or equal to zero, it enables the
- // default behavior.
- //
- // Note: each exporter makes different promises about what the lowest supported
- // duration is. For example, the Stackdriver exporter recommends a value no
- // lower than 1 minute. Consult each exporter per your needs.
- func (w *worker) SetReportingPeriod(d time.Duration) {
- // TODO(acetechnologist): ensure that the duration d is more than a certain
- // value. e.g. 1s
- req := &setReportingPeriodReq{
- d: d,
- c: make(chan bool),
- }
- w.c <- req
- <-req.c // don't return until the timer is set to the new duration.
- }
- // NewMeter constructs a Meter instance. You should only need to use this if
- // you need to separate out Measurement recordings and View aggregations within
- // a single process.
- func NewMeter() Meter {
- return &worker{
- measures: make(map[string]*measureRef),
- views: make(map[string]*viewInternal),
- viewStartTimes: make(map[*viewInternal]time.Time),
- timer: time.NewTicker(defaultReportingDuration),
- c: make(chan command, 1024),
- quit: make(chan bool),
- done: make(chan bool),
- exporters: make(map[Exporter]struct{}),
- }
- }
- // SetResource associates all data collected by this Meter with the specified
- // resource. This resource is reported when using metricexport.ReadAndExport;
- // it is not provided when used with ExportView/RegisterExporter, because that
- // interface does not provide a means for reporting the Resource.
- func (w *worker) SetResource(r *resource.Resource) {
- w.r = r
- }
- func (w *worker) Start() {
- go w.start()
- }
- func (w *worker) start() {
- prodMgr := metricproducer.GlobalManager()
- prodMgr.AddProducer(w)
- for {
- select {
- case cmd := <-w.c:
- cmd.handleCommand(w)
- case <-w.timer.C:
- w.reportUsage()
- case <-w.quit:
- w.timer.Stop()
- close(w.c)
- close(w.done)
- return
- }
- }
- }
- func (w *worker) Stop() {
- prodMgr := metricproducer.GlobalManager()
- prodMgr.DeleteProducer(w)
- select {
- case <-w.quit:
- default:
- close(w.quit)
- }
- <-w.done
- }
- func (w *worker) getMeasureRef(name string) *measureRef {
- if mr, ok := w.measures[name]; ok {
- return mr
- }
- mr := &measureRef{
- measure: name,
- views: make(map[*viewInternal]struct{}),
- }
- w.measures[name] = mr
- return mr
- }
- func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
- w.mu.Lock()
- defer w.mu.Unlock()
- vi, err := newViewInternal(v)
- if err != nil {
- return nil, err
- }
- if x, ok := w.views[vi.view.Name]; ok {
- if !x.view.same(vi.view) {
- return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
- }
- // the view is already registered so there is nothing to do and the
- // command is considered successful.
- return x, nil
- }
- w.views[vi.view.Name] = vi
- w.viewStartTimes[vi] = time.Now()
- ref := w.getMeasureRef(vi.view.Measure.Name())
- ref.views[vi] = struct{}{}
- return vi, nil
- }
- func (w *worker) unregisterView(v *viewInternal) {
- w.mu.Lock()
- defer w.mu.Unlock()
- delete(w.views, v.view.Name)
- delete(w.viewStartTimes, v)
- if measure := w.measures[v.view.Measure.Name()]; measure != nil {
- delete(measure.views, v)
- }
- }
- func (w *worker) reportView(v *viewInternal) {
- if !v.isSubscribed() {
- return
- }
- rows := v.collectedRows()
- viewData := &Data{
- View: v.view,
- Start: w.viewStartTimes[v],
- End: time.Now(),
- Rows: rows,
- }
- w.exportersMu.Lock()
- defer w.exportersMu.Unlock()
- for e := range w.exporters {
- e.ExportView(viewData)
- }
- }
- func (w *worker) reportUsage() {
- w.mu.Lock()
- defer w.mu.Unlock()
- for _, v := range w.views {
- w.reportView(v)
- }
- }
- func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
- if !v.isSubscribed() {
- return nil
- }
- return viewToMetric(v, w.r, now)
- }
- // Read reads all view data and returns them as metrics.
- // It is typically invoked by metric reader to export stats in metric format.
- func (w *worker) Read() []*metricdata.Metric {
- w.mu.Lock()
- defer w.mu.Unlock()
- now := time.Now()
- metrics := make([]*metricdata.Metric, 0, len(w.views))
- for _, v := range w.views {
- metric := w.toMetric(v, now)
- if metric != nil {
- metrics = append(metrics, metric)
- }
- }
- return metrics
- }
- func (w *worker) RegisterExporter(e Exporter) {
- w.exportersMu.Lock()
- defer w.exportersMu.Unlock()
- w.exporters[e] = struct{}{}
- }
- func (w *worker) UnregisterExporter(e Exporter) {
- w.exportersMu.Lock()
- defer w.exportersMu.Unlock()
- delete(w.exporters, e)
- }
|