method_logger.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog
  19. import (
  20. "context"
  21. "net"
  22. "strings"
  23. "sync/atomic"
  24. "time"
  25. binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  26. "google.golang.org/grpc/metadata"
  27. "google.golang.org/grpc/status"
  28. "google.golang.org/protobuf/proto"
  29. "google.golang.org/protobuf/types/known/durationpb"
  30. "google.golang.org/protobuf/types/known/timestamppb"
  31. )
  32. type callIDGenerator struct {
  33. id uint64
  34. }
  35. func (g *callIDGenerator) next() uint64 {
  36. id := atomic.AddUint64(&g.id, 1)
  37. return id
  38. }
  39. // reset is for testing only, and doesn't need to be thread safe.
  40. func (g *callIDGenerator) reset() {
  41. g.id = 0
  42. }
  43. var idGen callIDGenerator
  44. // MethodLogger is the sub-logger for each method.
  45. //
  46. // This is used in the 1.0 release of gcp/observability, and thus must not be
  47. // deleted or changed.
  48. type MethodLogger interface {
  49. Log(context.Context, LogEntryConfig)
  50. }
  51. // TruncatingMethodLogger is a method logger that truncates headers and messages
  52. // based on configured fields.
  53. type TruncatingMethodLogger struct {
  54. headerMaxLen, messageMaxLen uint64
  55. callID uint64
  56. idWithinCallGen *callIDGenerator
  57. sink Sink // TODO(blog): make this plugable.
  58. }
  59. // NewTruncatingMethodLogger returns a new truncating method logger.
  60. //
  61. // This is used in the 1.0 release of gcp/observability, and thus must not be
  62. // deleted or changed.
  63. func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
  64. return &TruncatingMethodLogger{
  65. headerMaxLen: h,
  66. messageMaxLen: m,
  67. callID: idGen.next(),
  68. idWithinCallGen: &callIDGenerator{},
  69. sink: DefaultSink, // TODO(blog): make it plugable.
  70. }
  71. }
  72. // Build is an internal only method for building the proto message out of the
  73. // input event. It's made public to enable other library to reuse as much logic
  74. // in TruncatingMethodLogger as possible.
  75. func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
  76. m := c.toProto()
  77. timestamp := timestamppb.Now()
  78. m.Timestamp = timestamp
  79. m.CallId = ml.callID
  80. m.SequenceIdWithinCall = ml.idWithinCallGen.next()
  81. switch pay := m.Payload.(type) {
  82. case *binlogpb.GrpcLogEntry_ClientHeader:
  83. m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
  84. case *binlogpb.GrpcLogEntry_ServerHeader:
  85. m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
  86. case *binlogpb.GrpcLogEntry_Message:
  87. m.PayloadTruncated = ml.truncateMessage(pay.Message)
  88. }
  89. return m
  90. }
  91. // Log creates a proto binary log entry, and logs it to the sink.
  92. func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) {
  93. ml.sink.Write(ml.Build(c))
  94. }
  95. func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
  96. if ml.headerMaxLen == maxUInt {
  97. return false
  98. }
  99. var (
  100. bytesLimit = ml.headerMaxLen
  101. index int
  102. )
  103. // At the end of the loop, index will be the first entry where the total
  104. // size is greater than the limit:
  105. //
  106. // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
  107. for ; index < len(mdPb.Entry); index++ {
  108. entry := mdPb.Entry[index]
  109. if entry.Key == "grpc-trace-bin" {
  110. // "grpc-trace-bin" is a special key. It's kept in the log entry,
  111. // but not counted towards the size limit.
  112. continue
  113. }
  114. currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
  115. if currentEntryLen > bytesLimit {
  116. break
  117. }
  118. bytesLimit -= currentEntryLen
  119. }
  120. truncated = index < len(mdPb.Entry)
  121. mdPb.Entry = mdPb.Entry[:index]
  122. return truncated
  123. }
  124. func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
  125. if ml.messageMaxLen == maxUInt {
  126. return false
  127. }
  128. if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
  129. return false
  130. }
  131. msgPb.Data = msgPb.Data[:ml.messageMaxLen]
  132. return true
  133. }
  134. // LogEntryConfig represents the configuration for binary log entry.
  135. //
  136. // This is used in the 1.0 release of gcp/observability, and thus must not be
  137. // deleted or changed.
  138. type LogEntryConfig interface {
  139. toProto() *binlogpb.GrpcLogEntry
  140. }
  141. // ClientHeader configs the binary log entry to be a ClientHeader entry.
  142. type ClientHeader struct {
  143. OnClientSide bool
  144. Header metadata.MD
  145. MethodName string
  146. Authority string
  147. Timeout time.Duration
  148. // PeerAddr is required only when it's on server side.
  149. PeerAddr net.Addr
  150. }
  151. func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
  152. // This function doesn't need to set all the fields (e.g. seq ID). The Log
  153. // function will set the fields when necessary.
  154. clientHeader := &binlogpb.ClientHeader{
  155. Metadata: mdToMetadataProto(c.Header),
  156. MethodName: c.MethodName,
  157. Authority: c.Authority,
  158. }
  159. if c.Timeout > 0 {
  160. clientHeader.Timeout = durationpb.New(c.Timeout)
  161. }
  162. ret := &binlogpb.GrpcLogEntry{
  163. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
  164. Payload: &binlogpb.GrpcLogEntry_ClientHeader{
  165. ClientHeader: clientHeader,
  166. },
  167. }
  168. if c.OnClientSide {
  169. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  170. } else {
  171. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  172. }
  173. if c.PeerAddr != nil {
  174. ret.Peer = addrToProto(c.PeerAddr)
  175. }
  176. return ret
  177. }
  178. // ServerHeader configs the binary log entry to be a ServerHeader entry.
  179. type ServerHeader struct {
  180. OnClientSide bool
  181. Header metadata.MD
  182. // PeerAddr is required only when it's on client side.
  183. PeerAddr net.Addr
  184. }
  185. func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
  186. ret := &binlogpb.GrpcLogEntry{
  187. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
  188. Payload: &binlogpb.GrpcLogEntry_ServerHeader{
  189. ServerHeader: &binlogpb.ServerHeader{
  190. Metadata: mdToMetadataProto(c.Header),
  191. },
  192. },
  193. }
  194. if c.OnClientSide {
  195. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  196. } else {
  197. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  198. }
  199. if c.PeerAddr != nil {
  200. ret.Peer = addrToProto(c.PeerAddr)
  201. }
  202. return ret
  203. }
  204. // ClientMessage configs the binary log entry to be a ClientMessage entry.
  205. type ClientMessage struct {
  206. OnClientSide bool
  207. // Message can be a proto.Message or []byte. Other messages formats are not
  208. // supported.
  209. Message any
  210. }
  211. func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
  212. var (
  213. data []byte
  214. err error
  215. )
  216. if m, ok := c.Message.(proto.Message); ok {
  217. data, err = proto.Marshal(m)
  218. if err != nil {
  219. grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
  220. }
  221. } else if b, ok := c.Message.([]byte); ok {
  222. data = b
  223. } else {
  224. grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
  225. }
  226. ret := &binlogpb.GrpcLogEntry{
  227. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
  228. Payload: &binlogpb.GrpcLogEntry_Message{
  229. Message: &binlogpb.Message{
  230. Length: uint32(len(data)),
  231. Data: data,
  232. },
  233. },
  234. }
  235. if c.OnClientSide {
  236. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  237. } else {
  238. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  239. }
  240. return ret
  241. }
  242. // ServerMessage configs the binary log entry to be a ServerMessage entry.
  243. type ServerMessage struct {
  244. OnClientSide bool
  245. // Message can be a proto.Message or []byte. Other messages formats are not
  246. // supported.
  247. Message any
  248. }
  249. func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
  250. var (
  251. data []byte
  252. err error
  253. )
  254. if m, ok := c.Message.(proto.Message); ok {
  255. data, err = proto.Marshal(m)
  256. if err != nil {
  257. grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
  258. }
  259. } else if b, ok := c.Message.([]byte); ok {
  260. data = b
  261. } else {
  262. grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
  263. }
  264. ret := &binlogpb.GrpcLogEntry{
  265. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
  266. Payload: &binlogpb.GrpcLogEntry_Message{
  267. Message: &binlogpb.Message{
  268. Length: uint32(len(data)),
  269. Data: data,
  270. },
  271. },
  272. }
  273. if c.OnClientSide {
  274. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  275. } else {
  276. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  277. }
  278. return ret
  279. }
  280. // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
  281. type ClientHalfClose struct {
  282. OnClientSide bool
  283. }
  284. func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
  285. ret := &binlogpb.GrpcLogEntry{
  286. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
  287. Payload: nil, // No payload here.
  288. }
  289. if c.OnClientSide {
  290. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  291. } else {
  292. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  293. }
  294. return ret
  295. }
  296. // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
  297. type ServerTrailer struct {
  298. OnClientSide bool
  299. Trailer metadata.MD
  300. // Err is the status error.
  301. Err error
  302. // PeerAddr is required only when it's on client side and the RPC is trailer
  303. // only.
  304. PeerAddr net.Addr
  305. }
  306. func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
  307. st, ok := status.FromError(c.Err)
  308. if !ok {
  309. grpclogLogger.Info("binarylogging: error in trailer is not a status error")
  310. }
  311. var (
  312. detailsBytes []byte
  313. err error
  314. )
  315. stProto := st.Proto()
  316. if stProto != nil && len(stProto.Details) != 0 {
  317. detailsBytes, err = proto.Marshal(stProto)
  318. if err != nil {
  319. grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
  320. }
  321. }
  322. ret := &binlogpb.GrpcLogEntry{
  323. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
  324. Payload: &binlogpb.GrpcLogEntry_Trailer{
  325. Trailer: &binlogpb.Trailer{
  326. Metadata: mdToMetadataProto(c.Trailer),
  327. StatusCode: uint32(st.Code()),
  328. StatusMessage: st.Message(),
  329. StatusDetails: detailsBytes,
  330. },
  331. },
  332. }
  333. if c.OnClientSide {
  334. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  335. } else {
  336. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  337. }
  338. if c.PeerAddr != nil {
  339. ret.Peer = addrToProto(c.PeerAddr)
  340. }
  341. return ret
  342. }
  343. // Cancel configs the binary log entry to be a Cancel entry.
  344. type Cancel struct {
  345. OnClientSide bool
  346. }
  347. func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
  348. ret := &binlogpb.GrpcLogEntry{
  349. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
  350. Payload: nil,
  351. }
  352. if c.OnClientSide {
  353. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  354. } else {
  355. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  356. }
  357. return ret
  358. }
  359. // metadataKeyOmit returns whether the metadata entry with this key should be
  360. // omitted.
  361. func metadataKeyOmit(key string) bool {
  362. switch key {
  363. case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
  364. return true
  365. case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
  366. return false
  367. }
  368. return strings.HasPrefix(key, "grpc-")
  369. }
  370. func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
  371. ret := &binlogpb.Metadata{}
  372. for k, vv := range md {
  373. if metadataKeyOmit(k) {
  374. continue
  375. }
  376. for _, v := range vv {
  377. ret.Entry = append(ret.Entry,
  378. &binlogpb.MetadataEntry{
  379. Key: k,
  380. Value: []byte(v),
  381. },
  382. )
  383. }
  384. }
  385. return ret
  386. }
  387. func addrToProto(addr net.Addr) *binlogpb.Address {
  388. ret := &binlogpb.Address{}
  389. switch a := addr.(type) {
  390. case *net.TCPAddr:
  391. if a.IP.To4() != nil {
  392. ret.Type = binlogpb.Address_TYPE_IPV4
  393. } else if a.IP.To16() != nil {
  394. ret.Type = binlogpb.Address_TYPE_IPV6
  395. } else {
  396. ret.Type = binlogpb.Address_TYPE_UNKNOWN
  397. // Do not set address and port fields.
  398. break
  399. }
  400. ret.Address = a.IP.String()
  401. ret.IpPort = uint32(a.Port)
  402. case *net.UnixAddr:
  403. ret.Type = binlogpb.Address_TYPE_UNIX
  404. ret.Address = a.String()
  405. default:
  406. ret.Type = binlogpb.Address_TYPE_UNKNOWN
  407. }
  408. return ret
  409. }