stream_srtcp.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package srtp
  2. import (
  3. "errors"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/pion/rtcp"
  8. "github.com/pion/transport/packetio"
  9. )
  // srtcpBufferSize is the size limit, in bytes, applied to the default
  // packet buffer that init creates when the session supplies no buffer
  // factory: 100 kB (100,000 bytes).
  const srtcpBufferSize = 100_000
  12. // ReadStreamSRTCP handles decryption for a single RTCP SSRC
  13. type ReadStreamSRTCP struct {
  14. mu sync.Mutex
  15. isInited bool
  16. isClosed chan bool
  17. session *SessionSRTCP
  18. ssrc uint32
  19. buffer io.ReadWriteCloser
  20. }
  21. func (r *ReadStreamSRTCP) write(buf []byte) (n int, err error) {
  22. n, err = r.buffer.Write(buf)
  23. if errors.Is(err, packetio.ErrFull) {
  24. // Silently drop data when the buffer is full.
  25. return len(buf), nil
  26. }
  27. return n, err
  28. }
  29. // Used by getOrCreateReadStream
  30. func newReadStreamSRTCP() readStream {
  31. return &ReadStreamSRTCP{}
  32. }
  33. // ReadRTCP reads and decrypts full RTCP packet and its header from the nextConn
  34. func (r *ReadStreamSRTCP) ReadRTCP(buf []byte) (int, *rtcp.Header, error) {
  35. n, err := r.Read(buf)
  36. if err != nil {
  37. return 0, nil, err
  38. }
  39. header := &rtcp.Header{}
  40. err = header.Unmarshal(buf[:n])
  41. if err != nil {
  42. return 0, nil, err
  43. }
  44. return n, header, nil
  45. }
  46. // Read reads and decrypts full RTCP packet from the nextConn
  47. func (r *ReadStreamSRTCP) Read(buf []byte) (int, error) {
  48. return r.buffer.Read(buf)
  49. }
  50. // SetReadDeadline sets the deadline for the Read operation.
  51. // Setting to zero means no deadline.
  52. func (r *ReadStreamSRTCP) SetReadDeadline(t time.Time) error {
  53. if b, ok := r.buffer.(interface {
  54. SetReadDeadline(time.Time) error
  55. }); ok {
  56. return b.SetReadDeadline(t)
  57. }
  58. return nil
  59. }
  60. // Close removes the ReadStream from the session and cleans up any associated state
  61. func (r *ReadStreamSRTCP) Close() error {
  62. r.mu.Lock()
  63. defer r.mu.Unlock()
  64. if !r.isInited {
  65. return errStreamNotInited
  66. }
  67. select {
  68. case <-r.isClosed:
  69. return errStreamAlreadyClosed
  70. default:
  71. err := r.buffer.Close()
  72. if err != nil {
  73. return err
  74. }
  75. r.session.removeReadStream(r.ssrc)
  76. return nil
  77. }
  78. }
  79. func (r *ReadStreamSRTCP) init(child streamSession, ssrc uint32) error {
  80. sessionSRTCP, ok := child.(*SessionSRTCP)
  81. r.mu.Lock()
  82. defer r.mu.Unlock()
  83. if !ok {
  84. return errFailedTypeAssertion
  85. } else if r.isInited {
  86. return errStreamAlreadyInited
  87. }
  88. r.session = sessionSRTCP
  89. r.ssrc = ssrc
  90. r.isInited = true
  91. r.isClosed = make(chan bool)
  92. if r.session.bufferFactory != nil {
  93. r.buffer = r.session.bufferFactory(packetio.RTCPBufferPacket, ssrc)
  94. } else {
  95. // Create a buffer and limit it to 100KB
  96. buff := packetio.NewBuffer()
  97. buff.SetLimitSize(srtcpBufferSize)
  98. r.buffer = buff
  99. }
  100. return nil
  101. }
  102. // GetSSRC returns the SSRC we are demuxing for
  103. func (r *ReadStreamSRTCP) GetSSRC() uint32 {
  104. return r.ssrc
  105. }
  106. // WriteStreamSRTCP is stream for a single Session that is used to encrypt RTCP
  107. type WriteStreamSRTCP struct {
  108. session *SessionSRTCP
  109. }
  110. // WriteRTCP encrypts a RTCP header and its payload to the nextConn
  111. func (w *WriteStreamSRTCP) WriteRTCP(header *rtcp.Header, payload []byte) (int, error) {
  112. headerRaw, err := header.Marshal()
  113. if err != nil {
  114. return 0, err
  115. }
  116. return w.session.write(append(headerRaw, payload...))
  117. }
  118. // Write encrypts and writes a full RTCP packets to the nextConn
  119. func (w *WriteStreamSRTCP) Write(b []byte) (int, error) {
  120. return w.session.write(b)
  121. }
  122. // SetWriteDeadline sets the deadline for the Write operation.
  123. // Setting to zero means no deadline.
  124. func (w *WriteStreamSRTCP) SetWriteDeadline(t time.Time) error {
  125. return w.session.setWriteDeadline(t)
  126. }