agent.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270
  1. // Package ice implements the Interactive Connectivity Establishment (ICE)
  2. // protocol defined in rfc5245.
  3. package ice
  4. import (
  5. "context"
  6. "net"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/pion/logging"
  12. "github.com/pion/mdns"
  13. "github.com/pion/stun"
  14. "github.com/pion/transport/packetio"
  15. "github.com/pion/transport/vnet"
  16. "golang.org/x/net/proxy"
  17. )
// bindingRequest records one outbound STUN Binding request so that it can be
// tracked in Agent.pendingBindingRequests until it is answered or expires.
type bindingRequest struct {
	// timestamp is when the request was recorded; entries older than
	// maxBindingRequestTimeout are discarded.
	timestamp time.Time
	// transactionID is the STUN transaction ID of the request message.
	transactionID [stun.TransactionIDSize]byte
	// destination is the address of the remote candidate the request targets.
	destination net.Addr
	// isUseCandidate reports whether the request carried the USE-CANDIDATE attribute.
	isUseCandidate bool
}
  24. // Agent represents the ICE agent
  25. type Agent struct {
  26. chanTask chan task
  27. afterRunFn []func(ctx context.Context)
  28. muAfterRun sync.Mutex
  29. onConnectionStateChangeHdlr atomic.Value // func(ConnectionState)
  30. onSelectedCandidatePairChangeHdlr atomic.Value // func(Candidate, Candidate)
  31. onCandidateHdlr atomic.Value // func(Candidate)
  32. // State owned by the taskLoop
  33. onConnected chan struct{}
  34. onConnectedOnce sync.Once
  35. // force candidate to be contacted immediately (instead of waiting for task ticker)
  36. forceCandidateContact chan bool
  37. tieBreaker uint64
  38. lite bool
  39. connectionState ConnectionState
  40. gatheringState GatheringState
  41. mDNSMode MulticastDNSMode
  42. mDNSName string
  43. mDNSConn *mdns.Conn
  44. muHaveStarted sync.Mutex
  45. startedCh <-chan struct{}
  46. startedFn func()
  47. isControlling bool
  48. maxBindingRequests uint16
  49. hostAcceptanceMinWait time.Duration
  50. srflxAcceptanceMinWait time.Duration
  51. prflxAcceptanceMinWait time.Duration
  52. relayAcceptanceMinWait time.Duration
  53. portmin uint16
  54. portmax uint16
  55. candidateTypes []CandidateType
  56. // How long connectivity checks can fail before the ICE Agent
  57. // goes to disconnected
  58. disconnectedTimeout time.Duration
  59. // How long connectivity checks can fail before the ICE Agent
  60. // goes to failed
  61. failedTimeout time.Duration
  62. // How often should we send keepalive packets?
  63. // 0 means never
  64. keepaliveInterval time.Duration
  65. // How often should we run our internal taskLoop to check for state changes when connecting
  66. checkInterval time.Duration
  67. localUfrag string
  68. localPwd string
  69. localCandidates map[NetworkType][]Candidate
  70. remoteUfrag string
  71. remotePwd string
  72. remoteCandidates map[NetworkType][]Candidate
  73. checklist []*CandidatePair
  74. selector pairCandidateSelector
  75. selectedPair atomic.Value // *CandidatePair
  76. urls []*URL
  77. networkTypes []NetworkType
  78. buffer *packetio.Buffer
  79. // LRU of outbound Binding request Transaction IDs
  80. pendingBindingRequests []bindingRequest
  81. // 1:1 D-NAT IP address mapping
  82. extIPMapper *externalIPMapper
  83. // State for closing
  84. done chan struct{}
  85. taskLoopDone chan struct{}
  86. err atomicError
  87. gatherCandidateCancel func()
  88. gatherCandidateDone chan struct{}
  89. chanCandidate chan Candidate
  90. chanCandidatePair chan *CandidatePair
  91. chanState chan ConnectionState
  92. loggerFactory logging.LoggerFactory
  93. log logging.LeveledLogger
  94. net *vnet.Net
  95. tcpMux TCPMux
  96. udpMux UDPMux
  97. udpMuxSrflx UniversalUDPMux
  98. interfaceFilter func(string) bool
  99. insecureSkipVerify bool
  100. proxyDialer proxy.Dialer
  101. }
// task is a unit of work queued on Agent.chanTask and executed by taskLoop.
type task struct {
	// fn is the work to perform; taskLoop calls it with the agent's context and the agent.
	fn func(context.Context, *Agent)
	// done is closed by taskLoop once fn has returned.
	done chan struct{}
}
  106. // afterRun registers function to be run after the task.
  107. func (a *Agent) afterRun(f func(context.Context)) {
  108. a.muAfterRun.Lock()
  109. a.afterRunFn = append(a.afterRunFn, f)
  110. a.muAfterRun.Unlock()
  111. }
  112. func (a *Agent) getAfterRunFn() []func(context.Context) {
  113. a.muAfterRun.Lock()
  114. defer a.muAfterRun.Unlock()
  115. fns := a.afterRunFn
  116. a.afterRunFn = nil
  117. return fns
  118. }
  119. func (a *Agent) ok() error {
  120. select {
  121. case <-a.done:
  122. return a.getErr()
  123. default:
  124. }
  125. return nil
  126. }
  127. func (a *Agent) getErr() error {
  128. if err := a.err.Load(); err != nil {
  129. return err
  130. }
  131. return ErrClosed
  132. }
  133. // Run task in serial. Blocking tasks must be cancelable by context.
  134. func (a *Agent) run(ctx context.Context, t func(context.Context, *Agent)) error {
  135. if err := a.ok(); err != nil {
  136. return err
  137. }
  138. done := make(chan struct{})
  139. select {
  140. case <-ctx.Done():
  141. return ctx.Err()
  142. case a.chanTask <- task{t, done}:
  143. <-done
  144. return nil
  145. }
  146. }
  147. // taskLoop handles registered tasks and agent close.
  148. func (a *Agent) taskLoop() {
  149. after := func() {
  150. for {
  151. // Get and run func registered by afterRun().
  152. fns := a.getAfterRunFn()
  153. if len(fns) == 0 {
  154. break
  155. }
  156. for _, fn := range fns {
  157. fn(a.context())
  158. }
  159. }
  160. }
  161. defer func() {
  162. a.deleteAllCandidates()
  163. a.startedFn()
  164. if err := a.buffer.Close(); err != nil {
  165. a.log.Warnf("failed to close buffer: %v", err)
  166. }
  167. a.closeMulticastConn()
  168. a.updateConnectionState(ConnectionStateClosed)
  169. after()
  170. close(a.chanState)
  171. close(a.chanCandidate)
  172. close(a.chanCandidatePair)
  173. close(a.taskLoopDone)
  174. }()
  175. for {
  176. select {
  177. case <-a.done:
  178. return
  179. case t := <-a.chanTask:
  180. t.fn(a.context(), a)
  181. close(t.done)
  182. after()
  183. }
  184. }
  185. }
  186. // NewAgent creates a new Agent
  187. func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
  188. var err error
  189. if config.PortMax < config.PortMin {
  190. return nil, ErrPort
  191. }
  192. mDNSName := config.MulticastDNSHostName
  193. if mDNSName == "" {
  194. if mDNSName, err = generateMulticastDNSName(); err != nil {
  195. return nil, err
  196. }
  197. }
  198. if !strings.HasSuffix(mDNSName, ".local") || len(strings.Split(mDNSName, ".")) != 2 {
  199. return nil, ErrInvalidMulticastDNSHostName
  200. }
  201. mDNSMode := config.MulticastDNSMode
  202. if mDNSMode == 0 {
  203. mDNSMode = MulticastDNSModeQueryOnly
  204. }
  205. loggerFactory := config.LoggerFactory
  206. if loggerFactory == nil {
  207. loggerFactory = logging.NewDefaultLoggerFactory()
  208. }
  209. log := loggerFactory.NewLogger("ice")
  210. var mDNSConn *mdns.Conn
  211. mDNSConn, mDNSMode, err = createMulticastDNS(mDNSMode, mDNSName, log)
  212. // Opportunistic mDNS: If we can't open the connection, that's ok: we
  213. // can continue without it.
  214. if err != nil {
  215. log.Warnf("Failed to initialize mDNS %s: %v", mDNSName, err)
  216. }
  217. closeMDNSConn := func() {
  218. if mDNSConn != nil {
  219. if mdnsCloseErr := mDNSConn.Close(); mdnsCloseErr != nil {
  220. log.Warnf("Failed to close mDNS: %v", mdnsCloseErr)
  221. }
  222. }
  223. }
  224. startedCtx, startedFn := context.WithCancel(context.Background())
  225. a := &Agent{
  226. chanTask: make(chan task),
  227. chanState: make(chan ConnectionState),
  228. chanCandidate: make(chan Candidate),
  229. chanCandidatePair: make(chan *CandidatePair),
  230. tieBreaker: globalMathRandomGenerator.Uint64(),
  231. lite: config.Lite,
  232. gatheringState: GatheringStateNew,
  233. connectionState: ConnectionStateNew,
  234. localCandidates: make(map[NetworkType][]Candidate),
  235. remoteCandidates: make(map[NetworkType][]Candidate),
  236. urls: config.Urls,
  237. networkTypes: config.NetworkTypes,
  238. onConnected: make(chan struct{}),
  239. buffer: packetio.NewBuffer(),
  240. done: make(chan struct{}),
  241. taskLoopDone: make(chan struct{}),
  242. startedCh: startedCtx.Done(),
  243. startedFn: startedFn,
  244. portmin: config.PortMin,
  245. portmax: config.PortMax,
  246. loggerFactory: loggerFactory,
  247. log: log,
  248. net: config.Net,
  249. proxyDialer: config.ProxyDialer,
  250. mDNSMode: mDNSMode,
  251. mDNSName: mDNSName,
  252. mDNSConn: mDNSConn,
  253. gatherCandidateCancel: func() {},
  254. forceCandidateContact: make(chan bool, 1),
  255. interfaceFilter: config.InterfaceFilter,
  256. insecureSkipVerify: config.InsecureSkipVerify,
  257. }
  258. a.tcpMux = config.TCPMux
  259. if a.tcpMux == nil {
  260. a.tcpMux = newInvalidTCPMux()
  261. }
  262. a.udpMux = config.UDPMux
  263. a.udpMuxSrflx = config.UDPMuxSrflx
  264. if a.net == nil {
  265. a.net = vnet.NewNet(nil)
  266. } else if a.net.IsVirtual() {
  267. a.log.Warn("vnet is enabled")
  268. if a.mDNSMode != MulticastDNSModeDisabled {
  269. a.log.Warn("vnet does not support mDNS yet")
  270. }
  271. }
  272. config.initWithDefaults(a)
  273. // Make sure the buffer doesn't grow indefinitely.
  274. // NOTE: We actually won't get anywhere close to this limit.
  275. // SRTP will constantly read from the endpoint and drop packets if it's full.
  276. a.buffer.SetLimitSize(maxBufferSize)
  277. if a.lite && (len(a.candidateTypes) != 1 || a.candidateTypes[0] != CandidateTypeHost) {
  278. closeMDNSConn()
  279. return nil, ErrLiteUsingNonHostCandidates
  280. }
  281. if config.Urls != nil && len(config.Urls) > 0 && !containsCandidateType(CandidateTypeServerReflexive, a.candidateTypes) && !containsCandidateType(CandidateTypeRelay, a.candidateTypes) {
  282. closeMDNSConn()
  283. return nil, ErrUselessUrlsProvided
  284. }
  285. if err = config.initExtIPMapping(a); err != nil {
  286. closeMDNSConn()
  287. return nil, err
  288. }
  289. go a.taskLoop()
  290. a.startOnConnectionStateChangeRoutine()
  291. // Restart is also used to initialize the agent for the first time
  292. if err := a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {
  293. closeMDNSConn()
  294. _ = a.Close()
  295. return nil, err
  296. }
  297. return a, nil
  298. }
  299. // OnConnectionStateChange sets a handler that is fired when the connection state changes
  300. func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
  301. a.onConnectionStateChangeHdlr.Store(f)
  302. return nil
  303. }
  304. // OnSelectedCandidatePairChange sets a handler that is fired when the final candidate
  305. // pair is selected
  306. func (a *Agent) OnSelectedCandidatePairChange(f func(Candidate, Candidate)) error {
  307. a.onSelectedCandidatePairChangeHdlr.Store(f)
  308. return nil
  309. }
  310. // OnCandidate sets a handler that is fired when new candidates gathered. When
  311. // the gathering process complete the last candidate is nil.
  312. func (a *Agent) OnCandidate(f func(Candidate)) error {
  313. a.onCandidateHdlr.Store(f)
  314. return nil
  315. }
  316. func (a *Agent) onSelectedCandidatePairChange(p *CandidatePair) {
  317. if h, ok := a.onSelectedCandidatePairChangeHdlr.Load().(func(Candidate, Candidate)); ok {
  318. h(p.Local, p.Remote)
  319. }
  320. }
  321. func (a *Agent) onCandidate(c Candidate) {
  322. if onCandidateHdlr, ok := a.onCandidateHdlr.Load().(func(Candidate)); ok {
  323. onCandidateHdlr(c)
  324. }
  325. }
  326. func (a *Agent) onConnectionStateChange(s ConnectionState) {
  327. if hdlr, ok := a.onConnectionStateChangeHdlr.Load().(func(ConnectionState)); ok {
  328. hdlr(s)
  329. }
  330. }
  331. func (a *Agent) startOnConnectionStateChangeRoutine() {
  332. go func() {
  333. for {
  334. // CandidatePair and ConnectionState are usually changed at once.
  335. // Blocking one by the other one causes deadlock.
  336. p, isOpen := <-a.chanCandidatePair
  337. if !isOpen {
  338. return
  339. }
  340. a.onSelectedCandidatePairChange(p)
  341. }
  342. }()
  343. go func() {
  344. for {
  345. select {
  346. case s, isOpen := <-a.chanState:
  347. if !isOpen {
  348. for c := range a.chanCandidate {
  349. a.onCandidate(c)
  350. }
  351. return
  352. }
  353. go a.onConnectionStateChange(s)
  354. case c, isOpen := <-a.chanCandidate:
  355. if !isOpen {
  356. for s := range a.chanState {
  357. go a.onConnectionStateChange(s)
  358. }
  359. return
  360. }
  361. a.onCandidate(c)
  362. }
  363. }
  364. }()
  365. }
  366. func (a *Agent) startConnectivityChecks(isControlling bool, remoteUfrag, remotePwd string) error {
  367. a.muHaveStarted.Lock()
  368. defer a.muHaveStarted.Unlock()
  369. select {
  370. case <-a.startedCh:
  371. return ErrMultipleStart
  372. default:
  373. }
  374. if err := a.SetRemoteCredentials(remoteUfrag, remotePwd); err != nil {
  375. return err
  376. }
  377. a.log.Debugf("Started agent: isControlling? %t, remoteUfrag: %q, remotePwd: %q", isControlling, remoteUfrag, remotePwd)
  378. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  379. agent.isControlling = isControlling
  380. agent.remoteUfrag = remoteUfrag
  381. agent.remotePwd = remotePwd
  382. if isControlling {
  383. a.selector = &controllingSelector{agent: a, log: a.log}
  384. } else {
  385. a.selector = &controlledSelector{agent: a, log: a.log}
  386. }
  387. if a.lite {
  388. a.selector = &liteSelector{pairCandidateSelector: a.selector}
  389. }
  390. a.selector.Start()
  391. a.startedFn()
  392. agent.updateConnectionState(ConnectionStateChecking)
  393. a.requestConnectivityCheck()
  394. go a.connectivityChecks()
  395. })
  396. }
  397. func (a *Agent) connectivityChecks() {
  398. lastConnectionState := ConnectionState(0)
  399. checkingDuration := time.Time{}
  400. contact := func() {
  401. if err := a.run(a.context(), func(ctx context.Context, a *Agent) {
  402. defer func() {
  403. lastConnectionState = a.connectionState
  404. }()
  405. switch a.connectionState {
  406. case ConnectionStateFailed:
  407. // The connection is currently failed so don't send any checks
  408. // In the future it may be restarted though
  409. return
  410. case ConnectionStateChecking:
  411. // We have just entered checking for the first time so update our checking timer
  412. if lastConnectionState != a.connectionState {
  413. checkingDuration = time.Now()
  414. }
  415. // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
  416. if time.Since(checkingDuration) > a.disconnectedTimeout+a.failedTimeout {
  417. a.updateConnectionState(ConnectionStateFailed)
  418. return
  419. }
  420. }
  421. a.selector.ContactCandidates()
  422. }); err != nil {
  423. a.log.Warnf("taskLoop failed: %v", err)
  424. }
  425. }
  426. for {
  427. interval := defaultKeepaliveInterval
  428. updateInterval := func(x time.Duration) {
  429. if x != 0 && (interval == 0 || interval > x) {
  430. interval = x
  431. }
  432. }
  433. switch lastConnectionState {
  434. case ConnectionStateNew, ConnectionStateChecking: // While connecting, check candidates more frequently
  435. updateInterval(a.checkInterval)
  436. case ConnectionStateConnected, ConnectionStateDisconnected:
  437. updateInterval(a.keepaliveInterval)
  438. default:
  439. }
  440. // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
  441. updateInterval(a.disconnectedTimeout)
  442. updateInterval(a.failedTimeout)
  443. t := time.NewTimer(interval)
  444. select {
  445. case <-a.forceCandidateContact:
  446. t.Stop()
  447. contact()
  448. case <-t.C:
  449. contact()
  450. case <-a.done:
  451. t.Stop()
  452. return
  453. }
  454. }
  455. }
  456. func (a *Agent) updateConnectionState(newState ConnectionState) {
  457. if a.connectionState != newState {
  458. // Connection has gone to failed, release all gathered candidates
  459. if newState == ConnectionStateFailed {
  460. a.deleteAllCandidates()
  461. }
  462. a.log.Infof("Setting new connection state: %s", newState)
  463. a.connectionState = newState
  464. // Call handler after finishing current task since we may be holding the agent lock
  465. // and the handler may also require it
  466. a.afterRun(func(ctx context.Context) {
  467. a.chanState <- newState
  468. })
  469. }
  470. }
  471. func (a *Agent) setSelectedPair(p *CandidatePair) {
  472. a.log.Tracef("Set selected candidate pair: %s", p)
  473. if p == nil {
  474. var nilPair *CandidatePair
  475. a.selectedPair.Store(nilPair)
  476. return
  477. }
  478. p.nominated = true
  479. a.selectedPair.Store(p)
  480. a.updateConnectionState(ConnectionStateConnected)
  481. // Notify when the selected pair changes
  482. if p != nil {
  483. a.afterRun(func(ctx context.Context) {
  484. select {
  485. case a.chanCandidatePair <- p:
  486. case <-ctx.Done():
  487. }
  488. })
  489. }
  490. // Signal connected
  491. a.onConnectedOnce.Do(func() { close(a.onConnected) })
  492. }
  493. func (a *Agent) pingAllCandidates() {
  494. a.log.Trace("pinging all candidates")
  495. if len(a.checklist) == 0 {
  496. a.log.Warn("pingAllCandidates called with no candidate pairs. Connection is not possible yet.")
  497. }
  498. for _, p := range a.checklist {
  499. if p.state == CandidatePairStateWaiting {
  500. p.state = CandidatePairStateInProgress
  501. } else if p.state != CandidatePairStateInProgress {
  502. continue
  503. }
  504. if p.bindingRequestCount > a.maxBindingRequests {
  505. a.log.Tracef("max requests reached for pair %s, marking it as failed\n", p)
  506. p.state = CandidatePairStateFailed
  507. } else {
  508. a.selector.PingCandidate(p.Local, p.Remote)
  509. p.bindingRequestCount++
  510. }
  511. }
  512. }
  513. func (a *Agent) getBestAvailableCandidatePair() *CandidatePair {
  514. var best *CandidatePair
  515. for _, p := range a.checklist {
  516. if p.state == CandidatePairStateFailed {
  517. continue
  518. }
  519. if best == nil {
  520. best = p
  521. } else if best.priority() < p.priority() {
  522. best = p
  523. }
  524. }
  525. return best
  526. }
  527. func (a *Agent) getBestValidCandidatePair() *CandidatePair {
  528. var best *CandidatePair
  529. for _, p := range a.checklist {
  530. if p.state != CandidatePairStateSucceeded {
  531. continue
  532. }
  533. if best == nil {
  534. best = p
  535. } else if best.priority() < p.priority() {
  536. best = p
  537. }
  538. }
  539. return best
  540. }
  541. func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
  542. p := newCandidatePair(local, remote, a.isControlling)
  543. a.checklist = append(a.checklist, p)
  544. return p
  545. }
  546. func (a *Agent) findPair(local, remote Candidate) *CandidatePair {
  547. for _, p := range a.checklist {
  548. if p.Local.Equal(local) && p.Remote.Equal(remote) {
  549. return p
  550. }
  551. }
  552. return nil
  553. }
  554. // validateSelectedPair checks if the selected pair is (still) valid
  555. // Note: the caller should hold the agent lock.
  556. func (a *Agent) validateSelectedPair() bool {
  557. selectedPair := a.getSelectedPair()
  558. if selectedPair == nil {
  559. return false
  560. }
  561. disconnectedTime := time.Since(selectedPair.Remote.LastReceived())
  562. // Only allow transitions to failed if a.failedTimeout is non-zero
  563. totalTimeToFailure := a.failedTimeout
  564. if totalTimeToFailure != 0 {
  565. totalTimeToFailure += a.disconnectedTimeout
  566. }
  567. switch {
  568. case totalTimeToFailure != 0 && disconnectedTime > totalTimeToFailure:
  569. a.updateConnectionState(ConnectionStateFailed)
  570. case a.disconnectedTimeout != 0 && disconnectedTime > a.disconnectedTimeout:
  571. a.updateConnectionState(ConnectionStateDisconnected)
  572. default:
  573. a.updateConnectionState(ConnectionStateConnected)
  574. }
  575. return true
  576. }
  577. // checkKeepalive sends STUN Binding Indications to the selected pair
  578. // if no packet has been sent on that pair in the last keepaliveInterval
  579. // Note: the caller should hold the agent lock.
  580. func (a *Agent) checkKeepalive() {
  581. selectedPair := a.getSelectedPair()
  582. if selectedPair == nil {
  583. return
  584. }
  585. if (a.keepaliveInterval != 0) &&
  586. ((time.Since(selectedPair.Local.LastSent()) > a.keepaliveInterval) ||
  587. (time.Since(selectedPair.Remote.LastReceived()) > a.keepaliveInterval)) {
  588. // we use binding request instead of indication to support refresh consent schemas
  589. // see https://tools.ietf.org/html/rfc7675
  590. a.selector.PingCandidate(selectedPair.Local, selectedPair.Remote)
  591. }
  592. }
  593. // AddRemoteCandidate adds a new remote candidate
  594. func (a *Agent) AddRemoteCandidate(c Candidate) error {
  595. if c == nil {
  596. return nil
  597. }
  598. // cannot check for network yet because it might not be applied
  599. // when mDNS hostame is used.
  600. if c.TCPType() == TCPTypeActive {
  601. // TCP Candidates with tcptype active will probe server passive ones, so
  602. // no need to do anything with them.
  603. a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
  604. return nil
  605. }
  606. // If we have a mDNS Candidate lets fully resolve it before adding it locally
  607. if c.Type() == CandidateTypeHost && strings.HasSuffix(c.Address(), ".local") {
  608. if a.mDNSMode == MulticastDNSModeDisabled {
  609. a.log.Warnf("remote mDNS candidate added, but mDNS is disabled: (%s)", c.Address())
  610. return nil
  611. }
  612. hostCandidate, ok := c.(*CandidateHost)
  613. if !ok {
  614. return ErrAddressParseFailed
  615. }
  616. go a.resolveAndAddMulticastCandidate(hostCandidate)
  617. return nil
  618. }
  619. go func() {
  620. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  621. agent.addRemoteCandidate(c)
  622. }); err != nil {
  623. a.log.Warnf("Failed to add remote candidate %s: %v", c.Address(), err)
  624. return
  625. }
  626. }()
  627. return nil
  628. }
  629. func (a *Agent) resolveAndAddMulticastCandidate(c *CandidateHost) {
  630. if a.mDNSConn == nil {
  631. return
  632. }
  633. _, src, err := a.mDNSConn.Query(c.context(), c.Address())
  634. if err != nil {
  635. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  636. return
  637. }
  638. ip, _, _, _ := parseAddr(src) //nolint:dogsled
  639. if ip == nil {
  640. a.log.Warnf("Failed to discover mDNS candidate %s: failed to parse IP", c.Address())
  641. return
  642. }
  643. if err = c.setIP(ip); err != nil {
  644. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  645. return
  646. }
  647. if err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  648. agent.addRemoteCandidate(c)
  649. }); err != nil {
  650. a.log.Warnf("Failed to add mDNS candidate %s: %v", c.Address(), err)
  651. return
  652. }
  653. }
  654. func (a *Agent) requestConnectivityCheck() {
  655. select {
  656. case a.forceCandidateContact <- true:
  657. default:
  658. }
  659. }
  660. // addRemoteCandidate assumes you are holding the lock (must be execute using a.run)
  661. func (a *Agent) addRemoteCandidate(c Candidate) {
  662. set := a.remoteCandidates[c.NetworkType()]
  663. for _, candidate := range set {
  664. if candidate.Equal(c) {
  665. return
  666. }
  667. }
  668. set = append(set, c)
  669. a.remoteCandidates[c.NetworkType()] = set
  670. if localCandidates, ok := a.localCandidates[c.NetworkType()]; ok {
  671. for _, localCandidate := range localCandidates {
  672. a.addPair(localCandidate, c)
  673. }
  674. }
  675. a.requestConnectivityCheck()
  676. }
  677. func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net.PacketConn) error {
  678. return a.run(ctx, func(ctx context.Context, agent *Agent) {
  679. set := a.localCandidates[c.NetworkType()]
  680. for _, candidate := range set {
  681. if candidate.Equal(c) {
  682. a.log.Debugf("Ignore duplicate candidate: %s", c.String())
  683. if err := c.close(); err != nil {
  684. a.log.Warnf("Failed to close duplicate candidate: %v", err)
  685. }
  686. return
  687. }
  688. }
  689. c.start(a, candidateConn, a.startedCh)
  690. set = append(set, c)
  691. a.localCandidates[c.NetworkType()] = set
  692. if remoteCandidates, ok := a.remoteCandidates[c.NetworkType()]; ok {
  693. for _, remoteCandidate := range remoteCandidates {
  694. a.addPair(c, remoteCandidate)
  695. }
  696. }
  697. a.requestConnectivityCheck()
  698. a.chanCandidate <- c
  699. })
  700. }
  701. // GetLocalCandidates returns the local candidates
  702. func (a *Agent) GetLocalCandidates() ([]Candidate, error) {
  703. var res []Candidate
  704. err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  705. var candidates []Candidate
  706. for _, set := range agent.localCandidates {
  707. candidates = append(candidates, set...)
  708. }
  709. res = candidates
  710. })
  711. if err != nil {
  712. return nil, err
  713. }
  714. return res, nil
  715. }
  716. // GetLocalUserCredentials returns the local user credentials
  717. func (a *Agent) GetLocalUserCredentials() (frag string, pwd string, err error) {
  718. valSet := make(chan struct{})
  719. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  720. frag = agent.localUfrag
  721. pwd = agent.localPwd
  722. close(valSet)
  723. })
  724. if err == nil {
  725. <-valSet
  726. }
  727. return
  728. }
  729. // GetRemoteUserCredentials returns the remote user credentials
  730. func (a *Agent) GetRemoteUserCredentials() (frag string, pwd string, err error) {
  731. valSet := make(chan struct{})
  732. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  733. frag = agent.remoteUfrag
  734. pwd = agent.remotePwd
  735. close(valSet)
  736. })
  737. if err == nil {
  738. <-valSet
  739. }
  740. return
  741. }
  742. func (a *Agent) removeUfragFromMux() {
  743. a.tcpMux.RemoveConnByUfrag(a.localUfrag)
  744. if a.udpMux != nil {
  745. a.udpMux.RemoveConnByUfrag(a.localUfrag)
  746. }
  747. if a.udpMuxSrflx != nil {
  748. a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag)
  749. }
  750. }
  751. // Close cleans up the Agent
  752. func (a *Agent) Close() error {
  753. if err := a.ok(); err != nil {
  754. return err
  755. }
  756. a.afterRun(func(context.Context) {
  757. a.gatherCandidateCancel()
  758. if a.gatherCandidateDone != nil {
  759. <-a.gatherCandidateDone
  760. }
  761. })
  762. a.err.Store(ErrClosed)
  763. a.removeUfragFromMux()
  764. close(a.done)
  765. <-a.taskLoopDone
  766. return nil
  767. }
  768. // Remove all candidates. This closes any listening sockets
  769. // and removes both the local and remote candidate lists.
  770. //
  771. // This is used for restarts, failures and on close
  772. func (a *Agent) deleteAllCandidates() {
  773. for net, cs := range a.localCandidates {
  774. for _, c := range cs {
  775. if err := c.close(); err != nil {
  776. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  777. }
  778. }
  779. delete(a.localCandidates, net)
  780. }
  781. for net, cs := range a.remoteCandidates {
  782. for _, c := range cs {
  783. if err := c.close(); err != nil {
  784. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  785. }
  786. }
  787. delete(a.remoteCandidates, net)
  788. }
  789. }
  790. func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) Candidate {
  791. ip, port, _, ok := parseAddr(addr)
  792. if !ok {
  793. a.log.Warnf("Error parsing addr: %s", addr)
  794. return nil
  795. }
  796. set := a.remoteCandidates[networkType]
  797. for _, c := range set {
  798. if c.Address() == ip.String() && c.Port() == port {
  799. return c
  800. }
  801. }
  802. return nil
  803. }
  804. func (a *Agent) sendBindingRequest(m *stun.Message, local, remote Candidate) {
  805. a.log.Tracef("ping STUN from %s to %s\n", local.String(), remote.String())
  806. a.invalidatePendingBindingRequests(time.Now())
  807. a.pendingBindingRequests = append(a.pendingBindingRequests, bindingRequest{
  808. timestamp: time.Now(),
  809. transactionID: m.TransactionID,
  810. destination: remote.addr(),
  811. isUseCandidate: m.Contains(stun.AttrUseCandidate),
  812. })
  813. a.sendSTUN(m, local, remote)
  814. }
  815. func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote Candidate) {
  816. base := remote
  817. ip, port, _, ok := parseAddr(base.addr())
  818. if !ok {
  819. a.log.Warnf("Error parsing addr: %s", base.addr())
  820. return
  821. }
  822. if out, err := stun.Build(m, stun.BindingSuccess,
  823. &stun.XORMappedAddress{
  824. IP: ip,
  825. Port: port,
  826. },
  827. stun.NewShortTermIntegrity(a.localPwd),
  828. stun.Fingerprint,
  829. ); err != nil {
  830. a.log.Warnf("Failed to handle inbound ICE from: %s to: %s error: %s", local, remote, err)
  831. } else {
  832. a.sendSTUN(out, local, remote)
  833. }
  834. }
  835. /* Removes pending binding requests that are over maxBindingRequestTimeout old
  836. Let HTO be the transaction timeout, which SHOULD be 2*RTT if
  837. RTT is known or 500 ms otherwise.
  838. https://tools.ietf.org/html/rfc8445#appendix-B.1
  839. */
  840. func (a *Agent) invalidatePendingBindingRequests(filterTime time.Time) {
  841. initialSize := len(a.pendingBindingRequests)
  842. temp := a.pendingBindingRequests[:0]
  843. for _, bindingRequest := range a.pendingBindingRequests {
  844. if filterTime.Sub(bindingRequest.timestamp) < maxBindingRequestTimeout {
  845. temp = append(temp, bindingRequest)
  846. }
  847. }
  848. a.pendingBindingRequests = temp
  849. if bindRequestsRemoved := initialSize - len(a.pendingBindingRequests); bindRequestsRemoved > 0 {
  850. a.log.Tracef("Discarded %d binding requests because they expired", bindRequestsRemoved)
  851. }
  852. }
  853. // Assert that the passed TransactionID is in our pendingBindingRequests and returns the destination
  854. // If the bindingRequest was valid remove it from our pending cache
  855. func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest) {
  856. a.invalidatePendingBindingRequests(time.Now())
  857. for i := range a.pendingBindingRequests {
  858. if a.pendingBindingRequests[i].transactionID == id {
  859. validBindingRequest := a.pendingBindingRequests[i]
  860. a.pendingBindingRequests = append(a.pendingBindingRequests[:i], a.pendingBindingRequests[i+1:]...)
  861. return true, &validBindingRequest
  862. }
  863. }
  864. return false, nil
  865. }
  866. // handleInbound processes STUN traffic from a remote candidate
  867. func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr) { //nolint:gocognit
  868. var err error
  869. if m == nil || local == nil {
  870. return
  871. }
  872. if m.Type.Method != stun.MethodBinding ||
  873. !(m.Type.Class == stun.ClassSuccessResponse ||
  874. m.Type.Class == stun.ClassRequest ||
  875. m.Type.Class == stun.ClassIndication) {
  876. a.log.Tracef("unhandled STUN from %s to %s class(%s) method(%s)", remote, local, m.Type.Class, m.Type.Method)
  877. return
  878. }
  879. if a.isControlling {
  880. if m.Contains(stun.AttrICEControlling) {
  881. a.log.Debug("inbound isControlling && a.isControlling == true")
  882. return
  883. } else if m.Contains(stun.AttrUseCandidate) {
  884. a.log.Debug("useCandidate && a.isControlling == true")
  885. return
  886. }
  887. } else {
  888. if m.Contains(stun.AttrICEControlled) {
  889. a.log.Debug("inbound isControlled && a.isControlling == false")
  890. return
  891. }
  892. }
  893. remoteCandidate := a.findRemoteCandidate(local.NetworkType(), remote)
  894. if m.Type.Class == stun.ClassSuccessResponse {
  895. if err = assertInboundMessageIntegrity(m, []byte(a.remotePwd)); err != nil {
  896. a.log.Warnf("discard message from (%s), %v", remote, err)
  897. return
  898. }
  899. if remoteCandidate == nil {
  900. a.log.Warnf("discard success message from (%s), no such remote", remote)
  901. return
  902. }
  903. a.selector.HandleSuccessResponse(m, local, remoteCandidate, remote)
  904. } else if m.Type.Class == stun.ClassRequest {
  905. if err = assertInboundUsername(m, a.localUfrag+":"+a.remoteUfrag); err != nil {
  906. a.log.Warnf("discard message from (%s), %v", remote, err)
  907. return
  908. } else if err = assertInboundMessageIntegrity(m, []byte(a.localPwd)); err != nil {
  909. a.log.Warnf("discard message from (%s), %v", remote, err)
  910. return
  911. }
  912. if remoteCandidate == nil {
  913. ip, port, networkType, ok := parseAddr(remote)
  914. if !ok {
  915. a.log.Errorf("Failed to create parse remote net.Addr when creating remote prflx candidate")
  916. return
  917. }
  918. prflxCandidateConfig := CandidatePeerReflexiveConfig{
  919. Network: networkType.String(),
  920. Address: ip.String(),
  921. Port: port,
  922. Component: local.Component(),
  923. RelAddr: "",
  924. RelPort: 0,
  925. }
  926. prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)
  927. if err != nil {
  928. a.log.Errorf("Failed to create new remote prflx candidate (%s)", err)
  929. return
  930. }
  931. remoteCandidate = prflxCandidate
  932. a.log.Debugf("adding a new peer-reflexive candidate: %s ", remote)
  933. a.addRemoteCandidate(remoteCandidate)
  934. }
  935. a.log.Tracef("inbound STUN (Request) from %s to %s", remote.String(), local.String())
  936. a.selector.HandleBindingRequest(m, local, remoteCandidate)
  937. }
  938. if remoteCandidate != nil {
  939. remoteCandidate.seen(false)
  940. }
  941. }
  942. // validateNonSTUNTraffic processes non STUN traffic from a remote candidate,
  943. // and returns true if it is an actual remote candidate
  944. func (a *Agent) validateNonSTUNTraffic(local Candidate, remote net.Addr) bool {
  945. var isValidCandidate uint64
  946. if err := a.run(local.context(), func(ctx context.Context, agent *Agent) {
  947. remoteCandidate := a.findRemoteCandidate(local.NetworkType(), remote)
  948. if remoteCandidate != nil {
  949. remoteCandidate.seen(false)
  950. atomic.AddUint64(&isValidCandidate, 1)
  951. }
  952. }); err != nil {
  953. a.log.Warnf("failed to validate remote candidate: %v", err)
  954. }
  955. return atomic.LoadUint64(&isValidCandidate) == 1
  956. }
  957. // GetSelectedCandidatePair returns the selected pair or nil if there is none
  958. func (a *Agent) GetSelectedCandidatePair() (*CandidatePair, error) {
  959. selectedPair := a.getSelectedPair()
  960. if selectedPair == nil {
  961. return nil, nil
  962. }
  963. local, err := selectedPair.Local.copy()
  964. if err != nil {
  965. return nil, err
  966. }
  967. remote, err := selectedPair.Remote.copy()
  968. if err != nil {
  969. return nil, err
  970. }
  971. return &CandidatePair{Local: local, Remote: remote}, nil
  972. }
  973. func (a *Agent) getSelectedPair() *CandidatePair {
  974. selectedPair := a.selectedPair.Load()
  975. if selectedPair == nil {
  976. return nil
  977. }
  978. return selectedPair.(*CandidatePair)
  979. }
  980. func (a *Agent) closeMulticastConn() {
  981. if a.mDNSConn != nil {
  982. if err := a.mDNSConn.Close(); err != nil {
  983. a.log.Warnf("failed to close mDNS Conn: %v", err)
  984. }
  985. }
  986. }
  987. // SetRemoteCredentials sets the credentials of the remote agent
  988. func (a *Agent) SetRemoteCredentials(remoteUfrag, remotePwd string) error {
  989. switch {
  990. case remoteUfrag == "":
  991. return ErrRemoteUfragEmpty
  992. case remotePwd == "":
  993. return ErrRemotePwdEmpty
  994. }
  995. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  996. agent.remoteUfrag = remoteUfrag
  997. agent.remotePwd = remotePwd
  998. })
  999. }
  1000. // Restart restarts the ICE Agent with the provided ufrag/pwd
  1001. // If no ufrag/pwd is provided the Agent will generate one itself
  1002. //
  1003. // Restart must only be called when GatheringState is GatheringStateComplete
  1004. // a user must then call GatherCandidates explicitly to start generating new ones
  1005. func (a *Agent) Restart(ufrag, pwd string) error {
  1006. if ufrag == "" {
  1007. var err error
  1008. ufrag, err = generateUFrag()
  1009. if err != nil {
  1010. return err
  1011. }
  1012. }
  1013. if pwd == "" {
  1014. var err error
  1015. pwd, err = generatePwd()
  1016. if err != nil {
  1017. return err
  1018. }
  1019. }
  1020. if len([]rune(ufrag))*8 < 24 {
  1021. return ErrLocalUfragInsufficientBits
  1022. }
  1023. if len([]rune(pwd))*8 < 128 {
  1024. return ErrLocalPwdInsufficientBits
  1025. }
  1026. var err error
  1027. if runErr := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1028. if agent.gatheringState == GatheringStateGathering {
  1029. err = ErrRestartWhenGathering
  1030. return
  1031. }
  1032. // Clear all agent needed to take back to fresh state
  1033. a.removeUfragFromMux()
  1034. agent.localUfrag = ufrag
  1035. agent.localPwd = pwd
  1036. agent.remoteUfrag = ""
  1037. agent.remotePwd = ""
  1038. a.gatheringState = GatheringStateNew
  1039. a.checklist = make([]*CandidatePair, 0)
  1040. a.pendingBindingRequests = make([]bindingRequest, 0)
  1041. a.setSelectedPair(nil)
  1042. a.deleteAllCandidates()
  1043. if a.selector != nil {
  1044. a.selector.Start()
  1045. }
  1046. // Restart is used by NewAgent. Accept/Connect should be used to move to checking
  1047. // for new Agents
  1048. if a.connectionState != ConnectionStateNew {
  1049. a.updateConnectionState(ConnectionStateChecking)
  1050. }
  1051. }); runErr != nil {
  1052. return runErr
  1053. }
  1054. return err
  1055. }
  1056. func (a *Agent) setGatheringState(newState GatheringState) error {
  1057. done := make(chan struct{})
  1058. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1059. if a.gatheringState != newState && newState == GatheringStateComplete {
  1060. a.chanCandidate <- nil
  1061. }
  1062. a.gatheringState = newState
  1063. close(done)
  1064. }); err != nil {
  1065. return err
  1066. }
  1067. <-done
  1068. return nil
  1069. }