| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- package otgrpc
- import (
- "github.com/opentracing/opentracing-go"
- "github.com/opentracing/opentracing-go/ext"
- "github.com/opentracing/opentracing-go/log"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
- "io"
- "runtime"
- "sync/atomic"
- )
- // OpenTracingClientInterceptor returns a grpc.UnaryClientInterceptor suitable
- // for use in a grpc.Dial call.
- //
- // For example:
- //
- // conn, err := grpc.Dial(
- // address,
- // ..., // (existing DialOptions)
- // grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
- //
- // All gRPC client spans will inject the OpenTracing SpanContext into the gRPC
- // metadata; they will also look in the context.Context for an active
- // in-process parent Span and establish a ChildOf reference if such a parent
- // Span could be found.
- func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor {
- otgrpcOpts := newOptions()
- otgrpcOpts.apply(optFuncs...)
- return func(
- ctx context.Context,
- method string,
- req, resp interface{},
- cc *grpc.ClientConn,
- invoker grpc.UnaryInvoker,
- opts ...grpc.CallOption,
- ) error {
- var err error
- var parentCtx opentracing.SpanContext
- if parent := opentracing.SpanFromContext(ctx); parent != nil {
- parentCtx = parent.Context()
- }
- if otgrpcOpts.inclusionFunc != nil &&
- !otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) {
- return invoker(ctx, method, req, resp, cc, opts...)
- }
- clientSpan := tracer.StartSpan(
- method,
- opentracing.ChildOf(parentCtx),
- ext.SpanKindRPCClient,
- gRPCComponentTag,
- )
- defer clientSpan.Finish()
- ctx = injectSpanContext(ctx, tracer, clientSpan)
- if otgrpcOpts.logPayloads {
- clientSpan.LogFields(log.Object("gRPC request", req))
- }
- err = invoker(ctx, method, req, resp, cc, opts...)
- if err == nil {
- if otgrpcOpts.logPayloads {
- clientSpan.LogFields(log.Object("gRPC response", resp))
- }
- } else {
- SetSpanTags(clientSpan, err, true)
- clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
- }
- if otgrpcOpts.decorator != nil {
- otgrpcOpts.decorator(clientSpan, method, req, resp, err)
- }
- return err
- }
- }
- // OpenTracingStreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
- // for use in a grpc.Dial call. The interceptor instruments streaming RPCs by creating
- // a single span to correspond to the lifetime of the RPC's stream.
- //
- // For example:
- //
- // conn, err := grpc.Dial(
- // address,
- // ..., // (existing DialOptions)
- // grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
- //
- // All gRPC client spans will inject the OpenTracing SpanContext into the gRPC
- // metadata; they will also look in the context.Context for an active
- // in-process parent Span and establish a ChildOf reference if such a parent
- // Span could be found.
- func OpenTracingStreamClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.StreamClientInterceptor {
- otgrpcOpts := newOptions()
- otgrpcOpts.apply(optFuncs...)
- return func(
- ctx context.Context,
- desc *grpc.StreamDesc,
- cc *grpc.ClientConn,
- method string,
- streamer grpc.Streamer,
- opts ...grpc.CallOption,
- ) (grpc.ClientStream, error) {
- var err error
- var parentCtx opentracing.SpanContext
- if parent := opentracing.SpanFromContext(ctx); parent != nil {
- parentCtx = parent.Context()
- }
- if otgrpcOpts.inclusionFunc != nil &&
- !otgrpcOpts.inclusionFunc(parentCtx, method, nil, nil) {
- return streamer(ctx, desc, cc, method, opts...)
- }
- clientSpan := tracer.StartSpan(
- method,
- opentracing.ChildOf(parentCtx),
- ext.SpanKindRPCClient,
- gRPCComponentTag,
- )
- ctx = injectSpanContext(ctx, tracer, clientSpan)
- cs, err := streamer(ctx, desc, cc, method, opts...)
- if err != nil {
- clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
- SetSpanTags(clientSpan, err, true)
- clientSpan.Finish()
- return cs, err
- }
- return newOpenTracingClientStream(cs, method, desc, clientSpan, otgrpcOpts), nil
- }
- }
- func newOpenTracingClientStream(cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan opentracing.Span, otgrpcOpts *options) grpc.ClientStream {
- finishChan := make(chan struct{})
- isFinished := new(int32)
- *isFinished = 0
- finishFunc := func(err error) {
- // The current OpenTracing specification forbids finishing a span more than
- // once. Since we have multiple code paths that could concurrently call
- // `finishFunc`, we need to add some sort of synchronization to guard against
- // multiple finishing.
- if !atomic.CompareAndSwapInt32(isFinished, 0, 1) {
- return
- }
- close(finishChan)
- defer clientSpan.Finish()
- if err != nil {
- clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
- SetSpanTags(clientSpan, err, true)
- }
- if otgrpcOpts.decorator != nil {
- otgrpcOpts.decorator(clientSpan, method, nil, nil, err)
- }
- }
- go func() {
- select {
- case <-finishChan:
- // The client span is being finished by another code path; hence, no
- // action is necessary.
- case <-cs.Context().Done():
- finishFunc(cs.Context().Err())
- }
- }()
- otcs := &openTracingClientStream{
- ClientStream: cs,
- desc: desc,
- finishFunc: finishFunc,
- }
- // The `ClientStream` interface allows one to omit calling `Recv` if it's
- // known that the result will be `io.EOF`. See
- // http://stackoverflow.com/q/42915337
- // In such cases, there's nothing that triggers the span to finish. We,
- // therefore, set a finalizer so that the span and the context goroutine will
- // at least be cleaned up when the garbage collector is run.
- runtime.SetFinalizer(otcs, func(otcs *openTracingClientStream) {
- otcs.finishFunc(nil)
- })
- return otcs
- }
- type openTracingClientStream struct {
- grpc.ClientStream
- desc *grpc.StreamDesc
- finishFunc func(error)
- }
- func (cs *openTracingClientStream) Header() (metadata.MD, error) {
- md, err := cs.ClientStream.Header()
- if err != nil {
- cs.finishFunc(err)
- }
- return md, err
- }
- func (cs *openTracingClientStream) SendMsg(m interface{}) error {
- err := cs.ClientStream.SendMsg(m)
- if err != nil {
- cs.finishFunc(err)
- }
- return err
- }
- func (cs *openTracingClientStream) RecvMsg(m interface{}) error {
- err := cs.ClientStream.RecvMsg(m)
- if err == io.EOF {
- cs.finishFunc(nil)
- return err
- } else if err != nil {
- cs.finishFunc(err)
- return err
- }
- if !cs.desc.ServerStreams {
- cs.finishFunc(nil)
- }
- return err
- }
- func (cs *openTracingClientStream) CloseSend() error {
- err := cs.ClientStream.CloseSend()
- if err != nil {
- cs.finishFunc(err)
- }
- return err
- }
- func injectSpanContext(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) context.Context {
- md, ok := metadata.FromOutgoingContext(ctx)
- if !ok {
- md = metadata.New(nil)
- } else {
- md = md.Copy()
- }
- mdWriter := metadataReaderWriter{md}
- err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdWriter)
- // We have no better place to record an error than the Span itself :-/
- if err != nil {
- clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err))
- }
- return metadata.NewOutgoingContext(ctx, md)
- }
|