stream_srtp.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package srtp
  2. import (
  3. "errors"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/pion/rtp"
  8. "github.com/pion/transport/packetio"
  9. )
// srtpBufferSize is the size limit (1000 * 1000, i.e. 1MB) applied to the
// default packetio buffer that a read stream creates when the session does not
// supply its own buffer factory.
const srtpBufferSize = 1000 * 1000
// ReadStreamSRTP is the read stream for a single RTP SSRC of a SessionSRTP.
// Data is stored in its buffer through write and consumed through Read/ReadRTP.
// NOTE(review): decryption is not performed in this type's visible methods;
// presumably the session decrypts before calling write — confirm.
type ReadStreamSRTP struct {
	// mu is held by init and Close.
	mu sync.Mutex
	// isInited is set to true by a successful init.
	isInited bool
	// isClosed is created in init; Close selects on it to detect an earlier close.
	isClosed chan bool
	// session is the owning session; Close removes this stream from it.
	session *SessionSRTP
	// ssrc is the SSRC this stream was initialised with.
	ssrc uint32
	// buffer holds the data passed to write until it is read back.
	buffer io.ReadWriteCloser
}
// newReadStreamSRTP returns an empty, uninitialised ReadStreamSRTP as a
// readStream; init must be called before the stream is used.
// Used by getOrCreateReadStream
func newReadStreamSRTP() readStream {
	return &ReadStreamSRTP{}
}
  25. func (r *ReadStreamSRTP) init(child streamSession, ssrc uint32) error {
  26. sessionSRTP, ok := child.(*SessionSRTP)
  27. r.mu.Lock()
  28. defer r.mu.Unlock()
  29. if !ok {
  30. return errFailedTypeAssertion
  31. } else if r.isInited {
  32. return errStreamAlreadyInited
  33. }
  34. r.session = sessionSRTP
  35. r.ssrc = ssrc
  36. r.isInited = true
  37. r.isClosed = make(chan bool)
  38. // Create a buffer with a 1MB limit
  39. if r.session.bufferFactory != nil {
  40. r.buffer = r.session.bufferFactory(packetio.RTPBufferPacket, ssrc)
  41. } else {
  42. buff := packetio.NewBuffer()
  43. buff.SetLimitSize(srtpBufferSize)
  44. r.buffer = buff
  45. }
  46. return nil
  47. }
  48. func (r *ReadStreamSRTP) write(buf []byte) (n int, err error) {
  49. n, err = r.buffer.Write(buf)
  50. if errors.Is(err, packetio.ErrFull) {
  51. // Silently drop data when the buffer is full.
  52. return len(buf), nil
  53. }
  54. return n, err
  55. }
// Read reads from the stream's buffer into buf and returns the byte count and
// error reported by the buffer.
// NOTE(review): no decryption happens in this method; presumably the data was
// decrypted by the session before it was passed to write — confirm.
func (r *ReadStreamSRTP) Read(buf []byte) (int, error) {
	return r.buffer.Read(buf)
}
  60. // ReadRTP reads and decrypts full RTP packet and its header from the nextConn
  61. func (r *ReadStreamSRTP) ReadRTP(buf []byte) (int, *rtp.Header, error) {
  62. n, err := r.Read(buf)
  63. if err != nil {
  64. return 0, nil, err
  65. }
  66. header := &rtp.Header{}
  67. _, err = header.Unmarshal(buf[:n])
  68. if err != nil {
  69. return 0, nil, err
  70. }
  71. return n, header, nil
  72. }
  73. // SetReadDeadline sets the deadline for the Read operation.
  74. // Setting to zero means no deadline.
  75. func (r *ReadStreamSRTP) SetReadDeadline(t time.Time) error {
  76. if b, ok := r.buffer.(interface {
  77. SetReadDeadline(time.Time) error
  78. }); ok {
  79. return b.SetReadDeadline(t)
  80. }
  81. return nil
  82. }
  83. // Close removes the ReadStream from the session and cleans up any associated state
  84. func (r *ReadStreamSRTP) Close() error {
  85. r.mu.Lock()
  86. defer r.mu.Unlock()
  87. if !r.isInited {
  88. return errStreamNotInited
  89. }
  90. select {
  91. case <-r.isClosed:
  92. return errStreamAlreadyClosed
  93. default:
  94. err := r.buffer.Close()
  95. if err != nil {
  96. return err
  97. }
  98. r.session.removeReadStream(r.ssrc)
  99. return nil
  100. }
  101. }
// GetSSRC returns the SSRC we are demuxing for, i.e. the value stored by init
// (zero if init has not run).
func (r *ReadStreamSRTP) GetSSRC() uint32 {
	return r.ssrc
}
// WriteStreamSRTP is the write stream of a single SessionSRTP. All of its
// methods delegate to that session.
// NOTE(review): encryption is presumably performed inside the session's
// write/writeRTP, which are not shown here — confirm.
type WriteStreamSRTP struct {
	// session is the session every write and deadline call is forwarded to.
	session *SessionSRTP
}
// WriteRTP passes an RTP header and payload to the session's writeRTP and
// returns its result unchanged.
// NOTE(review): the session is presumably what encrypts the packet and writes
// it to the connection — confirm against SessionSRTP.writeRTP.
func (w *WriteStreamSRTP) WriteRTP(header *rtp.Header, payload []byte) (int, error) {
	return w.session.writeRTP(header, payload)
}
// Write passes b, a full RTP packet, to the session's write and returns its
// result unchanged.
// NOTE(review): the session is presumably what encrypts the packet and writes
// it to the nextConn — confirm against SessionSRTP.write.
func (w *WriteStreamSRTP) Write(b []byte) (int, error) {
	return w.session.write(b)
}
// SetWriteDeadline sets the deadline for the Write operation.
// Setting to zero means no deadline.
// The deadline is forwarded to the session's setWriteDeadline and its error is
// returned unchanged.
func (w *WriteStreamSRTP) SetWriteDeadline(t time.Time) error {
	return w.session.setWriteDeadline(t)
}