stats_common.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package ocgrpc
  16. import (
  17. "context"
  18. "strconv"
  19. "strings"
  20. "sync/atomic"
  21. "time"
  22. "go.opencensus.io/metric/metricdata"
  23. ocstats "go.opencensus.io/stats"
  24. "go.opencensus.io/stats/view"
  25. "go.opencensus.io/tag"
  26. "go.opencensus.io/trace"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/grpclog"
  29. "google.golang.org/grpc/stats"
  30. "google.golang.org/grpc/status"
  31. )
  32. type grpcInstrumentationKey string
  33. // rpcData holds the instrumentation RPC data that is needed between the start
  34. // and end of an call. It holds the info that this package needs to keep track
  35. // of between the various GRPC events.
  36. type rpcData struct {
  37. // reqCount and respCount has to be the first words
  38. // in order to be 64-aligned on 32-bit architectures.
  39. sentCount, sentBytes, recvCount, recvBytes int64 // access atomically
  40. // startTime represents the time at which TagRPC was invoked at the
  41. // beginning of an RPC. It is an appoximation of the time when the
  42. // application code invoked GRPC code.
  43. startTime time.Time
  44. method string
  45. }
  46. // The following variables define the default hard-coded auxiliary data used by
  47. // both the default GRPC client and GRPC server metrics.
  48. var (
  49. DefaultBytesDistribution = view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296)
  50. DefaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
  51. DefaultMessageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
  52. )
  53. // Server tags are applied to the context used to process each RPC, as well as
  54. // the measures at the end of each RPC.
  55. var (
  56. KeyServerMethod = tag.MustNewKey("grpc_server_method")
  57. KeyServerStatus = tag.MustNewKey("grpc_server_status")
  58. )
  59. // Client tags are applied to measures at the end of each RPC.
  60. var (
  61. KeyClientMethod = tag.MustNewKey("grpc_client_method")
  62. KeyClientStatus = tag.MustNewKey("grpc_client_status")
  63. )
  64. var (
  65. rpcDataKey = grpcInstrumentationKey("opencensus-rpcData")
  66. )
  67. func methodName(fullname string) string {
  68. return strings.TrimLeft(fullname, "/")
  69. }
  70. // statsHandleRPC processes the RPC events.
  71. func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
  72. switch st := s.(type) {
  73. case *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
  74. // do nothing for client
  75. case *stats.Begin:
  76. handleRPCBegin(ctx, st)
  77. case *stats.OutPayload:
  78. handleRPCOutPayload(ctx, st)
  79. case *stats.InPayload:
  80. handleRPCInPayload(ctx, st)
  81. case *stats.End:
  82. handleRPCEnd(ctx, st)
  83. default:
  84. grpclog.Infof("unexpected stats: %T", st)
  85. }
  86. }
  87. func handleRPCBegin(ctx context.Context, s *stats.Begin) {
  88. d, ok := ctx.Value(rpcDataKey).(*rpcData)
  89. if !ok {
  90. if grpclog.V(2) {
  91. grpclog.Infoln("Failed to retrieve *rpcData from context.")
  92. }
  93. }
  94. if s.IsClient() {
  95. ocstats.RecordWithOptions(ctx,
  96. ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
  97. ocstats.WithMeasurements(ClientStartedRPCs.M(1)))
  98. } else {
  99. ocstats.RecordWithOptions(ctx,
  100. ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
  101. ocstats.WithMeasurements(ServerStartedRPCs.M(1)))
  102. }
  103. }
  104. func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
  105. d, ok := ctx.Value(rpcDataKey).(*rpcData)
  106. if !ok {
  107. if grpclog.V(2) {
  108. grpclog.Infoln("Failed to retrieve *rpcData from context.")
  109. }
  110. return
  111. }
  112. atomic.AddInt64(&d.sentBytes, int64(s.Length))
  113. atomic.AddInt64(&d.sentCount, 1)
  114. }
  115. func handleRPCInPayload(ctx context.Context, s *stats.InPayload) {
  116. d, ok := ctx.Value(rpcDataKey).(*rpcData)
  117. if !ok {
  118. if grpclog.V(2) {
  119. grpclog.Infoln("Failed to retrieve *rpcData from context.")
  120. }
  121. return
  122. }
  123. atomic.AddInt64(&d.recvBytes, int64(s.Length))
  124. atomic.AddInt64(&d.recvCount, 1)
  125. }
  126. func handleRPCEnd(ctx context.Context, s *stats.End) {
  127. d, ok := ctx.Value(rpcDataKey).(*rpcData)
  128. if !ok {
  129. if grpclog.V(2) {
  130. grpclog.Infoln("Failed to retrieve *rpcData from context.")
  131. }
  132. return
  133. }
  134. elapsedTime := time.Since(d.startTime)
  135. var st string
  136. if s.Error != nil {
  137. s, ok := status.FromError(s.Error)
  138. if ok {
  139. st = statusCodeToString(s)
  140. }
  141. } else {
  142. st = "OK"
  143. }
  144. latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
  145. attachments := getSpanCtxAttachment(ctx)
  146. if s.Client {
  147. ocstats.RecordWithOptions(ctx,
  148. ocstats.WithTags(
  149. tag.Upsert(KeyClientMethod, methodName(d.method)),
  150. tag.Upsert(KeyClientStatus, st)),
  151. ocstats.WithAttachments(attachments),
  152. ocstats.WithMeasurements(
  153. ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
  154. ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
  155. ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
  156. ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
  157. ClientRoundtripLatency.M(latencyMillis)))
  158. } else {
  159. ocstats.RecordWithOptions(ctx,
  160. ocstats.WithTags(
  161. tag.Upsert(KeyServerStatus, st),
  162. ),
  163. ocstats.WithAttachments(attachments),
  164. ocstats.WithMeasurements(
  165. ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
  166. ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
  167. ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
  168. ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
  169. ServerLatency.M(latencyMillis)))
  170. }
  171. }
  172. func statusCodeToString(s *status.Status) string {
  173. // see https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
  174. switch c := s.Code(); c {
  175. case codes.OK:
  176. return "OK"
  177. case codes.Canceled:
  178. return "CANCELLED"
  179. case codes.Unknown:
  180. return "UNKNOWN"
  181. case codes.InvalidArgument:
  182. return "INVALID_ARGUMENT"
  183. case codes.DeadlineExceeded:
  184. return "DEADLINE_EXCEEDED"
  185. case codes.NotFound:
  186. return "NOT_FOUND"
  187. case codes.AlreadyExists:
  188. return "ALREADY_EXISTS"
  189. case codes.PermissionDenied:
  190. return "PERMISSION_DENIED"
  191. case codes.ResourceExhausted:
  192. return "RESOURCE_EXHAUSTED"
  193. case codes.FailedPrecondition:
  194. return "FAILED_PRECONDITION"
  195. case codes.Aborted:
  196. return "ABORTED"
  197. case codes.OutOfRange:
  198. return "OUT_OF_RANGE"
  199. case codes.Unimplemented:
  200. return "UNIMPLEMENTED"
  201. case codes.Internal:
  202. return "INTERNAL"
  203. case codes.Unavailable:
  204. return "UNAVAILABLE"
  205. case codes.DataLoss:
  206. return "DATA_LOSS"
  207. case codes.Unauthenticated:
  208. return "UNAUTHENTICATED"
  209. default:
  210. return "CODE_" + strconv.FormatInt(int64(c), 10)
  211. }
  212. }
  213. func getSpanCtxAttachment(ctx context.Context) metricdata.Attachments {
  214. attachments := map[string]interface{}{}
  215. span := trace.FromContext(ctx)
  216. if span == nil {
  217. return attachments
  218. }
  219. spanCtx := span.SpanContext()
  220. if spanCtx.IsSampled() {
  221. attachments[metricdata.AttachmentKeySpanContext] = spanCtx
  222. }
  223. return attachments
  224. }