datachannel.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. //go:build !js
  2. // +build !js
  3. package webrtc
  4. import (
  5. "errors"
  6. "fmt"
  7. "io"
  8. "math"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/pion/datachannel"
  13. "github.com/pion/logging"
  14. "github.com/pion/webrtc/v3/pkg/rtcerr"
  15. )
  16. const dataChannelBufferSize = math.MaxUint16 // message size limit for Chromium
  17. var errSCTPNotEstablished = errors.New("SCTP not established")
  18. // DataChannel represents a WebRTC DataChannel
  19. // The DataChannel interface represents a network channel
  20. // which can be used for bidirectional peer-to-peer transfers of arbitrary data
  21. type DataChannel struct {
  22. mu sync.RWMutex
  23. statsID string
  24. label string
  25. ordered bool
  26. maxPacketLifeTime *uint16
  27. maxRetransmits *uint16
  28. protocol string
  29. negotiated bool
  30. id *uint16
  31. readyState atomic.Value // DataChannelState
  32. bufferedAmountLowThreshold uint64
  33. detachCalled bool
  34. // The binaryType represents attribute MUST, on getting, return the value to
  35. // which it was last set. On setting, if the new value is either the string
  36. // "blob" or the string "arraybuffer", then set the IDL attribute to this
  37. // new value. Otherwise, throw a SyntaxError. When an DataChannel object
  38. // is created, the binaryType attribute MUST be initialized to the string
  39. // "blob". This attribute controls how binary data is exposed to scripts.
  40. // binaryType string
  41. onMessageHandler func(DataChannelMessage)
  42. openHandlerOnce sync.Once
  43. onOpenHandler func()
  44. onCloseHandler func()
  45. onBufferedAmountLow func()
  46. onErrorHandler func(error)
  47. sctpTransport *SCTPTransport
  48. dataChannel *datachannel.DataChannel
  49. // A reference to the associated api object used by this datachannel
  50. api *API
  51. log logging.LeveledLogger
  52. }
  53. // NewDataChannel creates a new DataChannel.
  54. // This constructor is part of the ORTC API. It is not
  55. // meant to be used together with the basic WebRTC API.
  56. func (api *API) NewDataChannel(transport *SCTPTransport, params *DataChannelParameters) (*DataChannel, error) {
  57. d, err := api.newDataChannel(params, api.settingEngine.LoggerFactory.NewLogger("ortc"))
  58. if err != nil {
  59. return nil, err
  60. }
  61. err = d.open(transport)
  62. if err != nil {
  63. return nil, err
  64. }
  65. return d, nil
  66. }
  67. // newDataChannel is an internal constructor for the data channel used to
  68. // create the DataChannel object before the networking is set up.
  69. func (api *API) newDataChannel(params *DataChannelParameters, log logging.LeveledLogger) (*DataChannel, error) {
  70. // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #5)
  71. if len(params.Label) > 65535 {
  72. return nil, &rtcerr.TypeError{Err: ErrStringSizeLimit}
  73. }
  74. d := &DataChannel{
  75. statsID: fmt.Sprintf("DataChannel-%d", time.Now().UnixNano()),
  76. label: params.Label,
  77. protocol: params.Protocol,
  78. negotiated: params.Negotiated,
  79. id: params.ID,
  80. ordered: params.Ordered,
  81. maxPacketLifeTime: params.MaxPacketLifeTime,
  82. maxRetransmits: params.MaxRetransmits,
  83. api: api,
  84. log: log,
  85. }
  86. d.setReadyState(DataChannelStateConnecting)
  87. return d, nil
  88. }
  89. // open opens the datachannel over the sctp transport
  90. func (d *DataChannel) open(sctpTransport *SCTPTransport) error {
  91. association := sctpTransport.association()
  92. if association == nil {
  93. return errSCTPNotEstablished
  94. }
  95. d.mu.Lock()
  96. if d.sctpTransport != nil { // already open
  97. d.mu.Unlock()
  98. return nil
  99. }
  100. d.sctpTransport = sctpTransport
  101. var channelType datachannel.ChannelType
  102. var reliabilityParameter uint32
  103. switch {
  104. case d.maxPacketLifeTime == nil && d.maxRetransmits == nil:
  105. if d.ordered {
  106. channelType = datachannel.ChannelTypeReliable
  107. } else {
  108. channelType = datachannel.ChannelTypeReliableUnordered
  109. }
  110. case d.maxRetransmits != nil:
  111. reliabilityParameter = uint32(*d.maxRetransmits)
  112. if d.ordered {
  113. channelType = datachannel.ChannelTypePartialReliableRexmit
  114. } else {
  115. channelType = datachannel.ChannelTypePartialReliableRexmitUnordered
  116. }
  117. default:
  118. reliabilityParameter = uint32(*d.maxPacketLifeTime)
  119. if d.ordered {
  120. channelType = datachannel.ChannelTypePartialReliableTimed
  121. } else {
  122. channelType = datachannel.ChannelTypePartialReliableTimedUnordered
  123. }
  124. }
  125. cfg := &datachannel.Config{
  126. ChannelType: channelType,
  127. Priority: datachannel.ChannelPriorityNormal,
  128. ReliabilityParameter: reliabilityParameter,
  129. Label: d.label,
  130. Protocol: d.protocol,
  131. Negotiated: d.negotiated,
  132. LoggerFactory: d.api.settingEngine.LoggerFactory,
  133. }
  134. if d.id == nil {
  135. // avoid holding lock when generating ID, since id generation locks
  136. d.mu.Unlock()
  137. var dcID *uint16
  138. err := d.sctpTransport.generateAndSetDataChannelID(d.sctpTransport.dtlsTransport.role(), &dcID)
  139. if err != nil {
  140. return err
  141. }
  142. d.mu.Lock()
  143. d.id = dcID
  144. }
  145. dc, err := datachannel.Dial(association, *d.id, cfg)
  146. if err != nil {
  147. d.mu.Unlock()
  148. return err
  149. }
  150. // bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
  151. dc.SetBufferedAmountLowThreshold(d.bufferedAmountLowThreshold)
  152. dc.OnBufferedAmountLow(d.onBufferedAmountLow)
  153. d.mu.Unlock()
  154. d.handleOpen(dc, false, d.negotiated)
  155. return nil
  156. }
  157. // Transport returns the SCTPTransport instance the DataChannel is sending over.
  158. func (d *DataChannel) Transport() *SCTPTransport {
  159. d.mu.RLock()
  160. defer d.mu.RUnlock()
  161. return d.sctpTransport
  162. }
  163. // After onOpen is complete check that the user called detach
  164. // and provide an error message if the call was missed
  165. func (d *DataChannel) checkDetachAfterOpen() {
  166. d.mu.RLock()
  167. defer d.mu.RUnlock()
  168. if d.api.settingEngine.detach.DataChannels && !d.detachCalled {
  169. d.log.Warn("webrtc.DetachDataChannels() enabled but didn't Detach, call Detach from OnOpen")
  170. }
  171. }
  172. // OnOpen sets an event handler which is invoked when
  173. // the underlying data transport has been established (or re-established).
  174. func (d *DataChannel) OnOpen(f func()) {
  175. d.mu.Lock()
  176. d.openHandlerOnce = sync.Once{}
  177. d.onOpenHandler = f
  178. d.mu.Unlock()
  179. if d.ReadyState() == DataChannelStateOpen {
  180. // If the data channel is already open, call the handler immediately.
  181. go d.openHandlerOnce.Do(func() {
  182. f()
  183. d.checkDetachAfterOpen()
  184. })
  185. }
  186. }
  187. func (d *DataChannel) onOpen() {
  188. d.mu.RLock()
  189. handler := d.onOpenHandler
  190. d.mu.RUnlock()
  191. if handler != nil {
  192. go d.openHandlerOnce.Do(func() {
  193. handler()
  194. d.checkDetachAfterOpen()
  195. })
  196. }
  197. }
  198. // OnClose sets an event handler which is invoked when
  199. // the underlying data transport has been closed.
  200. func (d *DataChannel) OnClose(f func()) {
  201. d.mu.Lock()
  202. defer d.mu.Unlock()
  203. d.onCloseHandler = f
  204. }
  205. func (d *DataChannel) onClose() {
  206. d.mu.RLock()
  207. handler := d.onCloseHandler
  208. d.mu.RUnlock()
  209. if handler != nil {
  210. go handler()
  211. }
  212. }
  213. // OnMessage sets an event handler which is invoked on a binary
  214. // message arrival over the sctp transport from a remote peer.
  215. // OnMessage can currently receive messages up to 16384 bytes
  216. // in size. Check out the detach API if you want to use larger
  217. // message sizes. Note that browser support for larger messages
  218. // is also limited.
  219. func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
  220. d.mu.Lock()
  221. defer d.mu.Unlock()
  222. d.onMessageHandler = f
  223. }
  224. func (d *DataChannel) onMessage(msg DataChannelMessage) {
  225. d.mu.RLock()
  226. handler := d.onMessageHandler
  227. d.mu.RUnlock()
  228. if handler == nil {
  229. return
  230. }
  231. handler(msg)
  232. }
  233. func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlreadyNegotiated bool) {
  234. d.mu.Lock()
  235. d.dataChannel = dc
  236. d.mu.Unlock()
  237. d.setReadyState(DataChannelStateOpen)
  238. // Fire the OnOpen handler immediately not using pion/datachannel
  239. // * detached datachannels have no read loop, the user needs to read and query themselves
  240. // * remote datachannels should fire OnOpened. This isn't spec compliant, but we can't break behavior yet
  241. // * already negotiated datachannels should fire OnOpened
  242. if d.api.settingEngine.detach.DataChannels || isRemote || isAlreadyNegotiated {
  243. d.onOpen()
  244. } else {
  245. dc.OnOpen(func() {
  246. d.onOpen()
  247. })
  248. }
  249. d.mu.Lock()
  250. defer d.mu.Unlock()
  251. if !d.api.settingEngine.detach.DataChannels {
  252. go d.readLoop()
  253. }
  254. }
  255. // OnError sets an event handler which is invoked when
  256. // the underlying data transport cannot be read.
  257. func (d *DataChannel) OnError(f func(err error)) {
  258. d.mu.Lock()
  259. defer d.mu.Unlock()
  260. d.onErrorHandler = f
  261. }
  262. func (d *DataChannel) onError(err error) {
  263. d.mu.RLock()
  264. handler := d.onErrorHandler
  265. d.mu.RUnlock()
  266. if handler != nil {
  267. go handler(err)
  268. }
  269. }
  270. // See https://github.com/pion/webrtc/issues/1516
  271. // nolint:gochecknoglobals
  272. var rlBufPool = sync.Pool{New: func() interface{} {
  273. return make([]byte, dataChannelBufferSize)
  274. }}
  275. func (d *DataChannel) readLoop() {
  276. for {
  277. buffer := rlBufPool.Get().([]byte) //nolint:forcetypeassert
  278. n, isString, err := d.dataChannel.ReadDataChannel(buffer)
  279. if err != nil {
  280. rlBufPool.Put(buffer) // nolint:staticcheck
  281. d.setReadyState(DataChannelStateClosed)
  282. if !errors.Is(err, io.EOF) {
  283. d.onError(err)
  284. }
  285. d.onClose()
  286. return
  287. }
  288. m := DataChannelMessage{Data: make([]byte, n), IsString: isString}
  289. copy(m.Data, buffer[:n])
  290. // The 'staticcheck' pragma is a false positive on the part of the CI linter.
  291. rlBufPool.Put(buffer) // nolint:staticcheck
  292. // NB: Why was DataChannelMessage not passed as a pointer value?
  293. d.onMessage(m) // nolint:staticcheck
  294. }
  295. }
  296. // Send sends the binary message to the DataChannel peer
  297. func (d *DataChannel) Send(data []byte) error {
  298. err := d.ensureOpen()
  299. if err != nil {
  300. return err
  301. }
  302. _, err = d.dataChannel.WriteDataChannel(data, false)
  303. return err
  304. }
  305. // SendText sends the text message to the DataChannel peer
  306. func (d *DataChannel) SendText(s string) error {
  307. err := d.ensureOpen()
  308. if err != nil {
  309. return err
  310. }
  311. _, err = d.dataChannel.WriteDataChannel([]byte(s), true)
  312. return err
  313. }
  314. func (d *DataChannel) ensureOpen() error {
  315. d.mu.RLock()
  316. defer d.mu.RUnlock()
  317. if d.ReadyState() != DataChannelStateOpen {
  318. return io.ErrClosedPipe
  319. }
  320. return nil
  321. }
  322. // Detach allows you to detach the underlying datachannel. This provides
  323. // an idiomatic API to work with, however it disables the OnMessage callback.
  324. // Before calling Detach you have to enable this behavior by calling
  325. // webrtc.DetachDataChannels(). Combining detached and normal data channels
  326. // is not supported.
  327. // Please refer to the data-channels-detach example and the
  328. // pion/datachannel documentation for the correct way to handle the
  329. // resulting DataChannel object.
  330. func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
  331. d.mu.Lock()
  332. defer d.mu.Unlock()
  333. if !d.api.settingEngine.detach.DataChannels {
  334. return nil, errDetachNotEnabled
  335. }
  336. if d.dataChannel == nil {
  337. return nil, errDetachBeforeOpened
  338. }
  339. d.detachCalled = true
  340. return d.dataChannel, nil
  341. }
  342. // Close Closes the DataChannel. It may be called regardless of whether
  343. // the DataChannel object was created by this peer or the remote peer.
  344. func (d *DataChannel) Close() error {
  345. d.mu.Lock()
  346. haveSctpTransport := d.dataChannel != nil
  347. d.mu.Unlock()
  348. if d.ReadyState() == DataChannelStateClosed {
  349. return nil
  350. }
  351. d.setReadyState(DataChannelStateClosing)
  352. if !haveSctpTransport {
  353. return nil
  354. }
  355. return d.dataChannel.Close()
  356. }
  357. // Label represents a label that can be used to distinguish this
  358. // DataChannel object from other DataChannel objects. Scripts are
  359. // allowed to create multiple DataChannel objects with the same label.
  360. func (d *DataChannel) Label() string {
  361. d.mu.RLock()
  362. defer d.mu.RUnlock()
  363. return d.label
  364. }
  365. // Ordered returns true if the DataChannel is ordered, and false if
  366. // out-of-order delivery is allowed.
  367. func (d *DataChannel) Ordered() bool {
  368. d.mu.RLock()
  369. defer d.mu.RUnlock()
  370. return d.ordered
  371. }
  372. // MaxPacketLifeTime represents the length of the time window (msec) during
  373. // which transmissions and retransmissions may occur in unreliable mode.
  374. func (d *DataChannel) MaxPacketLifeTime() *uint16 {
  375. d.mu.RLock()
  376. defer d.mu.RUnlock()
  377. return d.maxPacketLifeTime
  378. }
  379. // MaxRetransmits represents the maximum number of retransmissions that are
  380. // attempted in unreliable mode.
  381. func (d *DataChannel) MaxRetransmits() *uint16 {
  382. d.mu.RLock()
  383. defer d.mu.RUnlock()
  384. return d.maxRetransmits
  385. }
  386. // Protocol represents the name of the sub-protocol used with this
  387. // DataChannel.
  388. func (d *DataChannel) Protocol() string {
  389. d.mu.RLock()
  390. defer d.mu.RUnlock()
  391. return d.protocol
  392. }
  393. // Negotiated represents whether this DataChannel was negotiated by the
  394. // application (true), or not (false).
  395. func (d *DataChannel) Negotiated() bool {
  396. d.mu.RLock()
  397. defer d.mu.RUnlock()
  398. return d.negotiated
  399. }
  400. // ID represents the ID for this DataChannel. The value is initially
  401. // null, which is what will be returned if the ID was not provided at
  402. // channel creation time, and the DTLS role of the SCTP transport has not
  403. // yet been negotiated. Otherwise, it will return the ID that was either
  404. // selected by the script or generated. After the ID is set to a non-null
  405. // value, it will not change.
  406. func (d *DataChannel) ID() *uint16 {
  407. d.mu.RLock()
  408. defer d.mu.RUnlock()
  409. return d.id
  410. }
  411. // ReadyState represents the state of the DataChannel object.
  412. func (d *DataChannel) ReadyState() DataChannelState {
  413. if v, ok := d.readyState.Load().(DataChannelState); ok {
  414. return v
  415. }
  416. return DataChannelState(0)
  417. }
  418. // BufferedAmount represents the number of bytes of application data
  419. // (UTF-8 text and binary data) that have been queued using send(). Even
  420. // though the data transmission can occur in parallel, the returned value
  421. // MUST NOT be decreased before the current task yielded back to the event
  422. // loop to prevent race conditions. The value does not include framing
  423. // overhead incurred by the protocol, or buffering done by the operating
  424. // system or network hardware. The value of BufferedAmount slot will only
  425. // increase with each call to the send() method as long as the ReadyState is
  426. // open; however, BufferedAmount does not reset to zero once the channel
  427. // closes.
  428. func (d *DataChannel) BufferedAmount() uint64 {
  429. d.mu.RLock()
  430. defer d.mu.RUnlock()
  431. if d.dataChannel == nil {
  432. return 0
  433. }
  434. return d.dataChannel.BufferedAmount()
  435. }
  436. // BufferedAmountLowThreshold represents the threshold at which the
  437. // bufferedAmount is considered to be low. When the bufferedAmount decreases
  438. // from above this threshold to equal or below it, the bufferedamountlow
  439. // event fires. BufferedAmountLowThreshold is initially zero on each new
  440. // DataChannel, but the application may change its value at any time.
  441. // The threshold is set to 0 by default.
  442. func (d *DataChannel) BufferedAmountLowThreshold() uint64 {
  443. d.mu.RLock()
  444. defer d.mu.RUnlock()
  445. if d.dataChannel == nil {
  446. return d.bufferedAmountLowThreshold
  447. }
  448. return d.dataChannel.BufferedAmountLowThreshold()
  449. }
  450. // SetBufferedAmountLowThreshold is used to update the threshold.
  451. // See BufferedAmountLowThreshold().
  452. func (d *DataChannel) SetBufferedAmountLowThreshold(th uint64) {
  453. d.mu.Lock()
  454. defer d.mu.Unlock()
  455. d.bufferedAmountLowThreshold = th
  456. if d.dataChannel != nil {
  457. d.dataChannel.SetBufferedAmountLowThreshold(th)
  458. }
  459. }
  460. // OnBufferedAmountLow sets an event handler which is invoked when
  461. // the number of bytes of outgoing data becomes lower than the
  462. // BufferedAmountLowThreshold.
  463. func (d *DataChannel) OnBufferedAmountLow(f func()) {
  464. d.mu.Lock()
  465. defer d.mu.Unlock()
  466. d.onBufferedAmountLow = f
  467. if d.dataChannel != nil {
  468. d.dataChannel.OnBufferedAmountLow(f)
  469. }
  470. }
  471. func (d *DataChannel) getStatsID() string {
  472. d.mu.Lock()
  473. defer d.mu.Unlock()
  474. return d.statsID
  475. }
  476. func (d *DataChannel) collectStats(collector *statsReportCollector) {
  477. collector.Collecting()
  478. d.mu.Lock()
  479. defer d.mu.Unlock()
  480. stats := DataChannelStats{
  481. Timestamp: statsTimestampNow(),
  482. Type: StatsTypeDataChannel,
  483. ID: d.statsID,
  484. Label: d.label,
  485. Protocol: d.protocol,
  486. // TransportID string `json:"transportId"`
  487. State: d.ReadyState(),
  488. }
  489. if d.id != nil {
  490. stats.DataChannelIdentifier = int32(*d.id)
  491. }
  492. if d.dataChannel != nil {
  493. stats.MessagesSent = d.dataChannel.MessagesSent()
  494. stats.BytesSent = d.dataChannel.BytesSent()
  495. stats.MessagesReceived = d.dataChannel.MessagesReceived()
  496. stats.BytesReceived = d.dataChannel.BytesReceived()
  497. }
  498. collector.Collect(stats.ID, stats)
  499. }
  500. func (d *DataChannel) setReadyState(r DataChannelState) {
  501. d.readyState.Store(r)
  502. }