payload.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2016 Datadog, Inc.
  5. package tracer
  6. import (
  7. "bytes"
  8. "encoding/binary"
  9. "io"
  10. "sync/atomic"
  11. "github.com/tinylib/msgp/msgp"
  12. )
  // payload wraps the msgpack encoder so that an encoded array can be built
  // incrementally, by pushing its entries one at a time. Writing works like a
  // stream, yet the accumulated contents can be handed to a msgpack decoder as a
  // slice at any moment. The layout follows the msgpack array spec:
  // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
  //
  // payload implements io.Reader, so it can be passed to a decoder directly.
  // Use newPayload to obtain a new instance.
  //
  // A payload is not safe for concurrent use. It is intended to be used once
  // and then discarded.
  type payload struct {
  	// header holds the leading bytes of the msgpack stream: the array type
  	// marker (fixarray, array16 or array32) followed by the number of items
  	// contained in the stream.
  	header []byte

  	// off is the current read position within header.
  	off int

  	// count is the number of items in the stream.
  	count uint32

  	// buf stores the msgpack-encoded items, one after another.
  	buf bytes.Buffer
  }
  37. var _ io.Reader = (*payload)(nil)
  38. // newPayload returns a ready to use payload.
  39. func newPayload() *payload {
  40. p := &payload{
  41. header: make([]byte, 8),
  42. off: 8,
  43. }
  44. return p
  45. }
  46. // push pushes a new item into the stream.
  47. func (p *payload) push(t spanList) error {
  48. if err := msgp.Encode(&p.buf, t); err != nil {
  49. return err
  50. }
  51. atomic.AddUint32(&p.count, 1)
  52. p.updateHeader()
  53. return nil
  54. }
  55. // itemCount returns the number of items available in the srteam.
  56. func (p *payload) itemCount() int {
  57. return int(atomic.LoadUint32(&p.count))
  58. }
  59. // size returns the payload size in bytes. After the first read the value becomes
  60. // inaccurate by up to 8 bytes.
  61. func (p *payload) size() int {
  62. return p.buf.Len() + len(p.header) - p.off
  63. }
  64. // reset should *not* be used. It is not implemented and is only here to serve
  65. // as information on how to implement it in case the same payload object ever
  66. // needs to be reused.
  67. func (p *payload) reset() {
  68. // ⚠️ Warning!
  69. //
  70. // Resetting the payload for re-use requires the transport to wait for the
  71. // HTTP package to Close the request body before attempting to re-use it
  72. // again! This requires additional logic to be in place. See:
  73. //
  74. // • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138
  75. // • https://github.com/DataDog/dd-trace-go/pull/475
  76. // • https://github.com/DataDog/dd-trace-go/pull/549
  77. // • https://github.com/DataDog/dd-trace-go/pull/976
  78. //
  79. panic("not implemented")
  80. }
  // Type markers of the msgpack array format family. See:
  // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
  const (
  	// msgpackArrayFix marks an array of up to 15 items.
  	msgpackArrayFix byte = 0x90
  	// msgpackArray16 marks an array of up to 2^16-1 items; the marker is
  	// followed by the size in 2 bytes.
  	msgpackArray16 = 0xdc
  	// msgpackArray32 marks an array of up to 2^32-1 items; the marker is
  	// followed by the size in 4 bytes.
  	msgpackArray32 = 0xdd
  )
  87. // updateHeader updates the payload header based on the number of items currently
  88. // present in the stream.
  89. func (p *payload) updateHeader() {
  90. n := uint64(atomic.LoadUint32(&p.count))
  91. switch {
  92. case n <= 15:
  93. p.header[7] = msgpackArrayFix + byte(n)
  94. p.off = 7
  95. case n <= 1<<16-1:
  96. binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes
  97. p.header[5] = msgpackArray16
  98. p.off = 5
  99. default: // n <= 1<<32-1
  100. binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes
  101. p.header[3] = msgpackArray32
  102. p.off = 3
  103. }
  104. }
  105. // Close implements io.Closer
  106. func (p *payload) Close() error {
  107. // Once the payload has been read, clear the buffer for garbage collection to avoid
  108. // a memory leak when references to this object may still be kept by faulty transport
  109. // implementations or the standard library. See dd-trace-go#976
  110. p.buf = bytes.Buffer{}
  111. return nil
  112. }
  113. // Read implements io.Reader. It reads from the msgpack-encoded stream.
  114. func (p *payload) Read(b []byte) (n int, err error) {
  115. if p.off < len(p.header) {
  116. // reading header
  117. n = copy(b, p.header[p.off:])
  118. p.off += n
  119. return n, nil
  120. }
  121. return p.buf.Read(b)
  122. }