gather.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. package ice
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. "time"
  11. "github.com/pion/dtls/v2"
  12. "github.com/pion/logging"
  13. "github.com/pion/turn/v2"
  14. )
  15. const (
  16. stunGatherTimeout = time.Second * 5
  17. )
// closeable is the minimal contract required of a resource that can be
// released: anything exposing a Close method returning an error satisfies it.
type closeable interface {
	// Close releases the resource and reports any failure to do so.
	Close() error
}
  21. // Close a net.Conn and log if we have a failure
  22. func closeConnAndLog(c closeable, log logging.LeveledLogger, msg string) {
  23. if c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
  24. log.Warnf("Conn is not allocated (%s)", msg)
  25. return
  26. }
  27. log.Warnf(msg)
  28. if err := c.Close(); err != nil {
  29. log.Warnf("Failed to close conn: %v", err)
  30. }
  31. }
  32. // fakePacketConn wraps a net.Conn and emulates net.PacketConn
  33. type fakePacketConn struct {
  34. nextConn net.Conn
  35. }
  36. func (f *fakePacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  37. n, err = f.nextConn.Read(p)
  38. addr = f.nextConn.RemoteAddr()
  39. return
  40. }
  41. func (f *fakePacketConn) Close() error { return f.nextConn.Close() }
  42. func (f *fakePacketConn) LocalAddr() net.Addr { return f.nextConn.LocalAddr() }
  43. func (f *fakePacketConn) SetDeadline(t time.Time) error { return f.nextConn.SetDeadline(t) }
  44. func (f *fakePacketConn) SetReadDeadline(t time.Time) error { return f.nextConn.SetReadDeadline(t) }
  45. func (f *fakePacketConn) SetWriteDeadline(t time.Time) error { return f.nextConn.SetWriteDeadline(t) }
  46. func (f *fakePacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
  47. return f.nextConn.Write(p)
  48. }
  49. // GatherCandidates initiates the trickle based gathering process.
  50. func (a *Agent) GatherCandidates() error {
  51. var gatherErr error
  52. if runErr := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  53. if a.gatheringState != GatheringStateNew {
  54. gatherErr = ErrMultipleGatherAttempted
  55. return
  56. } else if a.onCandidateHdlr.Load() == nil {
  57. gatherErr = ErrNoOnCandidateHandler
  58. return
  59. }
  60. a.gatherCandidateCancel() // Cancel previous gathering routine
  61. ctx, cancel := context.WithCancel(ctx)
  62. a.gatherCandidateCancel = cancel
  63. a.gatherCandidateDone = make(chan struct{})
  64. go a.gatherCandidates(ctx)
  65. }); runErr != nil {
  66. return runErr
  67. }
  68. return gatherErr
  69. }
  70. func (a *Agent) gatherCandidates(ctx context.Context) {
  71. defer close(a.gatherCandidateDone)
  72. if err := a.setGatheringState(GatheringStateGathering); err != nil {
  73. a.log.Warnf("failed to set gatheringState to GatheringStateGathering: %v", err)
  74. return
  75. }
  76. var wg sync.WaitGroup
  77. for _, t := range a.candidateTypes {
  78. switch t {
  79. case CandidateTypeHost:
  80. wg.Add(1)
  81. go func() {
  82. a.gatherCandidatesLocal(ctx, a.networkTypes)
  83. wg.Done()
  84. }()
  85. case CandidateTypeServerReflexive:
  86. wg.Add(1)
  87. go func() {
  88. if a.udpMuxSrflx != nil {
  89. a.gatherCandidatesSrflxUDPMux(ctx, a.urls, a.networkTypes)
  90. } else {
  91. a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes)
  92. }
  93. wg.Done()
  94. }()
  95. if a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeServerReflexive {
  96. wg.Add(1)
  97. go func() {
  98. a.gatherCandidatesSrflxMapped(ctx, a.networkTypes)
  99. wg.Done()
  100. }()
  101. }
  102. case CandidateTypeRelay:
  103. wg.Add(1)
  104. go func() {
  105. a.gatherCandidatesRelay(ctx, a.urls)
  106. wg.Done()
  107. }()
  108. case CandidateTypePeerReflexive, CandidateTypeUnspecified:
  109. }
  110. }
  111. // Block until all STUN and TURN URLs have been gathered (or timed out)
  112. wg.Wait()
  113. if err := a.setGatheringState(GatheringStateComplete); err != nil {
  114. a.log.Warnf("failed to set gatheringState to GatheringStateComplete: %v", err)
  115. }
  116. }
  117. func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []NetworkType) { //nolint:gocognit
  118. networks := map[string]struct{}{}
  119. for _, networkType := range networkTypes {
  120. if networkType.IsTCP() {
  121. networks[tcp] = struct{}{}
  122. } else {
  123. networks[udp] = struct{}{}
  124. }
  125. }
  126. // when UDPMux is enabled, skip other UDP candidates
  127. if a.udpMux != nil {
  128. if err := a.gatherCandidatesLocalUDPMux(ctx); err != nil {
  129. a.log.Warnf("could not create host candidate for UDPMux")
  130. }
  131. delete(networks, udp)
  132. }
  133. localIPs, err := localInterfaces(a.net, a.interfaceFilter, networkTypes)
  134. if err != nil {
  135. a.log.Warnf("failed to iterate local interfaces, host candidates will not be gathered %s", err)
  136. return
  137. }
  138. for _, ip := range localIPs {
  139. mappedIP := ip
  140. if a.mDNSMode != MulticastDNSModeQueryAndGather && a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeHost {
  141. if _mappedIP, err := a.extIPMapper.findExternalIP(ip.String()); err == nil {
  142. mappedIP = _mappedIP
  143. } else {
  144. a.log.Warnf("1:1 NAT mapping is enabled but no external IP is found for %s\n", ip.String())
  145. }
  146. }
  147. address := mappedIP.String()
  148. if a.mDNSMode == MulticastDNSModeQueryAndGather {
  149. address = a.mDNSName
  150. }
  151. for network := range networks {
  152. var port int
  153. var conn net.PacketConn
  154. var err error
  155. var tcpType TCPType
  156. switch network {
  157. case tcp:
  158. // Handle ICE TCP passive mode
  159. a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag)
  160. conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag, mappedIP.To4() == nil)
  161. if err != nil {
  162. if !errors.Is(err, ErrTCPMuxNotInitialized) {
  163. a.log.Warnf("error getting tcp conn by ufrag: %s %s %s\n", network, ip, a.localUfrag)
  164. }
  165. continue
  166. }
  167. port = conn.LocalAddr().(*net.TCPAddr).Port
  168. tcpType = TCPTypePassive
  169. // is there a way to verify that the listen address is even
  170. // accessible from the current interface.
  171. case udp:
  172. conn, err = listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: ip, Port: 0})
  173. if err != nil {
  174. a.log.Warnf("could not listen %s %s\n", network, ip)
  175. continue
  176. }
  177. port = conn.LocalAddr().(*net.UDPAddr).Port
  178. }
  179. hostConfig := CandidateHostConfig{
  180. Network: network,
  181. Address: address,
  182. Port: port,
  183. Component: ComponentRTP,
  184. TCPType: tcpType,
  185. }
  186. c, err := NewCandidateHost(&hostConfig)
  187. if err != nil {
  188. closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create host candidate: %s %s %d: %v\n", network, mappedIP, port, err))
  189. continue
  190. }
  191. if a.mDNSMode == MulticastDNSModeQueryAndGather {
  192. if err = c.setIP(ip); err != nil {
  193. closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create host candidate: %s %s %d: %v\n", network, mappedIP, port, err))
  194. continue
  195. }
  196. }
  197. if err := a.addCandidate(ctx, c, conn); err != nil {
  198. if closeErr := c.close(); closeErr != nil {
  199. a.log.Warnf("Failed to close candidate: %v", closeErr)
  200. }
  201. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
  202. }
  203. }
  204. }
  205. }
  206. func (a *Agent) gatherCandidatesLocalUDPMux(ctx context.Context) error {
  207. if a.udpMux == nil {
  208. return errUDPMuxDisabled
  209. }
  210. localIPs, err := localInterfaces(a.net, a.interfaceFilter, a.networkTypes)
  211. switch {
  212. case err != nil:
  213. return err
  214. case len(localIPs) == 0:
  215. return errCandidateIPNotFound
  216. }
  217. for _, candidateIP := range localIPs {
  218. if a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeHost {
  219. if mappedIP, err := a.extIPMapper.findExternalIP(candidateIP.String()); err != nil {
  220. a.log.Warnf("1:1 NAT mapping is enabled but no external IP is found for %s", candidateIP.String())
  221. continue
  222. } else {
  223. candidateIP = mappedIP
  224. }
  225. }
  226. conn, err := a.udpMux.GetConn(a.localUfrag, candidateIP.To4() == nil)
  227. if err != nil {
  228. return err
  229. }
  230. port := conn.LocalAddr().(*net.UDPAddr).Port
  231. hostConfig := CandidateHostConfig{
  232. Network: udp,
  233. Address: candidateIP.String(),
  234. Port: port,
  235. Component: ComponentRTP,
  236. }
  237. c, err := NewCandidateHost(&hostConfig)
  238. if err != nil {
  239. closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create host mux candidate: %s %d: %v\n", candidateIP, port, err))
  240. // already logged error
  241. return nil
  242. }
  243. if err := a.addCandidate(ctx, c, conn); err != nil {
  244. if closeErr := c.close(); closeErr != nil {
  245. a.log.Warnf("Failed to close candidate: %v", closeErr)
  246. }
  247. return err
  248. }
  249. }
  250. return nil
  251. }
  252. func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes []NetworkType) {
  253. var wg sync.WaitGroup
  254. defer wg.Wait()
  255. for _, networkType := range networkTypes {
  256. if networkType.IsTCP() {
  257. continue
  258. }
  259. network := networkType.String()
  260. wg.Add(1)
  261. go func() {
  262. defer wg.Done()
  263. conn, err := listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: nil, Port: 0})
  264. if err != nil {
  265. a.log.Warnf("Failed to listen %s: %v\n", network, err)
  266. return
  267. }
  268. laddr := conn.LocalAddr().(*net.UDPAddr)
  269. mappedIP, err := a.extIPMapper.findExternalIP(laddr.IP.String())
  270. if err != nil {
  271. closeConnAndLog(conn, a.log, fmt.Sprintf("1:1 NAT mapping is enabled but no external IP is found for %s\n", laddr.IP.String()))
  272. return
  273. }
  274. srflxConfig := CandidateServerReflexiveConfig{
  275. Network: network,
  276. Address: mappedIP.String(),
  277. Port: laddr.Port,
  278. Component: ComponentRTP,
  279. RelAddr: laddr.IP.String(),
  280. RelPort: laddr.Port,
  281. }
  282. c, err := NewCandidateServerReflexive(&srflxConfig)
  283. if err != nil {
  284. closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n",
  285. network,
  286. mappedIP.String(),
  287. laddr.Port,
  288. err))
  289. return
  290. }
  291. if err := a.addCandidate(ctx, c, conn); err != nil {
  292. if closeErr := c.close(); closeErr != nil {
  293. a.log.Warnf("Failed to close candidate: %v", closeErr)
  294. }
  295. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
  296. }
  297. }()
  298. }
  299. }
  300. func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*URL, networkTypes []NetworkType) {
  301. var wg sync.WaitGroup
  302. defer wg.Wait()
  303. for _, networkType := range networkTypes {
  304. if networkType.IsTCP() {
  305. continue
  306. }
  307. for i := range urls {
  308. wg.Add(1)
  309. go func(url URL, network string, isIPv6 bool) {
  310. defer wg.Done()
  311. hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port)
  312. serverAddr, err := a.net.ResolveUDPAddr(network, hostPort)
  313. if err != nil {
  314. a.log.Warnf("failed to resolve stun host: %s: %v", hostPort, err)
  315. return
  316. }
  317. xoraddr, err := a.udpMuxSrflx.GetXORMappedAddr(serverAddr, stunGatherTimeout)
  318. if err != nil {
  319. a.log.Warnf("could not get server reflexive address %s %s: %v\n", network, url, err)
  320. return
  321. }
  322. conn, err := a.udpMuxSrflx.GetConnForURL(a.localUfrag, url.String(), isIPv6)
  323. if err != nil {
  324. a.log.Warnf("could not find connection in UDPMuxSrflx %s %s: %v\n", network, url, err)
  325. return
  326. }
  327. ip := xoraddr.IP
  328. port := xoraddr.Port
  329. laddr := conn.LocalAddr().(*net.UDPAddr)
  330. srflxConfig := CandidateServerReflexiveConfig{
  331. Network: network,
  332. Address: ip.String(),
  333. Port: port,
  334. Component: ComponentRTP,
  335. RelAddr: laddr.IP.String(),
  336. RelPort: laddr.Port,
  337. }
  338. c, err := NewCandidateServerReflexive(&srflxConfig)
  339. if err != nil {
  340. closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err))
  341. return
  342. }
  343. if err := a.addCandidate(ctx, c, conn); err != nil {
  344. if closeErr := c.close(); closeErr != nil {
  345. a.log.Warnf("Failed to close candidate: %v", closeErr)
  346. }
  347. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
  348. }
  349. }(*urls[i], networkType.String(), networkType.IsIPv6())
  350. }
  351. }
  352. }
  353. func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*URL, networkTypes []NetworkType) { //nolint:gocognit
  354. var wg sync.WaitGroup
  355. defer wg.Wait()
  356. for _, networkType := range networkTypes {
  357. if networkType.IsTCP() {
  358. continue
  359. }
  360. for i := range urls {
  361. wg.Add(1)
  362. go func(url URL, network string) {
  363. defer wg.Done()
  364. hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port)
  365. serverAddr, err := a.net.ResolveUDPAddr(network, hostPort)
  366. if err != nil {
  367. a.log.Warnf("failed to resolve stun host: %s: %v", hostPort, err)
  368. return
  369. }
  370. conn, err := listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: nil, Port: 0})
  371. if err != nil {
  372. closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to listen for %s: %v\n", serverAddr.String(), err))
  373. return
  374. }
  375. // If the agent closes midway through the connection
  376. // we end it early to prevent close delay.
  377. cancelCtx, cancelFunc := context.WithCancel(ctx)
  378. defer cancelFunc()
  379. go func() {
  380. select {
  381. case <-cancelCtx.Done():
  382. return
  383. case <-a.done:
  384. _ = conn.Close()
  385. }
  386. }()
  387. xoraddr, err := getXORMappedAddr(conn, serverAddr, stunGatherTimeout)
  388. if err != nil {
  389. closeConnAndLog(conn, a.log, fmt.Sprintf("could not get server reflexive address %s %s: %v\n", network, url, err))
  390. return
  391. }
  392. ip := xoraddr.IP
  393. port := xoraddr.Port
  394. laddr := conn.LocalAddr().(*net.UDPAddr)
  395. srflxConfig := CandidateServerReflexiveConfig{
  396. Network: network,
  397. Address: ip.String(),
  398. Port: port,
  399. Component: ComponentRTP,
  400. RelAddr: laddr.IP.String(),
  401. RelPort: laddr.Port,
  402. }
  403. c, err := NewCandidateServerReflexive(&srflxConfig)
  404. if err != nil {
  405. closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err))
  406. return
  407. }
  408. if err := a.addCandidate(ctx, c, conn); err != nil {
  409. if closeErr := c.close(); closeErr != nil {
  410. a.log.Warnf("Failed to close candidate: %v", closeErr)
  411. }
  412. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
  413. }
  414. }(*urls[i], networkType.String())
  415. }
  416. }
  417. }
// gatherCandidatesRelay gathers relay candidates from the TURN/TURNS entries
// in urls.
//
// URLs with any other scheme are skipped. A TURN/TURNS URL with an empty
// username or password is logged as an error and stops the loop: no further
// URLs are processed, but goroutines already started are still waited for by
// the deferred wg.Wait. Every accepted URL is handled on its own goroutine,
// which opens a transport to the server matching the URL's proto/scheme,
// builds a turn.Client on top of it, allocates a relay and adds the result as
// a relay candidate. On any failure the resources created so far are closed
// and the goroutine returns after logging a warning.
func (a *Agent) gatherCandidatesRelay(ctx context.Context, urls []*URL) { //nolint:gocognit
	var wg sync.WaitGroup
	defer wg.Wait()

	// Relay candidates are always created with the udp4 network type here.
	network := NetworkTypeUDP4.String()
	for i := range urls {
		switch {
		case urls[i].Scheme != SchemeTypeTURN && urls[i].Scheme != SchemeTypeTURNS:
			continue
		case urls[i].Username == "":
			a.log.Errorf("Failed to gather relay candidates: %v", ErrUsernameEmpty)
			return
		case urls[i].Password == "":
			a.log.Errorf("Failed to gather relay candidates: %v", ErrPasswordEmpty)
			return
		}

		wg.Add(1)
		// The URL is passed by value so each goroutine works on its own copy.
		go func(url URL) {
			defer wg.Done()
			TURNServerAddr := fmt.Sprintf("%s:%d", url.Host, url.Port)
			var (
				locConn       net.PacketConn // transport handed to the TURN client
				err           error
				RelAddr       string // local IP of the transport, reported as the candidate's related address
				RelPort       int    // local port of the transport, reported as the candidate's related port
				relayProtocol string // protocol spoken between this agent and the TURN server
			)

			switch {
			// TURN over plain UDP: listen on an ephemeral local UDP port.
			case url.Proto == ProtoTypeUDP && url.Scheme == SchemeTypeTURN:
				if locConn, err = a.net.ListenPacket(network, "0.0.0.0:0"); err != nil {
					a.log.Warnf("Failed to listen %s: %v\n", network, err)
					return
				}

				RelAddr = locConn.LocalAddr().(*net.UDPAddr).IP.String()
				RelPort = locConn.LocalAddr().(*net.UDPAddr).Port
				relayProtocol = udp
			// TURN/TURNS over TCP through the configured proxy dialer. This
			// case precedes the direct TCP cases, so a proxy dialer takes
			// priority whenever one is set.
			// NOTE(review): no TLS handshake is visible here for the TURNS
			// scheme — presumably the proxy dialer provides it; confirm.
			case a.proxyDialer != nil && url.Proto == ProtoTypeTCP &&
				(url.Scheme == SchemeTypeTURN || url.Scheme == SchemeTypeTURNS):
				conn, connectErr := a.proxyDialer.Dial(NetworkTypeTCP4.String(), TURNServerAddr)
				if connectErr != nil {
					a.log.Warnf("Failed to Dial TCP Addr %s via proxy dialer: %v\n", TURNServerAddr, connectErr)
					return
				}

				RelAddr = conn.LocalAddr().(*net.TCPAddr).IP.String()
				RelPort = conn.LocalAddr().(*net.TCPAddr).Port
				if url.Scheme == SchemeTypeTURN {
					relayProtocol = tcp
				} else if url.Scheme == SchemeTypeTURNS {
					relayProtocol = "tls"
				}
				locConn = turn.NewSTUNConn(conn)

			// TURN over plain TCP, dialed directly.
			case url.Proto == ProtoTypeTCP && url.Scheme == SchemeTypeTURN:
				tcpAddr, connectErr := net.ResolveTCPAddr(NetworkTypeTCP4.String(), TURNServerAddr)
				if connectErr != nil {
					a.log.Warnf("Failed to resolve TCP Addr %s: %v\n", TURNServerAddr, connectErr)
					return
				}

				conn, connectErr := net.DialTCP(NetworkTypeTCP4.String(), nil, tcpAddr)
				if connectErr != nil {
					a.log.Warnf("Failed to Dial TCP Addr %s: %v\n", TURNServerAddr, connectErr)
					return
				}

				RelAddr = conn.LocalAddr().(*net.TCPAddr).IP.String()
				RelPort = conn.LocalAddr().(*net.TCPAddr).Port
				relayProtocol = tcp
				locConn = turn.NewSTUNConn(conn)
			// TURNS over UDP: DTLS session to the server, wrapped so it can be
			// used as a net.PacketConn.
			case url.Proto == ProtoTypeUDP && url.Scheme == SchemeTypeTURNS:
				udpAddr, connectErr := net.ResolveUDPAddr(network, TURNServerAddr)
				if connectErr != nil {
					a.log.Warnf("Failed to resolve UDP Addr %s: %v\n", TURNServerAddr, connectErr)
					return
				}

				conn, connectErr := dtls.Dial(network, udpAddr, &dtls.Config{
					ServerName:         url.Host,
					InsecureSkipVerify: a.insecureSkipVerify, //nolint:gosec
				})
				if connectErr != nil {
					a.log.Warnf("Failed to Dial DTLS Addr %s: %v\n", TURNServerAddr, connectErr)
					return
				}

				RelAddr = conn.LocalAddr().(*net.UDPAddr).IP.String()
				RelPort = conn.LocalAddr().(*net.UDPAddr).Port
				relayProtocol = "dtls"
				locConn = &fakePacketConn{conn}
			// TURNS over TCP: TLS connection dialed directly.
			case url.Proto == ProtoTypeTCP && url.Scheme == SchemeTypeTURNS:
				conn, connectErr := tls.Dial(NetworkTypeTCP4.String(), TURNServerAddr, &tls.Config{
					InsecureSkipVerify: a.insecureSkipVerify, //nolint:gosec
				})
				if connectErr != nil {
					a.log.Warnf("Failed to Dial TLS Addr %s: %v\n", TURNServerAddr, connectErr)
					return
				}
				RelAddr = conn.LocalAddr().(*net.TCPAddr).IP.String()
				RelPort = conn.LocalAddr().(*net.TCPAddr).Port
				relayProtocol = "tls"
				locConn = turn.NewSTUNConn(conn)
			default:
				a.log.Warnf("Unable to handle URL in gatherCandidatesRelay %v\n", url)
				return
			}

			// From here on locConn is open and must be closed on every
			// failure path.
			client, err := turn.NewClient(&turn.ClientConfig{
				TURNServerAddr: TURNServerAddr,
				Conn:           locConn,
				Username:       url.Username,
				Password:       url.Password,
				LoggerFactory:  a.loggerFactory,
				Net:            a.net,
			})
			if err != nil {
				closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to build new turn.Client %s %s\n", TURNServerAddr, err))
				return
			}

			if err = client.Listen(); err != nil {
				client.Close()
				closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to listen on turn.Client %s %s\n", TURNServerAddr, err))
				return
			}

			relayConn, err := client.Allocate()
			if err != nil {
				client.Close()
				closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to allocate on turn.Client %s %s\n", TURNServerAddr, err))
				return
			}

			// The relay connection's local address becomes the candidate's
			// advertised address.
			raddr := relayConn.LocalAddr().(*net.UDPAddr)
			relayConfig := CandidateRelayConfig{
				Network:       network,
				Component:     ComponentRTP,
				Address:       raddr.IP.String(),
				Port:          raddr.Port,
				RelAddr:       RelAddr,
				RelPort:       RelPort,
				RelayProtocol: relayProtocol,
				// OnClose shuts down the TURN client first and then the
				// transport it was running on.
				OnClose: func() error {
					client.Close()
					return locConn.Close()
				},
			}
			// relayConnClose closes the allocated relay connection, logging
			// (not returning) any failure.
			relayConnClose := func() {
				if relayConErr := relayConn.Close(); relayConErr != nil {
					a.log.Warnf("Failed to close relay %v", relayConErr)
				}
			}
			candidate, err := NewCandidateRelay(&relayConfig)
			if err != nil {
				// No candidate exists to run OnClose, so release the relay,
				// the client and the transport by hand.
				relayConnClose()

				client.Close()
				closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to create relay candidate: %s %s: %v\n", network, raddr.String(), err))
				return
			}

			if err := a.addCandidate(ctx, candidate, relayConn); err != nil {
				relayConnClose()

				if closeErr := candidate.close(); closeErr != nil {
					a.log.Warnf("Failed to close candidate: %v", closeErr)
				}
				a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
			}
		}(*urls[i])
	}
}