| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- // Unless explicitly stated otherwise all files in this repository are licensed
- // under the Apache License Version 2.0.
- // This product includes software developed at Datadog (https://www.datadoghq.com/).
- // Copyright 2016 Datadog, Inc.
- package tracer
- import (
- "bytes"
- "encoding/binary"
- "io"
- "sync/atomic"
- "github.com/tinylib/msgp/msgp"
- )
// payload incrementally builds a msgpack-encoded array. Entries are appended
// one at a time, stream-style, while the bytes accumulated so far can be read
// back as a single msgpack array at any point. The layout follows the array
// format family of the msgpack spec:
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
//
// payload implements io.Reader, so it can be handed directly to a decoder.
// Use newPayload to obtain a ready-to-use value.
//
// A payload is not safe for concurrent use. It is intended to be filled, read
// once and then discarded.
type payload struct {
	// header holds the leading bytes of the msgpack stream: the array type
	// marker (fixarray, array16 or array32) followed by the item count.
	header []byte

	// off is the current read position within header.
	off int

	// count is the number of items pushed into the stream.
	count uint32

	// buf accumulates the msgpack-encoded items themselves.
	buf bytes.Buffer
}
- var _ io.Reader = (*payload)(nil)
- // newPayload returns a ready to use payload.
- func newPayload() *payload {
- p := &payload{
- header: make([]byte, 8),
- off: 8,
- }
- return p
- }
- // push pushes a new item into the stream.
- func (p *payload) push(t spanList) error {
- if err := msgp.Encode(&p.buf, t); err != nil {
- return err
- }
- atomic.AddUint32(&p.count, 1)
- p.updateHeader()
- return nil
- }
- // itemCount returns the number of items available in the srteam.
- func (p *payload) itemCount() int {
- return int(atomic.LoadUint32(&p.count))
- }
- // size returns the payload size in bytes. After the first read the value becomes
- // inaccurate by up to 8 bytes.
- func (p *payload) size() int {
- return p.buf.Len() + len(p.header) - p.off
- }
- // reset should *not* be used. It is not implemented and is only here to serve
- // as information on how to implement it in case the same payload object ever
- // needs to be reused.
- func (p *payload) reset() {
- // ⚠️ Warning!
- //
- // Resetting the payload for re-use requires the transport to wait for the
- // HTTP package to Close the request body before attempting to re-use it
- // again! This requires additional logic to be in place. See:
- //
- // • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138
- // • https://github.com/DataDog/dd-trace-go/pull/475
- // • https://github.com/DataDog/dd-trace-go/pull/549
- // • https://github.com/DataDog/dd-trace-go/pull/976
- //
- panic("not implemented")
- }
// Type markers of the msgpack array format family. See:
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
const (
	// msgpackArrayFix is the base marker for arrays of up to 15 items; the
	// item count is added to it.
	msgpackArrayFix byte = 0x90
	// msgpackArray16 marks arrays of up to 2^16-1 items; the count follows
	// in 2 bytes.
	msgpackArray16 = 0xdc
	// msgpackArray32 marks arrays of up to 2^32-1 items; the count follows
	// in 4 bytes.
	msgpackArray32 = 0xdd
)
- // updateHeader updates the payload header based on the number of items currently
- // present in the stream.
- func (p *payload) updateHeader() {
- n := uint64(atomic.LoadUint32(&p.count))
- switch {
- case n <= 15:
- p.header[7] = msgpackArrayFix + byte(n)
- p.off = 7
- case n <= 1<<16-1:
- binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes
- p.header[5] = msgpackArray16
- p.off = 5
- default: // n <= 1<<32-1
- binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes
- p.header[3] = msgpackArray32
- p.off = 3
- }
- }
- // Close implements io.Closer
- func (p *payload) Close() error {
- // Once the payload has been read, clear the buffer for garbage collection to avoid
- // a memory leak when references to this object may still be kept by faulty transport
- // implementations or the standard library. See dd-trace-go#976
- p.buf = bytes.Buffer{}
- return nil
- }
- // Read implements io.Reader. It reads from the msgpack-encoded stream.
- func (p *payload) Read(b []byte) (n int, err error) {
- if p.off < len(p.header) {
- // reading header
- n = copy(b, p.header[p.off:])
- p.off += n
- return n, nil
- }
- return p.buf.Read(b)
- }
|