icetransport.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. //go:build !js
  2. // +build !js
  3. package webrtc
  4. import (
  5. "context"
  6. "fmt"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/pion/ice/v2"
  11. "github.com/pion/logging"
  12. "github.com/pion/webrtc/v3/internal/mux"
  13. )
  14. // ICETransport allows an application access to information about the ICE
  15. // transport over which packets are sent and received.
  16. type ICETransport struct {
  17. lock sync.RWMutex
  18. role ICERole
  19. onConnectionStateChangeHandler atomic.Value // func(ICETransportState)
  20. internalOnConnectionStateChangeHandler atomic.Value // func(ICETransportState)
  21. onSelectedCandidatePairChangeHandler atomic.Value // func(*ICECandidatePair)
  22. state atomic.Value // ICETransportState
  23. gatherer *ICEGatherer
  24. conn *ice.Conn
  25. mux *mux.Mux
  26. ctx context.Context
  27. ctxCancel func()
  28. loggerFactory logging.LoggerFactory
  29. log logging.LeveledLogger
  30. }
  31. // GetSelectedCandidatePair returns the selected candidate pair on which packets are sent
  32. // if there is no selected pair nil is returned
  33. func (t *ICETransport) GetSelectedCandidatePair() (*ICECandidatePair, error) {
  34. agent := t.gatherer.getAgent()
  35. if agent == nil {
  36. return nil, nil //nolint:nilnil
  37. }
  38. icePair, err := agent.GetSelectedCandidatePair()
  39. if icePair == nil || err != nil {
  40. return nil, err
  41. }
  42. local, err := newICECandidateFromICE(icePair.Local)
  43. if err != nil {
  44. return nil, err
  45. }
  46. remote, err := newICECandidateFromICE(icePair.Remote)
  47. if err != nil {
  48. return nil, err
  49. }
  50. return &ICECandidatePair{Local: &local, Remote: &remote}, nil
  51. }
  52. // NewICETransport creates a new NewICETransport.
  53. func NewICETransport(gatherer *ICEGatherer, loggerFactory logging.LoggerFactory) *ICETransport {
  54. iceTransport := &ICETransport{
  55. gatherer: gatherer,
  56. loggerFactory: loggerFactory,
  57. log: loggerFactory.NewLogger("ortc"),
  58. }
  59. iceTransport.setState(ICETransportStateNew)
  60. return iceTransport
  61. }
  62. // Start incoming connectivity checks based on its configured role.
  63. func (t *ICETransport) Start(gatherer *ICEGatherer, params ICEParameters, role *ICERole) error {
  64. t.lock.Lock()
  65. defer t.lock.Unlock()
  66. if t.State() != ICETransportStateNew {
  67. return errICETransportNotInNew
  68. }
  69. if gatherer != nil {
  70. t.gatherer = gatherer
  71. }
  72. if err := t.ensureGatherer(); err != nil {
  73. return err
  74. }
  75. agent := t.gatherer.getAgent()
  76. if agent == nil {
  77. return fmt.Errorf("%w: unable to start ICETransport", errICEAgentNotExist)
  78. }
  79. if err := agent.OnConnectionStateChange(func(iceState ice.ConnectionState) {
  80. state := newICETransportStateFromICE(iceState)
  81. t.setState(state)
  82. t.onConnectionStateChange(state)
  83. }); err != nil {
  84. return err
  85. }
  86. if err := agent.OnSelectedCandidatePairChange(func(local, remote ice.Candidate) {
  87. candidates, err := newICECandidatesFromICE([]ice.Candidate{local, remote})
  88. if err != nil {
  89. t.log.Warnf("%w: %s", errICECandiatesCoversionFailed, err)
  90. return
  91. }
  92. t.onSelectedCandidatePairChange(NewICECandidatePair(&candidates[0], &candidates[1]))
  93. }); err != nil {
  94. return err
  95. }
  96. if role == nil {
  97. controlled := ICERoleControlled
  98. role = &controlled
  99. }
  100. t.role = *role
  101. t.ctx, t.ctxCancel = context.WithCancel(context.Background())
  102. // Drop the lock here to allow ICE candidates to be
  103. // added so that the agent can complete a connection
  104. t.lock.Unlock()
  105. var iceConn *ice.Conn
  106. var err error
  107. switch *role {
  108. case ICERoleControlling:
  109. iceConn, err = agent.Dial(t.ctx,
  110. params.UsernameFragment,
  111. params.Password)
  112. case ICERoleControlled:
  113. iceConn, err = agent.Accept(t.ctx,
  114. params.UsernameFragment,
  115. params.Password)
  116. default:
  117. err = errICERoleUnknown
  118. }
  119. // Reacquire the lock to set the connection/mux
  120. t.lock.Lock()
  121. if err != nil {
  122. return err
  123. }
  124. t.conn = iceConn
  125. config := mux.Config{
  126. Conn: t.conn,
  127. BufferSize: int(t.gatherer.api.settingEngine.getReceiveMTU()),
  128. LoggerFactory: t.loggerFactory,
  129. }
  130. t.mux = mux.NewMux(config)
  131. return nil
  132. }
  133. // restart is not exposed currently because ORTC has users create a whole new ICETransport
  134. // so for now lets keep it private so we don't cause ORTC users to depend on non-standard APIs
  135. func (t *ICETransport) restart() error {
  136. t.lock.Lock()
  137. defer t.lock.Unlock()
  138. agent := t.gatherer.getAgent()
  139. if agent == nil {
  140. return fmt.Errorf("%w: unable to restart ICETransport", errICEAgentNotExist)
  141. }
  142. if err := agent.Restart(t.gatherer.api.settingEngine.candidates.UsernameFragment, t.gatherer.api.settingEngine.candidates.Password); err != nil {
  143. return err
  144. }
  145. return t.gatherer.Gather()
  146. }
  147. // Stop irreversibly stops the ICETransport.
  148. func (t *ICETransport) Stop() error {
  149. t.lock.Lock()
  150. defer t.lock.Unlock()
  151. t.setState(ICETransportStateClosed)
  152. if t.ctxCancel != nil {
  153. t.ctxCancel()
  154. }
  155. if t.mux != nil {
  156. return t.mux.Close()
  157. } else if t.gatherer != nil {
  158. return t.gatherer.Close()
  159. }
  160. return nil
  161. }
  162. // OnSelectedCandidatePairChange sets a handler that is invoked when a new
  163. // ICE candidate pair is selected
  164. func (t *ICETransport) OnSelectedCandidatePairChange(f func(*ICECandidatePair)) {
  165. t.onSelectedCandidatePairChangeHandler.Store(f)
  166. }
  167. func (t *ICETransport) onSelectedCandidatePairChange(pair *ICECandidatePair) {
  168. if handler, ok := t.onSelectedCandidatePairChangeHandler.Load().(func(*ICECandidatePair)); ok {
  169. handler(pair)
  170. }
  171. }
  172. // OnConnectionStateChange sets a handler that is fired when the ICE
  173. // connection state changes.
  174. func (t *ICETransport) OnConnectionStateChange(f func(ICETransportState)) {
  175. t.onConnectionStateChangeHandler.Store(f)
  176. }
  177. func (t *ICETransport) onConnectionStateChange(state ICETransportState) {
  178. if handler, ok := t.onConnectionStateChangeHandler.Load().(func(ICETransportState)); ok {
  179. handler(state)
  180. }
  181. if handler, ok := t.internalOnConnectionStateChangeHandler.Load().(func(ICETransportState)); ok {
  182. handler(state)
  183. }
  184. }
  185. // Role indicates the current role of the ICE transport.
  186. func (t *ICETransport) Role() ICERole {
  187. t.lock.RLock()
  188. defer t.lock.RUnlock()
  189. return t.role
  190. }
  191. // SetRemoteCandidates sets the sequence of candidates associated with the remote ICETransport.
  192. func (t *ICETransport) SetRemoteCandidates(remoteCandidates []ICECandidate) error {
  193. t.lock.RLock()
  194. defer t.lock.RUnlock()
  195. if err := t.ensureGatherer(); err != nil {
  196. return err
  197. }
  198. agent := t.gatherer.getAgent()
  199. if agent == nil {
  200. return fmt.Errorf("%w: unable to set remote candidates", errICEAgentNotExist)
  201. }
  202. for _, c := range remoteCandidates {
  203. i, err := c.toICE()
  204. if err != nil {
  205. return err
  206. }
  207. if err = agent.AddRemoteCandidate(i); err != nil {
  208. return err
  209. }
  210. }
  211. return nil
  212. }
  213. // AddRemoteCandidate adds a candidate associated with the remote ICETransport.
  214. func (t *ICETransport) AddRemoteCandidate(remoteCandidate *ICECandidate) error {
  215. t.lock.RLock()
  216. defer t.lock.RUnlock()
  217. var (
  218. c ice.Candidate
  219. err error
  220. )
  221. if err = t.ensureGatherer(); err != nil {
  222. return err
  223. }
  224. if remoteCandidate != nil {
  225. if c, err = remoteCandidate.toICE(); err != nil {
  226. return err
  227. }
  228. }
  229. agent := t.gatherer.getAgent()
  230. if agent == nil {
  231. return fmt.Errorf("%w: unable to add remote candidates", errICEAgentNotExist)
  232. }
  233. return agent.AddRemoteCandidate(c)
  234. }
  235. // State returns the current ice transport state.
  236. func (t *ICETransport) State() ICETransportState {
  237. if v, ok := t.state.Load().(ICETransportState); ok {
  238. return v
  239. }
  240. return ICETransportState(0)
  241. }
  242. func (t *ICETransport) setState(i ICETransportState) {
  243. t.state.Store(i)
  244. }
  245. func (t *ICETransport) newEndpoint(f mux.MatchFunc) *mux.Endpoint {
  246. t.lock.Lock()
  247. defer t.lock.Unlock()
  248. return t.mux.NewEndpoint(f)
  249. }
  250. func (t *ICETransport) ensureGatherer() error {
  251. if t.gatherer == nil {
  252. return errICEGathererNotStarted
  253. } else if t.gatherer.getAgent() == nil {
  254. if err := t.gatherer.createAgent(); err != nil {
  255. return err
  256. }
  257. }
  258. return nil
  259. }
  260. func (t *ICETransport) collectStats(collector *statsReportCollector) {
  261. t.lock.Lock()
  262. conn := t.conn
  263. t.lock.Unlock()
  264. collector.Collecting()
  265. stats := TransportStats{
  266. Timestamp: statsTimestampFrom(time.Now()),
  267. Type: StatsTypeTransport,
  268. ID: "iceTransport",
  269. }
  270. if conn != nil {
  271. stats.BytesSent = conn.BytesSent()
  272. stats.BytesReceived = conn.BytesReceived()
  273. }
  274. collector.Collect(stats.ID, stats)
  275. }
  276. func (t *ICETransport) haveRemoteCredentialsChange(newUfrag, newPwd string) bool {
  277. t.lock.Lock()
  278. defer t.lock.Unlock()
  279. agent := t.gatherer.getAgent()
  280. if agent == nil {
  281. return false
  282. }
  283. uFrag, uPwd, err := agent.GetRemoteUserCredentials()
  284. if err != nil {
  285. return false
  286. }
  287. return uFrag != newUfrag || uPwd != newPwd
  288. }
  289. func (t *ICETransport) setRemoteCredentials(newUfrag, newPwd string) error {
  290. t.lock.Lock()
  291. defer t.lock.Unlock()
  292. agent := t.gatherer.getAgent()
  293. if agent == nil {
  294. return fmt.Errorf("%w: unable to SetRemoteCredentials", errICEAgentNotExist)
  295. }
  296. return agent.SetRemoteCredentials(newUfrag, newPwd)
  297. }