srtp_writer_future.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. //go:build !js
  2. // +build !js
  3. package webrtc
  4. import (
  5. "io"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/pion/rtp"
  10. "github.com/pion/srtp/v2"
  11. )
  12. // srtpWriterFuture blocks Read/Write calls until
  13. // the SRTP Session is available
  14. type srtpWriterFuture struct {
  15. ssrc SSRC
  16. rtpSender *RTPSender
  17. rtcpReadStream atomic.Value // *srtp.ReadStreamSRTCP
  18. rtpWriteStream atomic.Value // *srtp.WriteStreamSRTP
  19. mu sync.Mutex
  20. closed bool
  21. }
  22. func (s *srtpWriterFuture) init(returnWhenNoSRTP bool) error {
  23. if returnWhenNoSRTP {
  24. select {
  25. case <-s.rtpSender.stopCalled:
  26. return io.ErrClosedPipe
  27. case <-s.rtpSender.transport.srtpReady:
  28. default:
  29. return nil
  30. }
  31. } else {
  32. select {
  33. case <-s.rtpSender.stopCalled:
  34. return io.ErrClosedPipe
  35. case <-s.rtpSender.transport.srtpReady:
  36. }
  37. }
  38. s.mu.Lock()
  39. defer s.mu.Unlock()
  40. if s.closed {
  41. return io.ErrClosedPipe
  42. }
  43. srtcpSession, err := s.rtpSender.transport.getSRTCPSession()
  44. if err != nil {
  45. return err
  46. }
  47. rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(s.ssrc))
  48. if err != nil {
  49. return err
  50. }
  51. srtpSession, err := s.rtpSender.transport.getSRTPSession()
  52. if err != nil {
  53. return err
  54. }
  55. rtpWriteStream, err := srtpSession.OpenWriteStream()
  56. if err != nil {
  57. return err
  58. }
  59. s.rtcpReadStream.Store(rtcpReadStream)
  60. s.rtpWriteStream.Store(rtpWriteStream)
  61. return nil
  62. }
  63. func (s *srtpWriterFuture) Close() error {
  64. s.mu.Lock()
  65. defer s.mu.Unlock()
  66. if s.closed {
  67. return nil
  68. }
  69. s.closed = true
  70. if value, ok := s.rtcpReadStream.Load().(*srtp.ReadStreamSRTCP); ok {
  71. return value.Close()
  72. }
  73. return nil
  74. }
  75. func (s *srtpWriterFuture) Read(b []byte) (n int, err error) {
  76. if value, ok := s.rtcpReadStream.Load().(*srtp.ReadStreamSRTCP); ok {
  77. return value.Read(b)
  78. }
  79. if err := s.init(false); err != nil || s.rtcpReadStream.Load() == nil {
  80. return 0, err
  81. }
  82. return s.Read(b)
  83. }
  84. func (s *srtpWriterFuture) SetReadDeadline(t time.Time) error {
  85. if value, ok := s.rtcpReadStream.Load().(*srtp.ReadStreamSRTCP); ok {
  86. return value.SetReadDeadline(t)
  87. }
  88. if err := s.init(false); err != nil || s.rtcpReadStream.Load() == nil {
  89. return err
  90. }
  91. return s.SetReadDeadline(t)
  92. }
  93. func (s *srtpWriterFuture) WriteRTP(header *rtp.Header, payload []byte) (int, error) {
  94. if value, ok := s.rtpWriteStream.Load().(*srtp.WriteStreamSRTP); ok {
  95. return value.WriteRTP(header, payload)
  96. }
  97. if err := s.init(true); err != nil || s.rtpWriteStream.Load() == nil {
  98. return 0, err
  99. }
  100. return s.WriteRTP(header, payload)
  101. }
  102. func (s *srtpWriterFuture) Write(b []byte) (int, error) {
  103. if value, ok := s.rtpWriteStream.Load().(*srtp.WriteStreamSRTP); ok {
  104. return value.Write(b)
  105. }
  106. if err := s.init(true); err != nil || s.rtpWriteStream.Load() == nil {
  107. return 0, err
  108. }
  109. return s.Write(b)
  110. }