twcc.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. // Package twcc provides interceptors to implement transport wide congestion control.
  2. package twcc
  3. import (
  4. "math"
  5. "github.com/pion/rtcp"
  6. )
  7. type pktInfo struct {
  8. sequenceNumber uint32
  9. arrivalTime int64
  10. }
  11. // Recorder records incoming RTP packets and their delays and creates
  12. // transport wide congestion control feedback reports as specified in
  13. // https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01
  14. type Recorder struct {
  15. receivedPackets []pktInfo
  16. cycles uint32
  17. lastSequenceNumber uint16
  18. senderSSRC uint32
  19. mediaSSRC uint32
  20. fbPktCnt uint8
  21. }
  22. // NewRecorder creates a new Recorder which uses the given senderSSRC in the created
  23. // feedback packets.
  24. func NewRecorder(senderSSRC uint32) *Recorder {
  25. return &Recorder{
  26. receivedPackets: []pktInfo{},
  27. senderSSRC: senderSSRC,
  28. }
  29. }
  30. // Record marks a packet with mediaSSRC and a transport wide sequence number sequenceNumber as received at arrivalTime.
  31. func (r *Recorder) Record(mediaSSRC uint32, sequenceNumber uint16, arrivalTime int64) {
  32. r.mediaSSRC = mediaSSRC
  33. if sequenceNumber < 0x0fff && (r.lastSequenceNumber&0xffff) > 0xf000 {
  34. r.cycles += 1 << 16
  35. }
  36. r.receivedPackets = insertSorted(r.receivedPackets, pktInfo{
  37. sequenceNumber: r.cycles | uint32(sequenceNumber),
  38. arrivalTime: arrivalTime,
  39. })
  40. r.lastSequenceNumber = sequenceNumber
  41. }
  42. func insertSorted(list []pktInfo, element pktInfo) []pktInfo {
  43. if len(list) == 0 {
  44. return append(list, element)
  45. }
  46. for i := len(list) - 1; i >= 0; i-- {
  47. if list[i].sequenceNumber < element.sequenceNumber {
  48. list = append(list, pktInfo{})
  49. copy(list[i+2:], list[i+1:])
  50. list[i+1] = element
  51. return list
  52. }
  53. if list[i].sequenceNumber == element.sequenceNumber {
  54. list[i] = element
  55. return list
  56. }
  57. }
  58. // element.sequenceNumber is between 0 and first ever received sequenceNumber
  59. return append([]pktInfo{element}, list...)
  60. }
  61. // BuildFeedbackPacket creates a new RTCP packet containing a TWCC feedback report.
  62. func (r *Recorder) BuildFeedbackPacket() []rtcp.Packet {
  63. feedback := newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt)
  64. r.fbPktCnt++
  65. if len(r.receivedPackets) < 2 {
  66. r.receivedPackets = []pktInfo{}
  67. return []rtcp.Packet{feedback.getRTCP()}
  68. }
  69. feedback.setBase(uint16(r.receivedPackets[0].sequenceNumber&0xffff), r.receivedPackets[0].arrivalTime)
  70. var pkts []rtcp.Packet
  71. for _, pkt := range r.receivedPackets {
  72. ok := feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
  73. if !ok {
  74. pkts = append(pkts, feedback.getRTCP())
  75. feedback = newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt)
  76. r.fbPktCnt++
  77. feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
  78. }
  79. }
  80. r.receivedPackets = []pktInfo{}
  81. pkts = append(pkts, feedback.getRTCP())
  82. return pkts
  83. }
  84. type feedback struct {
  85. rtcp *rtcp.TransportLayerCC
  86. baseSequenceNumber uint16
  87. refTimestamp64MS int64
  88. lastTimestampUS int64
  89. nextSequenceNumber uint16
  90. sequenceNumberCount uint16
  91. len int
  92. lastChunk chunk
  93. chunks []rtcp.PacketStatusChunk
  94. deltas []*rtcp.RecvDelta
  95. }
  96. func newFeedback(senderSSRC, mediaSSRC uint32, count uint8) *feedback {
  97. return &feedback{
  98. rtcp: &rtcp.TransportLayerCC{
  99. SenderSSRC: senderSSRC,
  100. MediaSSRC: mediaSSRC,
  101. FbPktCount: count,
  102. },
  103. }
  104. }
  105. func (f *feedback) setBase(sequenceNumber uint16, timeUS int64) {
  106. f.baseSequenceNumber = sequenceNumber
  107. f.nextSequenceNumber = f.baseSequenceNumber
  108. f.refTimestamp64MS = timeUS / 64e3
  109. f.lastTimestampUS = f.refTimestamp64MS * 64e3
  110. }
  111. func (f *feedback) getRTCP() *rtcp.TransportLayerCC {
  112. f.rtcp.PacketStatusCount = f.sequenceNumberCount
  113. f.rtcp.ReferenceTime = uint32(f.refTimestamp64MS)
  114. f.rtcp.BaseSequenceNumber = f.baseSequenceNumber
  115. for len(f.lastChunk.deltas) > 0 {
  116. f.chunks = append(f.chunks, f.lastChunk.encode())
  117. }
  118. f.rtcp.PacketChunks = append(f.rtcp.PacketChunks, f.chunks...)
  119. f.rtcp.RecvDeltas = f.deltas
  120. padLen := 20 + len(f.rtcp.PacketChunks)*2 + f.len // 4 bytes header + 16 bytes twcc header + 2 bytes for each chunk + length of deltas
  121. padding := padLen%4 != 0
  122. for padLen%4 != 0 {
  123. padLen++
  124. }
  125. f.rtcp.Header = rtcp.Header{
  126. Count: rtcp.FormatTCC,
  127. Type: rtcp.TypeTransportSpecificFeedback,
  128. Padding: padding,
  129. Length: uint16((padLen / 4) - 1),
  130. }
  131. return f.rtcp
  132. }
  133. func (f *feedback) addReceived(sequenceNumber uint16, timestampUS int64) bool {
  134. deltaUS := timestampUS - f.lastTimestampUS
  135. delta250US := deltaUS / 250
  136. if delta250US < math.MinInt16 || delta250US > math.MaxInt16 { // delta doesn't fit into 16 bit, need to create new packet
  137. return false
  138. }
  139. for ; f.nextSequenceNumber != sequenceNumber; f.nextSequenceNumber++ {
  140. if !f.lastChunk.canAdd(rtcp.TypeTCCPacketNotReceived) {
  141. f.chunks = append(f.chunks, f.lastChunk.encode())
  142. }
  143. f.lastChunk.add(rtcp.TypeTCCPacketNotReceived)
  144. f.sequenceNumberCount++
  145. }
  146. var recvDelta uint16
  147. switch {
  148. case delta250US >= 0 && delta250US <= 0xff:
  149. f.len++
  150. recvDelta = rtcp.TypeTCCPacketReceivedSmallDelta
  151. default:
  152. f.len += 2
  153. recvDelta = rtcp.TypeTCCPacketReceivedLargeDelta
  154. }
  155. if !f.lastChunk.canAdd(recvDelta) {
  156. f.chunks = append(f.chunks, f.lastChunk.encode())
  157. }
  158. f.lastChunk.add(recvDelta)
  159. f.deltas = append(f.deltas, &rtcp.RecvDelta{
  160. Type: recvDelta,
  161. Delta: deltaUS,
  162. })
  163. f.lastTimestampUS = timestampUS
  164. f.sequenceNumberCount++
  165. f.nextSequenceNumber++
  166. return true
  167. }
  168. const (
  169. maxRunLengthCap = 0x1fff // 13 bits
  170. maxOneBitCap = 14 // bits
  171. maxTwoBitCap = 7 // bits
  172. )
  173. type chunk struct {
  174. hasLargeDelta bool
  175. hasDifferentTypes bool
  176. deltas []uint16
  177. }
  178. func (c *chunk) canAdd(delta uint16) bool {
  179. if len(c.deltas) < maxTwoBitCap {
  180. return true
  181. }
  182. if len(c.deltas) < maxOneBitCap && !c.hasLargeDelta && delta != rtcp.TypeTCCPacketReceivedLargeDelta {
  183. return true
  184. }
  185. if len(c.deltas) < maxRunLengthCap && !c.hasDifferentTypes && delta == c.deltas[0] {
  186. return true
  187. }
  188. return false
  189. }
  190. func (c *chunk) add(delta uint16) {
  191. c.deltas = append(c.deltas, delta)
  192. c.hasLargeDelta = c.hasLargeDelta || delta == rtcp.TypeTCCPacketReceivedLargeDelta
  193. c.hasDifferentTypes = c.hasDifferentTypes || delta != c.deltas[0]
  194. }
  195. func (c *chunk) encode() rtcp.PacketStatusChunk {
  196. if !c.hasDifferentTypes {
  197. defer c.reset()
  198. return &rtcp.RunLengthChunk{
  199. PacketStatusSymbol: c.deltas[0],
  200. RunLength: uint16(len(c.deltas)),
  201. }
  202. }
  203. if len(c.deltas) == maxOneBitCap {
  204. defer c.reset()
  205. return &rtcp.StatusVectorChunk{
  206. SymbolSize: rtcp.TypeTCCSymbolSizeOneBit,
  207. SymbolList: c.deltas,
  208. }
  209. }
  210. minCap := min(maxTwoBitCap, len(c.deltas))
  211. svc := &rtcp.StatusVectorChunk{
  212. SymbolSize: rtcp.TypeTCCSymbolSizeTwoBit,
  213. SymbolList: c.deltas[:minCap],
  214. }
  215. c.deltas = c.deltas[minCap:]
  216. c.hasDifferentTypes = false
  217. c.hasLargeDelta = false
  218. if len(c.deltas) > 0 {
  219. tmp := c.deltas[0]
  220. for _, d := range c.deltas {
  221. if tmp != d {
  222. c.hasDifferentTypes = true
  223. }
  224. if d == rtcp.TypeTCCPacketReceivedLargeDelta {
  225. c.hasLargeDelta = true
  226. }
  227. }
  228. }
  229. return svc
  230. }
  231. func (c *chunk) reset() {
  232. c.deltas = []uint16{}
  233. c.hasLargeDelta = false
  234. c.hasDifferentTypes = false
  235. }
  236. func min(a, b int) int {
  237. if a < b {
  238. return a
  239. }
  240. return b
  241. }