buffer.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. // Package packetio provides packet buffer
  2. package packetio
  3. import (
  4. "errors"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/pion/transport/deadline"
  9. )
  10. var errPacketTooBig = errors.New("packet too big")
  11. // BufferPacketType allow the Buffer to know which packet protocol is writing.
  12. type BufferPacketType int
  13. const (
  14. // RTPBufferPacket indicates the Buffer that is handling RTP packets
  15. RTPBufferPacket BufferPacketType = 1
  16. // RTCPBufferPacket indicates the Buffer that is handling RTCP packets
  17. RTCPBufferPacket BufferPacketType = 2
  18. )
  19. // Buffer allows writing packets to an intermediate buffer, which can then be read form.
  20. // This is verify similar to bytes.Buffer but avoids combining multiple writes into a single read.
  21. type Buffer struct {
  22. mutex sync.Mutex
  23. // this is a circular buffer. If head <= tail, then the useful
  24. // data is in the interval [head, tail[. If tail < head, then
  25. // the useful data is the union of [head, len[ and [0, tail[.
  26. // In order to avoid ambiguity when head = tail, we always leave
  27. // an unused byte in the buffer.
  28. data []byte
  29. head, tail int
  30. notify chan struct{}
  31. subs bool
  32. closed bool
  33. count int
  34. limitCount, limitSize int
  35. readDeadline *deadline.Deadline
  36. }
  37. const (
  38. minSize = 2048
  39. cutoffSize = 128 * 1024
  40. maxSize = 4 * 1024 * 1024
  41. )
  42. // NewBuffer creates a new Buffer.
  43. func NewBuffer() *Buffer {
  44. return &Buffer{
  45. notify: make(chan struct{}),
  46. readDeadline: deadline.New(),
  47. }
  48. }
  49. // available returns true if the buffer is large enough to fit a packet
  50. // of the given size, taking overhead into account.
  51. func (b *Buffer) available(size int) bool {
  52. available := b.head - b.tail
  53. if available <= 0 {
  54. available += len(b.data)
  55. }
  56. // we interpret head=tail as empty, so always keep a byte free
  57. if size+2+1 > available {
  58. return false
  59. }
  60. return true
  61. }
  62. // grow increases the size of the buffer. If it returns nil, then the
  63. // buffer has been grown. It returns ErrFull if hits a limit.
  64. func (b *Buffer) grow() error {
  65. var newsize int
  66. if len(b.data) < cutoffSize {
  67. newsize = 2 * len(b.data)
  68. } else {
  69. newsize = 5 * len(b.data) / 4
  70. }
  71. if newsize < minSize {
  72. newsize = minSize
  73. }
  74. if (b.limitSize <= 0 || sizeHardlimit) && newsize > maxSize {
  75. newsize = maxSize
  76. }
  77. // one byte slack
  78. if b.limitSize > 0 && newsize > b.limitSize+1 {
  79. newsize = b.limitSize + 1
  80. }
  81. if newsize <= len(b.data) {
  82. return ErrFull
  83. }
  84. newdata := make([]byte, newsize)
  85. var n int
  86. if b.head <= b.tail {
  87. // data was contiguous
  88. n = copy(newdata, b.data[b.head:b.tail])
  89. } else {
  90. // data was discontiguous
  91. n = copy(newdata, b.data[b.head:])
  92. n += copy(newdata[n:], b.data[:b.tail])
  93. }
  94. b.head = 0
  95. b.tail = n
  96. b.data = newdata
  97. return nil
  98. }
  99. // Write appends a copy of the packet data to the buffer.
  100. // Returns ErrFull if the packet doesn't fit.
  101. //
  102. // Note that the packet size is limited to 65536 bytes since v0.11.0 due to the internal data structure.
  103. func (b *Buffer) Write(packet []byte) (int, error) {
  104. if len(packet) >= 0x10000 {
  105. return 0, errPacketTooBig
  106. }
  107. b.mutex.Lock()
  108. if b.closed {
  109. b.mutex.Unlock()
  110. return 0, io.ErrClosedPipe
  111. }
  112. if (b.limitCount > 0 && b.count >= b.limitCount) ||
  113. (b.limitSize > 0 && b.size()+2+len(packet) > b.limitSize) {
  114. b.mutex.Unlock()
  115. return 0, ErrFull
  116. }
  117. // grow the buffer until the packet fits
  118. for !b.available(len(packet)) {
  119. err := b.grow()
  120. if err != nil {
  121. b.mutex.Unlock()
  122. return 0, err
  123. }
  124. }
  125. var notify chan struct{}
  126. if b.subs {
  127. // readers are waiting. Prepare to notify, but only
  128. // actually do it after we release the lock.
  129. notify = b.notify
  130. b.notify = make(chan struct{})
  131. b.subs = false
  132. }
  133. // store the length of the packet
  134. b.data[b.tail] = uint8(len(packet) >> 8)
  135. b.tail++
  136. if b.tail >= len(b.data) {
  137. b.tail = 0
  138. }
  139. b.data[b.tail] = uint8(len(packet))
  140. b.tail++
  141. if b.tail >= len(b.data) {
  142. b.tail = 0
  143. }
  144. // store the packet
  145. n := copy(b.data[b.tail:], packet)
  146. b.tail += n
  147. if b.tail >= len(b.data) {
  148. // we reached the end, wrap around
  149. m := copy(b.data, packet[n:])
  150. b.tail = m
  151. }
  152. b.count++
  153. b.mutex.Unlock()
  154. if notify != nil {
  155. close(notify)
  156. }
  157. return len(packet), nil
  158. }
  159. // Read populates the given byte slice, returning the number of bytes read.
  160. // Blocks until data is available or the buffer is closed.
  161. // Returns io.ErrShortBuffer is the packet is too small to copy the Write.
  162. // Returns io.EOF if the buffer is closed.
  163. func (b *Buffer) Read(packet []byte) (n int, err error) {
  164. // Return immediately if the deadline is already exceeded.
  165. select {
  166. case <-b.readDeadline.Done():
  167. return 0, &netError{ErrTimeout, true, true}
  168. default:
  169. }
  170. for {
  171. b.mutex.Lock()
  172. if b.head != b.tail {
  173. // decode the packet size
  174. n1 := b.data[b.head]
  175. b.head++
  176. if b.head >= len(b.data) {
  177. b.head = 0
  178. }
  179. n2 := b.data[b.head]
  180. b.head++
  181. if b.head >= len(b.data) {
  182. b.head = 0
  183. }
  184. count := int((uint16(n1) << 8) | uint16(n2))
  185. // determine the number of bytes we'll actually copy
  186. copied := count
  187. if copied > len(packet) {
  188. copied = len(packet)
  189. }
  190. // copy the data
  191. if b.head+copied < len(b.data) {
  192. copy(packet, b.data[b.head:b.head+copied])
  193. } else {
  194. k := copy(packet, b.data[b.head:])
  195. copy(packet[k:], b.data[:copied-k])
  196. }
  197. // advance head, discarding any data that wasn't copied
  198. b.head += count
  199. if b.head >= len(b.data) {
  200. b.head -= len(b.data)
  201. }
  202. if b.head == b.tail {
  203. // the buffer is empty, reset to beginning
  204. // in order to improve cache locality.
  205. b.head = 0
  206. b.tail = 0
  207. }
  208. b.count--
  209. b.mutex.Unlock()
  210. if copied < count {
  211. return copied, io.ErrShortBuffer
  212. }
  213. return copied, nil
  214. }
  215. if b.closed {
  216. b.mutex.Unlock()
  217. return 0, io.EOF
  218. }
  219. notify := b.notify
  220. b.subs = true
  221. b.mutex.Unlock()
  222. select {
  223. case <-b.readDeadline.Done():
  224. return 0, &netError{ErrTimeout, true, true}
  225. case <-notify:
  226. }
  227. }
  228. }
  229. // Close the buffer, unblocking any pending reads.
  230. // Data in the buffer can still be read, Read will return io.EOF only when empty.
  231. func (b *Buffer) Close() (err error) {
  232. b.mutex.Lock()
  233. if b.closed {
  234. b.mutex.Unlock()
  235. return nil
  236. }
  237. notify := b.notify
  238. b.closed = true
  239. b.mutex.Unlock()
  240. close(notify)
  241. return nil
  242. }
  243. // Count returns the number of packets in the buffer.
  244. func (b *Buffer) Count() int {
  245. b.mutex.Lock()
  246. defer b.mutex.Unlock()
  247. return b.count
  248. }
  249. // SetLimitCount controls the maximum number of packets that can be buffered.
  250. // Causes Write to return ErrFull when this limit is reached.
  251. // A zero value will disable this limit.
  252. func (b *Buffer) SetLimitCount(limit int) {
  253. b.mutex.Lock()
  254. defer b.mutex.Unlock()
  255. b.limitCount = limit
  256. }
  257. // Size returns the total byte size of packets in the buffer, including
  258. // a small amount of administrative overhead.
  259. func (b *Buffer) Size() int {
  260. b.mutex.Lock()
  261. defer b.mutex.Unlock()
  262. return b.size()
  263. }
  264. func (b *Buffer) size() int {
  265. size := b.tail - b.head
  266. if size < 0 {
  267. size += len(b.data)
  268. }
  269. return size
  270. }
  271. // SetLimitSize controls the maximum number of bytes that can be buffered.
  272. // Causes Write to return ErrFull when this limit is reached.
  273. // A zero value means 4MB since v0.11.0.
  274. //
  275. // User can set packetioSizeHardlimit build tag to enable 4MB hardlimit.
  276. // When packetioSizeHardlimit build tag is set, SetLimitSize exceeding
  277. // the hardlimit will be silently discarded.
  278. func (b *Buffer) SetLimitSize(limit int) {
  279. b.mutex.Lock()
  280. defer b.mutex.Unlock()
  281. b.limitSize = limit
  282. }
  283. // SetReadDeadline sets the deadline for the Read operation.
  284. // Setting to zero means no deadline.
  285. func (b *Buffer) SetReadDeadline(t time.Time) error {
  286. b.readDeadline.Set(t)
  287. return nil
  288. }