rtpsender.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. //go:build !js
  2. // +build !js
  3. package webrtc
  4. import (
  5. "fmt"
  6. "io"
  7. "sync"
  8. "time"
  9. "github.com/pion/interceptor"
  10. "github.com/pion/randutil"
  11. "github.com/pion/rtcp"
  12. "github.com/pion/rtp"
  13. "github.com/pion/webrtc/v3/internal/util"
  14. )
  15. type trackEncoding struct {
  16. track TrackLocal
  17. srtpStream *srtpWriterFuture
  18. rtcpInterceptor interceptor.RTCPReader
  19. streamInfo interceptor.StreamInfo
  20. context TrackLocalContext
  21. ssrc SSRC
  22. }
  23. // RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
  24. type RTPSender struct {
  25. trackEncodings []*trackEncoding
  26. transport *DTLSTransport
  27. payloadType PayloadType
  28. kind RTPCodecType
  29. // nolint:godox
  30. // TODO(sgotti) remove this when in future we'll avoid replacing
  31. // a transceiver sender since we can just check the
  32. // transceiver negotiation status
  33. negotiated bool
  34. // A reference to the associated api object
  35. api *API
  36. id string
  37. rtpTransceiver *RTPTransceiver
  38. mu sync.RWMutex
  39. sendCalled, stopCalled chan struct{}
  40. }
  41. // NewRTPSender constructs a new RTPSender
  42. func (api *API) NewRTPSender(track TrackLocal, transport *DTLSTransport) (*RTPSender, error) {
  43. if track == nil {
  44. return nil, errRTPSenderTrackNil
  45. } else if transport == nil {
  46. return nil, errRTPSenderDTLSTransportNil
  47. }
  48. id, err := randutil.GenerateCryptoRandomString(32, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
  49. if err != nil {
  50. return nil, err
  51. }
  52. r := &RTPSender{
  53. transport: transport,
  54. api: api,
  55. sendCalled: make(chan struct{}),
  56. stopCalled: make(chan struct{}),
  57. id: id,
  58. kind: track.Kind(),
  59. }
  60. r.addEncoding(track)
  61. return r, nil
  62. }
  63. func (r *RTPSender) isNegotiated() bool {
  64. r.mu.RLock()
  65. defer r.mu.RUnlock()
  66. return r.negotiated
  67. }
  68. func (r *RTPSender) setNegotiated() {
  69. r.mu.Lock()
  70. defer r.mu.Unlock()
  71. r.negotiated = true
  72. }
  73. func (r *RTPSender) setRTPTransceiver(rtpTransceiver *RTPTransceiver) {
  74. r.mu.Lock()
  75. defer r.mu.Unlock()
  76. r.rtpTransceiver = rtpTransceiver
  77. }
  78. // Transport returns the currently-configured *DTLSTransport or nil
  79. // if one has not yet been configured
  80. func (r *RTPSender) Transport() *DTLSTransport {
  81. r.mu.RLock()
  82. defer r.mu.RUnlock()
  83. return r.transport
  84. }
  85. func (r *RTPSender) getParameters() RTPSendParameters {
  86. var encodings []RTPEncodingParameters
  87. for _, trackEncoding := range r.trackEncodings {
  88. var rid string
  89. if trackEncoding.track != nil {
  90. rid = trackEncoding.track.RID()
  91. }
  92. encodings = append(encodings, RTPEncodingParameters{
  93. RTPCodingParameters: RTPCodingParameters{
  94. RID: rid,
  95. SSRC: trackEncoding.ssrc,
  96. PayloadType: r.payloadType,
  97. },
  98. })
  99. }
  100. sendParameters := RTPSendParameters{
  101. RTPParameters: r.api.mediaEngine.getRTPParametersByKind(
  102. r.kind,
  103. []RTPTransceiverDirection{RTPTransceiverDirectionSendonly},
  104. ),
  105. Encodings: encodings,
  106. }
  107. if r.rtpTransceiver != nil {
  108. sendParameters.Codecs = r.rtpTransceiver.getCodecs()
  109. } else {
  110. sendParameters.Codecs = r.api.mediaEngine.getCodecsByKind(r.kind)
  111. }
  112. return sendParameters
  113. }
  114. // GetParameters describes the current configuration for the encoding and
  115. // transmission of media on the sender's track.
  116. func (r *RTPSender) GetParameters() RTPSendParameters {
  117. r.mu.RLock()
  118. defer r.mu.RUnlock()
  119. return r.getParameters()
  120. }
  121. // AddEncoding adds an encoding to RTPSender. Used by simulcast senders.
  122. func (r *RTPSender) AddEncoding(track TrackLocal) error {
  123. r.mu.Lock()
  124. defer r.mu.Unlock()
  125. if track == nil {
  126. return errRTPSenderTrackNil
  127. }
  128. if track.RID() == "" {
  129. return errRTPSenderRidNil
  130. }
  131. if r.hasStopped() {
  132. return errRTPSenderStopped
  133. }
  134. if r.hasSent() {
  135. return errRTPSenderSendAlreadyCalled
  136. }
  137. var refTrack TrackLocal
  138. if len(r.trackEncodings) != 0 {
  139. refTrack = r.trackEncodings[0].track
  140. }
  141. if refTrack == nil || refTrack.RID() == "" {
  142. return errRTPSenderNoBaseEncoding
  143. }
  144. if refTrack.ID() != track.ID() || refTrack.StreamID() != track.StreamID() || refTrack.Kind() != track.Kind() {
  145. return errRTPSenderBaseEncodingMismatch
  146. }
  147. for _, encoding := range r.trackEncodings {
  148. if encoding.track == nil {
  149. continue
  150. }
  151. if encoding.track.RID() == track.RID() {
  152. return errRTPSenderRIDCollision
  153. }
  154. }
  155. r.addEncoding(track)
  156. return nil
  157. }
  158. func (r *RTPSender) addEncoding(track TrackLocal) {
  159. ssrc := SSRC(randutil.NewMathRandomGenerator().Uint32())
  160. trackEncoding := &trackEncoding{
  161. track: track,
  162. srtpStream: &srtpWriterFuture{ssrc: ssrc},
  163. ssrc: ssrc,
  164. }
  165. trackEncoding.srtpStream.rtpSender = r
  166. trackEncoding.rtcpInterceptor = r.api.interceptor.BindRTCPReader(
  167. interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
  168. n, err = trackEncoding.srtpStream.Read(in)
  169. return n, a, err
  170. }),
  171. )
  172. r.trackEncodings = append(r.trackEncodings, trackEncoding)
  173. }
  174. // Track returns the RTCRtpTransceiver track, or nil
  175. func (r *RTPSender) Track() TrackLocal {
  176. r.mu.RLock()
  177. defer r.mu.RUnlock()
  178. if len(r.trackEncodings) == 0 {
  179. return nil
  180. }
  181. return r.trackEncodings[0].track
  182. }
  183. // ReplaceTrack replaces the track currently being used as the sender's source with a new TrackLocal.
  184. // The new track must be of the same media kind (audio, video, etc) and switching the track should not
  185. // require negotiation.
  186. func (r *RTPSender) ReplaceTrack(track TrackLocal) error {
  187. r.mu.Lock()
  188. defer r.mu.Unlock()
  189. if track != nil && r.kind != track.Kind() {
  190. return ErrRTPSenderNewTrackHasIncorrectKind
  191. }
  192. // cannot replace simulcast envelope
  193. if track != nil && len(r.trackEncodings) > 1 {
  194. return ErrRTPSenderNewTrackHasIncorrectEnvelope
  195. }
  196. var replacedTrack TrackLocal
  197. var context *TrackLocalContext
  198. if len(r.trackEncodings) != 0 {
  199. replacedTrack = r.trackEncodings[0].track
  200. context = &r.trackEncodings[0].context
  201. }
  202. if r.hasSent() && replacedTrack != nil {
  203. if err := replacedTrack.Unbind(*context); err != nil {
  204. return err
  205. }
  206. }
  207. if !r.hasSent() || track == nil {
  208. r.trackEncodings[0].track = track
  209. return nil
  210. }
  211. codec, err := track.Bind(TrackLocalContext{
  212. id: context.id,
  213. params: r.api.mediaEngine.getRTPParametersByKind(track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
  214. ssrc: context.ssrc,
  215. writeStream: context.writeStream,
  216. rtcpInterceptor: context.rtcpInterceptor,
  217. })
  218. if err != nil {
  219. // Re-bind the original track
  220. if _, reBindErr := replacedTrack.Bind(*context); reBindErr != nil {
  221. return reBindErr
  222. }
  223. return err
  224. }
  225. // Codec has changed
  226. if r.payloadType != codec.PayloadType {
  227. context.params.Codecs = []RTPCodecParameters{codec}
  228. }
  229. r.trackEncodings[0].track = track
  230. return nil
  231. }
  232. // Send Attempts to set the parameters controlling the sending of media.
  233. func (r *RTPSender) Send(parameters RTPSendParameters) error {
  234. r.mu.Lock()
  235. defer r.mu.Unlock()
  236. switch {
  237. case r.hasSent():
  238. return errRTPSenderSendAlreadyCalled
  239. case r.trackEncodings[0].track == nil:
  240. return errRTPSenderTrackRemoved
  241. }
  242. for idx, trackEncoding := range r.trackEncodings {
  243. writeStream := &interceptorToTrackLocalWriter{}
  244. trackEncoding.context = TrackLocalContext{
  245. id: r.id,
  246. params: r.api.mediaEngine.getRTPParametersByKind(trackEncoding.track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
  247. ssrc: parameters.Encodings[idx].SSRC,
  248. writeStream: writeStream,
  249. rtcpInterceptor: trackEncoding.rtcpInterceptor,
  250. }
  251. codec, err := trackEncoding.track.Bind(trackEncoding.context)
  252. if err != nil {
  253. return err
  254. }
  255. trackEncoding.context.params.Codecs = []RTPCodecParameters{codec}
  256. trackEncoding.streamInfo = *createStreamInfo(
  257. r.id,
  258. parameters.Encodings[idx].SSRC,
  259. codec.PayloadType,
  260. codec.RTPCodecCapability,
  261. parameters.HeaderExtensions,
  262. )
  263. srtpStream := trackEncoding.srtpStream
  264. rtpInterceptor := r.api.interceptor.BindLocalStream(
  265. &trackEncoding.streamInfo,
  266. interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
  267. return srtpStream.WriteRTP(header, payload)
  268. }),
  269. )
  270. writeStream.interceptor.Store(rtpInterceptor)
  271. }
  272. close(r.sendCalled)
  273. return nil
  274. }
  275. // Stop irreversibly stops the RTPSender
  276. func (r *RTPSender) Stop() error {
  277. r.mu.Lock()
  278. if stopped := r.hasStopped(); stopped {
  279. r.mu.Unlock()
  280. return nil
  281. }
  282. close(r.stopCalled)
  283. r.mu.Unlock()
  284. if !r.hasSent() {
  285. return nil
  286. }
  287. if err := r.ReplaceTrack(nil); err != nil {
  288. return err
  289. }
  290. errs := []error{}
  291. for _, trackEncoding := range r.trackEncodings {
  292. r.api.interceptor.UnbindLocalStream(&trackEncoding.streamInfo)
  293. errs = append(errs, trackEncoding.srtpStream.Close())
  294. }
  295. return util.FlattenErrs(errs)
  296. }
  297. // Read reads incoming RTCP for this RTPSender
  298. func (r *RTPSender) Read(b []byte) (n int, a interceptor.Attributes, err error) {
  299. select {
  300. case <-r.sendCalled:
  301. return r.trackEncodings[0].rtcpInterceptor.Read(b, a)
  302. case <-r.stopCalled:
  303. return 0, nil, io.ErrClosedPipe
  304. }
  305. }
  306. // ReadRTCP is a convenience method that wraps Read and unmarshals for you.
  307. func (r *RTPSender) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
  308. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  309. i, attributes, err := r.Read(b)
  310. if err != nil {
  311. return nil, nil, err
  312. }
  313. pkts, err := rtcp.Unmarshal(b[:i])
  314. if err != nil {
  315. return nil, nil, err
  316. }
  317. return pkts, attributes, nil
  318. }
  319. // ReadSimulcast reads incoming RTCP for this RTPSender for given rid
  320. func (r *RTPSender) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
  321. select {
  322. case <-r.sendCalled:
  323. for _, t := range r.trackEncodings {
  324. if t.track != nil && t.track.RID() == rid {
  325. return t.rtcpInterceptor.Read(b, a)
  326. }
  327. }
  328. return 0, nil, fmt.Errorf("%w: %s", errRTPSenderNoTrackForRID, rid)
  329. case <-r.stopCalled:
  330. return 0, nil, io.ErrClosedPipe
  331. }
  332. }
  333. // ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you
  334. func (r *RTPSender) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) {
  335. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  336. i, attributes, err := r.ReadSimulcast(b, rid)
  337. if err != nil {
  338. return nil, nil, err
  339. }
  340. pkts, err := rtcp.Unmarshal(b[:i])
  341. return pkts, attributes, err
  342. }
  343. // SetReadDeadline sets the deadline for the Read operation.
  344. // Setting to zero means no deadline.
  345. func (r *RTPSender) SetReadDeadline(t time.Time) error {
  346. return r.trackEncodings[0].srtpStream.SetReadDeadline(t)
  347. }
  348. // SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever.
  349. func (r *RTPSender) SetReadDeadlineSimulcast(deadline time.Time, rid string) error {
  350. r.mu.RLock()
  351. defer r.mu.RUnlock()
  352. for _, t := range r.trackEncodings {
  353. if t.track != nil && t.track.RID() == rid {
  354. return t.srtpStream.SetReadDeadline(deadline)
  355. }
  356. }
  357. return fmt.Errorf("%w: %s", errRTPSenderNoTrackForRID, rid)
  358. }
  359. // hasSent tells if data has been ever sent for this instance
  360. func (r *RTPSender) hasSent() bool {
  361. select {
  362. case <-r.sendCalled:
  363. return true
  364. default:
  365. return false
  366. }
  367. }
  368. // hasStopped tells if stop has been called
  369. func (r *RTPSender) hasStopped() bool {
  370. select {
  371. case <-r.stopCalled:
  372. return true
  373. default:
  374. return false
  375. }
  376. }