icegatherer.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. //go:build !js
  2. // +build !js
  3. package webrtc
  4. import (
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/pion/ice/v2"
  9. "github.com/pion/logging"
  10. )
  11. // ICEGatherer gathers local host, server reflexive and relay
  12. // candidates, as well as enabling the retrieval of local Interactive
  13. // Connectivity Establishment (ICE) parameters which can be
  14. // exchanged in signaling.
  15. type ICEGatherer struct {
  16. lock sync.RWMutex
  17. log logging.LeveledLogger
  18. state ICEGathererState
  19. validatedServers []*ice.URL
  20. gatherPolicy ICETransportPolicy
  21. agent *ice.Agent
  22. onLocalCandidateHandler atomic.Value // func(candidate *ICECandidate)
  23. onStateChangeHandler atomic.Value // func(state ICEGathererState)
  24. // Used for GatheringCompletePromise
  25. onGatheringCompleteHandler atomic.Value // func()
  26. api *API
  27. }
  28. // NewICEGatherer creates a new NewICEGatherer.
  29. // This constructor is part of the ORTC API. It is not
  30. // meant to be used together with the basic WebRTC API.
  31. func (api *API) NewICEGatherer(opts ICEGatherOptions) (*ICEGatherer, error) {
  32. var validatedServers []*ice.URL
  33. if len(opts.ICEServers) > 0 {
  34. for _, server := range opts.ICEServers {
  35. url, err := server.urls()
  36. if err != nil {
  37. return nil, err
  38. }
  39. validatedServers = append(validatedServers, url...)
  40. }
  41. }
  42. return &ICEGatherer{
  43. state: ICEGathererStateNew,
  44. gatherPolicy: opts.ICEGatherPolicy,
  45. validatedServers: validatedServers,
  46. api: api,
  47. log: api.settingEngine.LoggerFactory.NewLogger("ice"),
  48. }, nil
  49. }
  50. func (g *ICEGatherer) createAgent() error {
  51. g.lock.Lock()
  52. defer g.lock.Unlock()
  53. if g.agent != nil || g.State() != ICEGathererStateNew {
  54. return nil
  55. }
  56. candidateTypes := []ice.CandidateType{}
  57. if g.api.settingEngine.candidates.ICELite {
  58. candidateTypes = append(candidateTypes, ice.CandidateTypeHost)
  59. } else if g.gatherPolicy == ICETransportPolicyRelay {
  60. candidateTypes = append(candidateTypes, ice.CandidateTypeRelay)
  61. }
  62. var nat1To1CandiTyp ice.CandidateType
  63. switch g.api.settingEngine.candidates.NAT1To1IPCandidateType {
  64. case ICECandidateTypeHost:
  65. nat1To1CandiTyp = ice.CandidateTypeHost
  66. case ICECandidateTypeSrflx:
  67. nat1To1CandiTyp = ice.CandidateTypeServerReflexive
  68. default:
  69. nat1To1CandiTyp = ice.CandidateTypeUnspecified
  70. }
  71. mDNSMode := g.api.settingEngine.candidates.MulticastDNSMode
  72. if mDNSMode != ice.MulticastDNSModeDisabled && mDNSMode != ice.MulticastDNSModeQueryAndGather {
  73. // If enum is in state we don't recognized default to MulticastDNSModeQueryOnly
  74. mDNSMode = ice.MulticastDNSModeQueryOnly
  75. }
  76. config := &ice.AgentConfig{
  77. Lite: g.api.settingEngine.candidates.ICELite,
  78. Urls: g.validatedServers,
  79. PortMin: g.api.settingEngine.ephemeralUDP.PortMin,
  80. PortMax: g.api.settingEngine.ephemeralUDP.PortMax,
  81. DisconnectedTimeout: g.api.settingEngine.timeout.ICEDisconnectedTimeout,
  82. FailedTimeout: g.api.settingEngine.timeout.ICEFailedTimeout,
  83. KeepaliveInterval: g.api.settingEngine.timeout.ICEKeepaliveInterval,
  84. LoggerFactory: g.api.settingEngine.LoggerFactory,
  85. CandidateTypes: candidateTypes,
  86. HostAcceptanceMinWait: g.api.settingEngine.timeout.ICEHostAcceptanceMinWait,
  87. SrflxAcceptanceMinWait: g.api.settingEngine.timeout.ICESrflxAcceptanceMinWait,
  88. PrflxAcceptanceMinWait: g.api.settingEngine.timeout.ICEPrflxAcceptanceMinWait,
  89. RelayAcceptanceMinWait: g.api.settingEngine.timeout.ICERelayAcceptanceMinWait,
  90. InterfaceFilter: g.api.settingEngine.candidates.InterfaceFilter,
  91. NAT1To1IPs: g.api.settingEngine.candidates.NAT1To1IPs,
  92. NAT1To1IPCandidateType: nat1To1CandiTyp,
  93. Net: g.api.settingEngine.vnet,
  94. MulticastDNSMode: mDNSMode,
  95. MulticastDNSHostName: g.api.settingEngine.candidates.MulticastDNSHostName,
  96. LocalUfrag: g.api.settingEngine.candidates.UsernameFragment,
  97. LocalPwd: g.api.settingEngine.candidates.Password,
  98. TCPMux: g.api.settingEngine.iceTCPMux,
  99. UDPMux: g.api.settingEngine.iceUDPMux,
  100. ProxyDialer: g.api.settingEngine.iceProxyDialer,
  101. }
  102. requestedNetworkTypes := g.api.settingEngine.candidates.ICENetworkTypes
  103. if len(requestedNetworkTypes) == 0 {
  104. requestedNetworkTypes = supportedNetworkTypes()
  105. }
  106. for _, typ := range requestedNetworkTypes {
  107. config.NetworkTypes = append(config.NetworkTypes, ice.NetworkType(typ))
  108. }
  109. agent, err := ice.NewAgent(config)
  110. if err != nil {
  111. return err
  112. }
  113. g.agent = agent
  114. return nil
  115. }
  116. // Gather ICE candidates.
  117. func (g *ICEGatherer) Gather() error {
  118. if err := g.createAgent(); err != nil {
  119. return err
  120. }
  121. g.lock.Lock()
  122. agent := g.agent
  123. g.lock.Unlock()
  124. // it is possible agent had just been closed
  125. if agent == nil {
  126. return fmt.Errorf("%w: unable to gather", errICEAgentNotExist)
  127. }
  128. g.setState(ICEGathererStateGathering)
  129. if err := agent.OnCandidate(func(candidate ice.Candidate) {
  130. onLocalCandidateHandler := func(*ICECandidate) {}
  131. if handler, ok := g.onLocalCandidateHandler.Load().(func(candidate *ICECandidate)); ok && handler != nil {
  132. onLocalCandidateHandler = handler
  133. }
  134. onGatheringCompleteHandler := func() {}
  135. if handler, ok := g.onGatheringCompleteHandler.Load().(func()); ok && handler != nil {
  136. onGatheringCompleteHandler = handler
  137. }
  138. if candidate != nil {
  139. c, err := newICECandidateFromICE(candidate)
  140. if err != nil {
  141. g.log.Warnf("Failed to convert ice.Candidate: %s", err)
  142. return
  143. }
  144. onLocalCandidateHandler(&c)
  145. } else {
  146. g.setState(ICEGathererStateComplete)
  147. onGatheringCompleteHandler()
  148. onLocalCandidateHandler(nil)
  149. }
  150. }); err != nil {
  151. return err
  152. }
  153. return agent.GatherCandidates()
  154. }
  155. // Close prunes all local candidates, and closes the ports.
  156. func (g *ICEGatherer) Close() error {
  157. g.lock.Lock()
  158. defer g.lock.Unlock()
  159. if g.agent == nil {
  160. return nil
  161. } else if err := g.agent.Close(); err != nil {
  162. return err
  163. }
  164. g.agent = nil
  165. g.setState(ICEGathererStateClosed)
  166. return nil
  167. }
  168. // GetLocalParameters returns the ICE parameters of the ICEGatherer.
  169. func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) {
  170. if err := g.createAgent(); err != nil {
  171. return ICEParameters{}, err
  172. }
  173. frag, pwd, err := g.agent.GetLocalUserCredentials()
  174. if err != nil {
  175. return ICEParameters{}, err
  176. }
  177. return ICEParameters{
  178. UsernameFragment: frag,
  179. Password: pwd,
  180. ICELite: false,
  181. }, nil
  182. }
  183. // GetLocalCandidates returns the sequence of valid local candidates associated with the ICEGatherer.
  184. func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) {
  185. if err := g.createAgent(); err != nil {
  186. return nil, err
  187. }
  188. iceCandidates, err := g.agent.GetLocalCandidates()
  189. if err != nil {
  190. return nil, err
  191. }
  192. return newICECandidatesFromICE(iceCandidates)
  193. }
  194. // OnLocalCandidate sets an event handler which fires when a new local ICE candidate is available
  195. // Take note that the handler will be called with a nil pointer when gathering is finished.
  196. func (g *ICEGatherer) OnLocalCandidate(f func(*ICECandidate)) {
  197. g.onLocalCandidateHandler.Store(f)
  198. }
  199. // OnStateChange fires any time the ICEGatherer changes
  200. func (g *ICEGatherer) OnStateChange(f func(ICEGathererState)) {
  201. g.onStateChangeHandler.Store(f)
  202. }
  203. // State indicates the current state of the ICE gatherer.
  204. func (g *ICEGatherer) State() ICEGathererState {
  205. return atomicLoadICEGathererState(&g.state)
  206. }
  207. func (g *ICEGatherer) setState(s ICEGathererState) {
  208. atomicStoreICEGathererState(&g.state, s)
  209. if handler, ok := g.onStateChangeHandler.Load().(func(state ICEGathererState)); ok && handler != nil {
  210. handler(s)
  211. }
  212. }
  213. func (g *ICEGatherer) getAgent() *ice.Agent {
  214. g.lock.RLock()
  215. defer g.lock.RUnlock()
  216. return g.agent
  217. }
  218. func (g *ICEGatherer) collectStats(collector *statsReportCollector) {
  219. agent := g.getAgent()
  220. if agent == nil {
  221. return
  222. }
  223. collector.Collecting()
  224. go func(collector *statsReportCollector, agent *ice.Agent) {
  225. for _, candidatePairStats := range agent.GetCandidatePairsStats() {
  226. collector.Collecting()
  227. state, err := toStatsICECandidatePairState(candidatePairStats.State)
  228. if err != nil {
  229. g.log.Error(err.Error())
  230. }
  231. pairID := newICECandidatePairStatsID(candidatePairStats.LocalCandidateID,
  232. candidatePairStats.RemoteCandidateID)
  233. stats := ICECandidatePairStats{
  234. Timestamp: statsTimestampFrom(candidatePairStats.Timestamp),
  235. Type: StatsTypeCandidatePair,
  236. ID: pairID,
  237. // TransportID:
  238. LocalCandidateID: candidatePairStats.LocalCandidateID,
  239. RemoteCandidateID: candidatePairStats.RemoteCandidateID,
  240. State: state,
  241. Nominated: candidatePairStats.Nominated,
  242. PacketsSent: candidatePairStats.PacketsSent,
  243. PacketsReceived: candidatePairStats.PacketsReceived,
  244. BytesSent: candidatePairStats.BytesSent,
  245. BytesReceived: candidatePairStats.BytesReceived,
  246. LastPacketSentTimestamp: statsTimestampFrom(candidatePairStats.LastPacketSentTimestamp),
  247. LastPacketReceivedTimestamp: statsTimestampFrom(candidatePairStats.LastPacketReceivedTimestamp),
  248. FirstRequestTimestamp: statsTimestampFrom(candidatePairStats.FirstRequestTimestamp),
  249. LastRequestTimestamp: statsTimestampFrom(candidatePairStats.LastRequestTimestamp),
  250. LastResponseTimestamp: statsTimestampFrom(candidatePairStats.LastResponseTimestamp),
  251. TotalRoundTripTime: candidatePairStats.TotalRoundTripTime,
  252. CurrentRoundTripTime: candidatePairStats.CurrentRoundTripTime,
  253. AvailableOutgoingBitrate: candidatePairStats.AvailableOutgoingBitrate,
  254. AvailableIncomingBitrate: candidatePairStats.AvailableIncomingBitrate,
  255. CircuitBreakerTriggerCount: candidatePairStats.CircuitBreakerTriggerCount,
  256. RequestsReceived: candidatePairStats.RequestsReceived,
  257. RequestsSent: candidatePairStats.RequestsSent,
  258. ResponsesReceived: candidatePairStats.ResponsesReceived,
  259. ResponsesSent: candidatePairStats.ResponsesSent,
  260. RetransmissionsReceived: candidatePairStats.RetransmissionsReceived,
  261. RetransmissionsSent: candidatePairStats.RetransmissionsSent,
  262. ConsentRequestsSent: candidatePairStats.ConsentRequestsSent,
  263. ConsentExpiredTimestamp: statsTimestampFrom(candidatePairStats.ConsentExpiredTimestamp),
  264. }
  265. collector.Collect(stats.ID, stats)
  266. }
  267. for _, candidateStats := range agent.GetLocalCandidatesStats() {
  268. collector.Collecting()
  269. networkType, err := getNetworkType(candidateStats.NetworkType)
  270. if err != nil {
  271. g.log.Error(err.Error())
  272. }
  273. candidateType, err := getCandidateType(candidateStats.CandidateType)
  274. if err != nil {
  275. g.log.Error(err.Error())
  276. }
  277. stats := ICECandidateStats{
  278. Timestamp: statsTimestampFrom(candidateStats.Timestamp),
  279. ID: candidateStats.ID,
  280. Type: StatsTypeLocalCandidate,
  281. NetworkType: networkType,
  282. IP: candidateStats.IP,
  283. Port: int32(candidateStats.Port),
  284. Protocol: networkType.Protocol(),
  285. CandidateType: candidateType,
  286. Priority: int32(candidateStats.Priority),
  287. URL: candidateStats.URL,
  288. RelayProtocol: candidateStats.RelayProtocol,
  289. Deleted: candidateStats.Deleted,
  290. }
  291. collector.Collect(stats.ID, stats)
  292. }
  293. for _, candidateStats := range agent.GetRemoteCandidatesStats() {
  294. collector.Collecting()
  295. networkType, err := getNetworkType(candidateStats.NetworkType)
  296. if err != nil {
  297. g.log.Error(err.Error())
  298. }
  299. candidateType, err := getCandidateType(candidateStats.CandidateType)
  300. if err != nil {
  301. g.log.Error(err.Error())
  302. }
  303. stats := ICECandidateStats{
  304. Timestamp: statsTimestampFrom(candidateStats.Timestamp),
  305. ID: candidateStats.ID,
  306. Type: StatsTypeRemoteCandidate,
  307. NetworkType: networkType,
  308. IP: candidateStats.IP,
  309. Port: int32(candidateStats.Port),
  310. Protocol: networkType.Protocol(),
  311. CandidateType: candidateType,
  312. Priority: int32(candidateStats.Priority),
  313. URL: candidateStats.URL,
  314. RelayProtocol: candidateStats.RelayProtocol,
  315. }
  316. collector.Collect(stats.ID, stats)
  317. }
  318. collector.Done()
  319. }(collector, agent)
  320. }