| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- // Package packetio provides packet buffer
- package packetio
- import (
- "errors"
- "io"
- "sync"
- "time"
- "github.com/pion/transport/deadline"
- )
- var errPacketTooBig = errors.New("packet too big")
- // BufferPacketType allow the Buffer to know which packet protocol is writing.
- type BufferPacketType int
- const (
- // RTPBufferPacket indicates the Buffer that is handling RTP packets
- RTPBufferPacket BufferPacketType = 1
- // RTCPBufferPacket indicates the Buffer that is handling RTCP packets
- RTCPBufferPacket BufferPacketType = 2
- )
- // Buffer allows writing packets to an intermediate buffer, which can then be read form.
- // This is verify similar to bytes.Buffer but avoids combining multiple writes into a single read.
- type Buffer struct {
- mutex sync.Mutex
- // this is a circular buffer. If head <= tail, then the useful
- // data is in the interval [head, tail[. If tail < head, then
- // the useful data is the union of [head, len[ and [0, tail[.
- // In order to avoid ambiguity when head = tail, we always leave
- // an unused byte in the buffer.
- data []byte
- head, tail int
- notify chan struct{}
- subs bool
- closed bool
- count int
- limitCount, limitSize int
- readDeadline *deadline.Deadline
- }
- const (
- minSize = 2048
- cutoffSize = 128 * 1024
- maxSize = 4 * 1024 * 1024
- )
- // NewBuffer creates a new Buffer.
- func NewBuffer() *Buffer {
- return &Buffer{
- notify: make(chan struct{}),
- readDeadline: deadline.New(),
- }
- }
- // available returns true if the buffer is large enough to fit a packet
- // of the given size, taking overhead into account.
- func (b *Buffer) available(size int) bool {
- available := b.head - b.tail
- if available <= 0 {
- available += len(b.data)
- }
- // we interpret head=tail as empty, so always keep a byte free
- if size+2+1 > available {
- return false
- }
- return true
- }
- // grow increases the size of the buffer. If it returns nil, then the
- // buffer has been grown. It returns ErrFull if hits a limit.
- func (b *Buffer) grow() error {
- var newsize int
- if len(b.data) < cutoffSize {
- newsize = 2 * len(b.data)
- } else {
- newsize = 5 * len(b.data) / 4
- }
- if newsize < minSize {
- newsize = minSize
- }
- if (b.limitSize <= 0 || sizeHardlimit) && newsize > maxSize {
- newsize = maxSize
- }
- // one byte slack
- if b.limitSize > 0 && newsize > b.limitSize+1 {
- newsize = b.limitSize + 1
- }
- if newsize <= len(b.data) {
- return ErrFull
- }
- newdata := make([]byte, newsize)
- var n int
- if b.head <= b.tail {
- // data was contiguous
- n = copy(newdata, b.data[b.head:b.tail])
- } else {
- // data was discontiguous
- n = copy(newdata, b.data[b.head:])
- n += copy(newdata[n:], b.data[:b.tail])
- }
- b.head = 0
- b.tail = n
- b.data = newdata
- return nil
- }
- // Write appends a copy of the packet data to the buffer.
- // Returns ErrFull if the packet doesn't fit.
- //
- // Note that the packet size is limited to 65536 bytes since v0.11.0 due to the internal data structure.
- func (b *Buffer) Write(packet []byte) (int, error) {
- if len(packet) >= 0x10000 {
- return 0, errPacketTooBig
- }
- b.mutex.Lock()
- if b.closed {
- b.mutex.Unlock()
- return 0, io.ErrClosedPipe
- }
- if (b.limitCount > 0 && b.count >= b.limitCount) ||
- (b.limitSize > 0 && b.size()+2+len(packet) > b.limitSize) {
- b.mutex.Unlock()
- return 0, ErrFull
- }
- // grow the buffer until the packet fits
- for !b.available(len(packet)) {
- err := b.grow()
- if err != nil {
- b.mutex.Unlock()
- return 0, err
- }
- }
- var notify chan struct{}
- if b.subs {
- // readers are waiting. Prepare to notify, but only
- // actually do it after we release the lock.
- notify = b.notify
- b.notify = make(chan struct{})
- b.subs = false
- }
- // store the length of the packet
- b.data[b.tail] = uint8(len(packet) >> 8)
- b.tail++
- if b.tail >= len(b.data) {
- b.tail = 0
- }
- b.data[b.tail] = uint8(len(packet))
- b.tail++
- if b.tail >= len(b.data) {
- b.tail = 0
- }
- // store the packet
- n := copy(b.data[b.tail:], packet)
- b.tail += n
- if b.tail >= len(b.data) {
- // we reached the end, wrap around
- m := copy(b.data, packet[n:])
- b.tail = m
- }
- b.count++
- b.mutex.Unlock()
- if notify != nil {
- close(notify)
- }
- return len(packet), nil
- }
- // Read populates the given byte slice, returning the number of bytes read.
- // Blocks until data is available or the buffer is closed.
- // Returns io.ErrShortBuffer is the packet is too small to copy the Write.
- // Returns io.EOF if the buffer is closed.
- func (b *Buffer) Read(packet []byte) (n int, err error) {
- // Return immediately if the deadline is already exceeded.
- select {
- case <-b.readDeadline.Done():
- return 0, &netError{ErrTimeout, true, true}
- default:
- }
- for {
- b.mutex.Lock()
- if b.head != b.tail {
- // decode the packet size
- n1 := b.data[b.head]
- b.head++
- if b.head >= len(b.data) {
- b.head = 0
- }
- n2 := b.data[b.head]
- b.head++
- if b.head >= len(b.data) {
- b.head = 0
- }
- count := int((uint16(n1) << 8) | uint16(n2))
- // determine the number of bytes we'll actually copy
- copied := count
- if copied > len(packet) {
- copied = len(packet)
- }
- // copy the data
- if b.head+copied < len(b.data) {
- copy(packet, b.data[b.head:b.head+copied])
- } else {
- k := copy(packet, b.data[b.head:])
- copy(packet[k:], b.data[:copied-k])
- }
- // advance head, discarding any data that wasn't copied
- b.head += count
- if b.head >= len(b.data) {
- b.head -= len(b.data)
- }
- if b.head == b.tail {
- // the buffer is empty, reset to beginning
- // in order to improve cache locality.
- b.head = 0
- b.tail = 0
- }
- b.count--
- b.mutex.Unlock()
- if copied < count {
- return copied, io.ErrShortBuffer
- }
- return copied, nil
- }
- if b.closed {
- b.mutex.Unlock()
- return 0, io.EOF
- }
- notify := b.notify
- b.subs = true
- b.mutex.Unlock()
- select {
- case <-b.readDeadline.Done():
- return 0, &netError{ErrTimeout, true, true}
- case <-notify:
- }
- }
- }
- // Close the buffer, unblocking any pending reads.
- // Data in the buffer can still be read, Read will return io.EOF only when empty.
- func (b *Buffer) Close() (err error) {
- b.mutex.Lock()
- if b.closed {
- b.mutex.Unlock()
- return nil
- }
- notify := b.notify
- b.closed = true
- b.mutex.Unlock()
- close(notify)
- return nil
- }
- // Count returns the number of packets in the buffer.
- func (b *Buffer) Count() int {
- b.mutex.Lock()
- defer b.mutex.Unlock()
- return b.count
- }
- // SetLimitCount controls the maximum number of packets that can be buffered.
- // Causes Write to return ErrFull when this limit is reached.
- // A zero value will disable this limit.
- func (b *Buffer) SetLimitCount(limit int) {
- b.mutex.Lock()
- defer b.mutex.Unlock()
- b.limitCount = limit
- }
- // Size returns the total byte size of packets in the buffer, including
- // a small amount of administrative overhead.
- func (b *Buffer) Size() int {
- b.mutex.Lock()
- defer b.mutex.Unlock()
- return b.size()
- }
- func (b *Buffer) size() int {
- size := b.tail - b.head
- if size < 0 {
- size += len(b.data)
- }
- return size
- }
- // SetLimitSize controls the maximum number of bytes that can be buffered.
- // Causes Write to return ErrFull when this limit is reached.
- // A zero value means 4MB since v0.11.0.
- //
- // User can set packetioSizeHardlimit build tag to enable 4MB hardlimit.
- // When packetioSizeHardlimit build tag is set, SetLimitSize exceeding
- // the hardlimit will be silently discarded.
- func (b *Buffer) SetLimitSize(limit int) {
- b.mutex.Lock()
- defer b.mutex.Unlock()
- b.limitSize = limit
- }
- // SetReadDeadline sets the deadline for the Read operation.
- // Setting to zero means no deadline.
- func (b *Buffer) SetReadDeadline(t time.Time) error {
- b.readDeadline.Set(t)
- return nil
- }
|