conn.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. // Package client implements the API for a TURN client
  2. package client
  3. import (
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "sync"
  10. "time"
  11. "github.com/pion/logging"
  12. "github.com/pion/stun"
  13. "github.com/pion/turn/v2/internal/proto"
  14. )
  15. const (
  16. maxReadQueueSize = 1024
  17. permRefreshInterval = 120 * time.Second
  18. maxRetryAttempts = 3
  19. )
  20. const (
  21. timerIDRefreshAlloc int = iota
  22. timerIDRefreshPerms
  23. )
  24. func noDeadline() time.Time {
  25. return time.Time{}
  26. }
  27. type inboundData struct {
  28. data []byte
  29. from net.Addr
  30. }
  31. // UDPConnObserver is an interface to UDPConn observer
  32. type UDPConnObserver interface {
  33. TURNServerAddr() net.Addr
  34. Username() stun.Username
  35. Realm() stun.Realm
  36. WriteTo(data []byte, to net.Addr) (int, error)
  37. PerformTransaction(msg *stun.Message, to net.Addr, dontWait bool) (TransactionResult, error)
  38. OnDeallocated(relayedAddr net.Addr)
  39. }
  40. // UDPConnConfig is a set of configuration params use by NewUDPConn
  41. type UDPConnConfig struct {
  42. Observer UDPConnObserver
  43. RelayedAddr net.Addr
  44. Integrity stun.MessageIntegrity
  45. Nonce stun.Nonce
  46. Lifetime time.Duration
  47. Log logging.LeveledLogger
  48. }
  49. // UDPConn is the implementation of the Conn and PacketConn interfaces for UDP network connections.
  50. // comatible with net.PacketConn and net.Conn
  51. type UDPConn struct {
  52. obs UDPConnObserver // read-only
  53. relayedAddr net.Addr // read-only
  54. permMap *permissionMap // thread-safe
  55. bindingMgr *bindingManager // thread-safe
  56. integrity stun.MessageIntegrity // read-only
  57. _nonce stun.Nonce // needs mutex x
  58. _lifetime time.Duration // needs mutex x
  59. readCh chan *inboundData // thread-safe
  60. closeCh chan struct{} // thread-safe
  61. readTimer *time.Timer // thread-safe
  62. refreshAllocTimer *PeriodicTimer // thread-safe
  63. refreshPermsTimer *PeriodicTimer // thread-safe
  64. mutex sync.RWMutex // thread-safe
  65. log logging.LeveledLogger // read-only
  66. }
  67. // NewUDPConn creates a new instance of UDPConn
  68. func NewUDPConn(config *UDPConnConfig) *UDPConn {
  69. c := &UDPConn{
  70. obs: config.Observer,
  71. relayedAddr: config.RelayedAddr,
  72. permMap: newPermissionMap(),
  73. bindingMgr: newBindingManager(),
  74. integrity: config.Integrity,
  75. _nonce: config.Nonce,
  76. _lifetime: config.Lifetime,
  77. readCh: make(chan *inboundData, maxReadQueueSize),
  78. closeCh: make(chan struct{}),
  79. readTimer: time.NewTimer(time.Duration(math.MaxInt64)),
  80. log: config.Log,
  81. }
  82. c.log.Debugf("initial lifetime: %d seconds", int(c.lifetime().Seconds()))
  83. c.refreshAllocTimer = NewPeriodicTimer(
  84. timerIDRefreshAlloc,
  85. c.onRefreshTimers,
  86. c.lifetime()/2,
  87. )
  88. c.refreshPermsTimer = NewPeriodicTimer(
  89. timerIDRefreshPerms,
  90. c.onRefreshTimers,
  91. permRefreshInterval,
  92. )
  93. if c.refreshAllocTimer.Start() {
  94. c.log.Debugf("refreshAllocTimer started")
  95. }
  96. if c.refreshPermsTimer.Start() {
  97. c.log.Debugf("refreshPermsTimer started")
  98. }
  99. return c
  100. }
  101. // ReadFrom reads a packet from the connection,
  102. // copying the payload into p. It returns the number of
  103. // bytes copied into p and the return address that
  104. // was on the packet.
  105. // It returns the number of bytes read (0 <= n <= len(p))
  106. // and any error encountered. Callers should always process
  107. // the n > 0 bytes returned before considering the error err.
  108. // ReadFrom can be made to time out and return
  109. // an Error with Timeout() == true after a fixed time limit;
  110. // see SetDeadline and SetReadDeadline.
  111. func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  112. for {
  113. select {
  114. case ibData := <-c.readCh:
  115. n := copy(p, ibData.data)
  116. if n < len(ibData.data) {
  117. return 0, nil, io.ErrShortBuffer
  118. }
  119. return n, ibData.from, nil
  120. case <-c.readTimer.C:
  121. return 0, nil, &net.OpError{
  122. Op: "read",
  123. Net: c.LocalAddr().Network(),
  124. Addr: c.LocalAddr(),
  125. Err: newTimeoutError("i/o timeout"),
  126. }
  127. case <-c.closeCh:
  128. return 0, nil, &net.OpError{
  129. Op: "read",
  130. Net: c.LocalAddr().Network(),
  131. Addr: c.LocalAddr(),
  132. Err: errClosed,
  133. }
  134. }
  135. }
  136. }
  137. // WriteTo writes a packet with payload p to addr.
  138. // WriteTo can be made to time out and return
  139. // an Error with Timeout() == true after a fixed time limit;
  140. // see SetDeadline and SetWriteDeadline.
  141. // On packet-oriented connections, write timeouts are rare.
  142. func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (int, error) { //nolint: gocognit
  143. var err error
  144. _, ok := addr.(*net.UDPAddr)
  145. if !ok {
  146. return 0, errUDPAddrCast
  147. }
  148. // check if we have a permission for the destination IP addr
  149. perm, ok := c.permMap.find(addr)
  150. if !ok {
  151. perm = &permission{}
  152. c.permMap.insert(addr, perm)
  153. }
  154. // This func-block would block, per destination IP (, or perm), until
  155. // the perm state becomes "requested". Purpose of this is to guarantee
  156. // the order of packets (within the same perm).
  157. // Note that CreatePermission transaction may not be complete before
  158. // all the data transmission. This is done assuming that the request
  159. // will be mostly likely successful and we can tolerate some loss of
  160. // UDP packet (or reorder), inorder to minimize the latency in most cases.
  161. createPermission := func() error {
  162. perm.mutex.Lock()
  163. defer perm.mutex.Unlock()
  164. if perm.state() == permStateIdle {
  165. // punch a hole! (this would block a bit..)
  166. if err = c.CreatePermissions(addr); err != nil {
  167. c.permMap.delete(addr)
  168. return err
  169. }
  170. perm.setState(permStatePermitted)
  171. }
  172. return nil
  173. }
  174. for i := 0; i < maxRetryAttempts; i++ {
  175. if err = createPermission(); !errors.Is(err, errTryAgain) {
  176. break
  177. }
  178. }
  179. if err != nil {
  180. return 0, err
  181. }
  182. // bind channel
  183. b, ok := c.bindingMgr.findByAddr(addr)
  184. if !ok {
  185. b = c.bindingMgr.create(addr)
  186. }
  187. bindSt := b.state()
  188. if bindSt == bindingStateIdle || bindSt == bindingStateRequest || bindSt == bindingStateFailed {
  189. func() {
  190. // block only callers with the same binding until
  191. // the binding transaction has been complete
  192. b.muBind.Lock()
  193. defer b.muBind.Unlock()
  194. // binding state may have been changed while waiting. check again.
  195. if b.state() == bindingStateIdle {
  196. b.setState(bindingStateRequest)
  197. go func() {
  198. err2 := c.bind(b)
  199. if err2 != nil {
  200. c.log.Warnf("bind() failed: %s", err2.Error())
  201. b.setState(bindingStateFailed)
  202. // keep going...
  203. } else {
  204. b.setState(bindingStateReady)
  205. }
  206. }()
  207. }
  208. }()
  209. // send data using SendIndication
  210. peerAddr := addr2PeerAddress(addr)
  211. var msg *stun.Message
  212. msg, err = stun.Build(
  213. stun.TransactionID,
  214. stun.NewType(stun.MethodSend, stun.ClassIndication),
  215. proto.Data(p),
  216. peerAddr,
  217. stun.Fingerprint,
  218. )
  219. if err != nil {
  220. return 0, err
  221. }
  222. // indication has no transaction (fire-and-forget)
  223. return c.obs.WriteTo(msg.Raw, c.obs.TURNServerAddr())
  224. }
  225. // binding is either ready
  226. // check if the binding needs a refresh
  227. func() {
  228. b.muBind.Lock()
  229. defer b.muBind.Unlock()
  230. if b.state() == bindingStateReady && time.Since(b.refreshedAt()) > 5*time.Minute {
  231. b.setState(bindingStateRefresh)
  232. go func() {
  233. err = c.bind(b)
  234. if err != nil {
  235. c.log.Warnf("bind() for refresh failed: %s", err.Error())
  236. b.setState(bindingStateFailed)
  237. // keep going...
  238. } else {
  239. b.setRefreshedAt(time.Now())
  240. b.setState(bindingStateReady)
  241. }
  242. }()
  243. }
  244. }()
  245. // send via ChannelData
  246. return c.sendChannelData(p, b.number)
  247. }
  248. // Close closes the connection.
  249. // Any blocked ReadFrom or WriteTo operations will be unblocked and return errors.
  250. func (c *UDPConn) Close() error {
  251. c.refreshAllocTimer.Stop()
  252. c.refreshPermsTimer.Stop()
  253. select {
  254. case <-c.closeCh:
  255. return errAlreadyClosed
  256. default:
  257. close(c.closeCh)
  258. }
  259. c.obs.OnDeallocated(c.relayedAddr)
  260. return c.refreshAllocation(0, true /* dontWait=true */)
  261. }
  262. // LocalAddr returns the local network address.
  263. func (c *UDPConn) LocalAddr() net.Addr {
  264. return c.relayedAddr
  265. }
  266. // SetDeadline sets the read and write deadlines associated
  267. // with the connection. It is equivalent to calling both
  268. // SetReadDeadline and SetWriteDeadline.
  269. //
  270. // A deadline is an absolute time after which I/O operations
  271. // fail with a timeout (see type Error) instead of
  272. // blocking. The deadline applies to all future and pending
  273. // I/O, not just the immediately following call to ReadFrom or
  274. // WriteTo. After a deadline has been exceeded, the connection
  275. // can be refreshed by setting a deadline in the future.
  276. //
  277. // An idle timeout can be implemented by repeatedly extending
  278. // the deadline after successful ReadFrom or WriteTo calls.
  279. //
  280. // A zero value for t means I/O operations will not time out.
  281. func (c *UDPConn) SetDeadline(t time.Time) error {
  282. return c.SetReadDeadline(t)
  283. }
  284. // SetReadDeadline sets the deadline for future ReadFrom calls
  285. // and any currently-blocked ReadFrom call.
  286. // A zero value for t means ReadFrom will not time out.
  287. func (c *UDPConn) SetReadDeadline(t time.Time) error {
  288. var d time.Duration
  289. if t == noDeadline() {
  290. d = time.Duration(math.MaxInt64)
  291. } else {
  292. d = time.Until(t)
  293. }
  294. c.readTimer.Reset(d)
  295. return nil
  296. }
  297. // SetWriteDeadline sets the deadline for future WriteTo calls
  298. // and any currently-blocked WriteTo call.
  299. // Even if write times out, it may return n > 0, indicating that
  300. // some of the data was successfully written.
  301. // A zero value for t means WriteTo will not time out.
  302. func (c *UDPConn) SetWriteDeadline(t time.Time) error {
  303. // Write never blocks.
  304. return nil
  305. }
  306. func addr2PeerAddress(addr net.Addr) proto.PeerAddress {
  307. var peerAddr proto.PeerAddress
  308. switch a := addr.(type) {
  309. case *net.UDPAddr:
  310. peerAddr.IP = a.IP
  311. peerAddr.Port = a.Port
  312. case *net.TCPAddr:
  313. peerAddr.IP = a.IP
  314. peerAddr.Port = a.Port
  315. }
  316. return peerAddr
  317. }
  318. // CreatePermissions Issues a CreatePermission request for the supplied addresses
  319. // as described in https://datatracker.ietf.org/doc/html/rfc5766#section-9
  320. func (c *UDPConn) CreatePermissions(addrs ...net.Addr) error {
  321. setters := []stun.Setter{
  322. stun.TransactionID,
  323. stun.NewType(stun.MethodCreatePermission, stun.ClassRequest),
  324. }
  325. for _, addr := range addrs {
  326. setters = append(setters, addr2PeerAddress(addr))
  327. }
  328. setters = append(setters,
  329. c.obs.Username(),
  330. c.obs.Realm(),
  331. c.nonce(),
  332. c.integrity,
  333. stun.Fingerprint)
  334. msg, err := stun.Build(setters...)
  335. if err != nil {
  336. return err
  337. }
  338. trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), false)
  339. if err != nil {
  340. return err
  341. }
  342. res := trRes.Msg
  343. if res.Type.Class == stun.ClassErrorResponse {
  344. var code stun.ErrorCodeAttribute
  345. if err = code.GetFrom(res); err == nil {
  346. if code.Code == stun.CodeStaleNonce {
  347. c.setNonceFromMsg(res)
  348. return errTryAgain
  349. }
  350. return fmt.Errorf("%s (error %s)", res.Type, code) //nolint:goerr113
  351. }
  352. return fmt.Errorf("%s", res.Type) //nolint:goerr113
  353. }
  354. return nil
  355. }
  356. // HandleInbound passes inbound data in UDPConn
  357. func (c *UDPConn) HandleInbound(data []byte, from net.Addr) {
  358. // copy data
  359. copied := make([]byte, len(data))
  360. copy(copied, data)
  361. select {
  362. case c.readCh <- &inboundData{data: copied, from: from}:
  363. default:
  364. c.log.Warnf("receive buffer full")
  365. }
  366. }
  367. // FindAddrByChannelNumber returns a peer address associated with the
  368. // channel number on this UDPConn
  369. func (c *UDPConn) FindAddrByChannelNumber(chNum uint16) (net.Addr, bool) {
  370. b, ok := c.bindingMgr.findByNumber(chNum)
  371. if !ok {
  372. return nil, false
  373. }
  374. return b.addr, true
  375. }
  376. func (c *UDPConn) setNonceFromMsg(msg *stun.Message) {
  377. // Update nonce
  378. var nonce stun.Nonce
  379. if err := nonce.GetFrom(msg); err == nil {
  380. c.setNonce(nonce)
  381. c.log.Debug("refresh allocation: 438, got new nonce.")
  382. } else {
  383. c.log.Warn("refresh allocation: 438 but no nonce.")
  384. }
  385. }
  386. func (c *UDPConn) refreshAllocation(lifetime time.Duration, dontWait bool) error {
  387. msg, err := stun.Build(
  388. stun.TransactionID,
  389. stun.NewType(stun.MethodRefresh, stun.ClassRequest),
  390. proto.Lifetime{Duration: lifetime},
  391. c.obs.Username(),
  392. c.obs.Realm(),
  393. c.nonce(),
  394. c.integrity,
  395. stun.Fingerprint,
  396. )
  397. if err != nil {
  398. return fmt.Errorf("%w: %s", errFailedToBuildRefreshRequest, err.Error())
  399. }
  400. c.log.Debugf("send refresh request (dontWait=%v)", dontWait)
  401. trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), dontWait)
  402. if err != nil {
  403. return fmt.Errorf("%w: %s", errFailedToRefreshAllocation, err.Error())
  404. }
  405. if dontWait {
  406. c.log.Debug("refresh request sent")
  407. return nil
  408. }
  409. c.log.Debug("refresh request sent, and waiting response")
  410. res := trRes.Msg
  411. if res.Type.Class == stun.ClassErrorResponse {
  412. var code stun.ErrorCodeAttribute
  413. if err = code.GetFrom(res); err == nil {
  414. if code.Code == stun.CodeStaleNonce {
  415. c.setNonceFromMsg(res)
  416. return errTryAgain
  417. }
  418. return err
  419. }
  420. return fmt.Errorf("%s", res.Type) //nolint:goerr113
  421. }
  422. // Getting lifetime from response
  423. var updatedLifetime proto.Lifetime
  424. if err := updatedLifetime.GetFrom(res); err != nil {
  425. return fmt.Errorf("%w: %s", errFailedToGetLifetime, err.Error())
  426. }
  427. c.setLifetime(updatedLifetime.Duration)
  428. c.log.Debugf("updated lifetime: %d seconds", int(c.lifetime().Seconds()))
  429. return nil
  430. }
  431. func (c *UDPConn) refreshPermissions() error {
  432. addrs := c.permMap.addrs()
  433. if len(addrs) == 0 {
  434. c.log.Debug("no permission to refresh")
  435. return nil
  436. }
  437. if err := c.CreatePermissions(addrs...); err != nil {
  438. if errors.Is(err, errTryAgain) {
  439. return errTryAgain
  440. }
  441. c.log.Errorf("fail to refresh permissions: %s", err.Error())
  442. return err
  443. }
  444. c.log.Debug("refresh permissions successful")
  445. return nil
  446. }
  447. func (c *UDPConn) bind(b *binding) error {
  448. setters := []stun.Setter{
  449. stun.TransactionID,
  450. stun.NewType(stun.MethodChannelBind, stun.ClassRequest),
  451. addr2PeerAddress(b.addr),
  452. proto.ChannelNumber(b.number),
  453. c.obs.Username(),
  454. c.obs.Realm(),
  455. c.nonce(),
  456. c.integrity,
  457. stun.Fingerprint,
  458. }
  459. msg, err := stun.Build(setters...)
  460. if err != nil {
  461. return err
  462. }
  463. trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), false)
  464. if err != nil {
  465. c.bindingMgr.deleteByAddr(b.addr)
  466. return err
  467. }
  468. res := trRes.Msg
  469. if res.Type != stun.NewType(stun.MethodChannelBind, stun.ClassSuccessResponse) {
  470. return fmt.Errorf("unexpected response type %s", res.Type) //nolint:goerr113
  471. }
  472. c.log.Debugf("channel binding successful: %s %d", b.addr.String(), b.number)
  473. // Success.
  474. return nil
  475. }
  476. func (c *UDPConn) sendChannelData(data []byte, chNum uint16) (int, error) {
  477. chData := &proto.ChannelData{
  478. Data: data,
  479. Number: proto.ChannelNumber(chNum),
  480. }
  481. chData.Encode()
  482. return c.obs.WriteTo(chData.Raw, c.obs.TURNServerAddr())
  483. }
  484. func (c *UDPConn) onRefreshTimers(id int) {
  485. c.log.Debugf("refresh timer %d expired", id)
  486. switch id {
  487. case timerIDRefreshAlloc:
  488. var err error
  489. lifetime := c.lifetime()
  490. // limit the max retries on errTryAgain to 3
  491. // when stale nonce returns, sencond retry should succeed
  492. for i := 0; i < maxRetryAttempts; i++ {
  493. err = c.refreshAllocation(lifetime, false)
  494. if !errors.Is(err, errTryAgain) {
  495. break
  496. }
  497. }
  498. if err != nil {
  499. c.log.Warnf("refresh allocation failed")
  500. }
  501. case timerIDRefreshPerms:
  502. var err error
  503. for i := 0; i < maxRetryAttempts; i++ {
  504. err = c.refreshPermissions()
  505. if !errors.Is(err, errTryAgain) {
  506. break
  507. }
  508. }
  509. if err != nil {
  510. c.log.Warnf("refresh permissions failed")
  511. }
  512. }
  513. }
  514. func (c *UDPConn) nonce() stun.Nonce {
  515. c.mutex.RLock()
  516. defer c.mutex.RUnlock()
  517. return c._nonce
  518. }
  519. func (c *UDPConn) setNonce(nonce stun.Nonce) {
  520. c.mutex.Lock()
  521. defer c.mutex.Unlock()
  522. c.log.Debugf("set new nonce with %d bytes", len(nonce))
  523. c._nonce = nonce
  524. }
  525. func (c *UDPConn) lifetime() time.Duration {
  526. c.mutex.RLock()
  527. defer c.mutex.RUnlock()
  528. return c._lifetime
  529. }
  530. func (c *UDPConn) setLifetime(lifetime time.Duration) {
  531. c.mutex.Lock()
  532. defer c.mutex.Unlock()
  533. c._lifetime = lifetime
  534. }