client.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. package turn
  2. import (
  3. b64 "encoding/base64"
  4. "fmt"
  5. "math"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/pion/logging"
  10. "github.com/pion/stun"
  11. "github.com/pion/transport/vnet"
  12. "github.com/pion/turn/v2/internal/client"
  13. "github.com/pion/turn/v2/internal/proto"
  14. )
const (
	// defaultRTO is the retransmission interval NewClient uses when
	// ClientConfig.RTO is not a positive value.
	defaultRTO = 200 * time.Millisecond
	// maxRtxCount is the retransmission count at which onRtxTimeout gives up
	// and fails the transaction.
	maxRtxCount = 7 // total 7 requests (Rc)
	// maxDataBufferSize is the size of the read buffer allocated by Listen.
	maxDataBufferSize = math.MaxUint16 // message size limit for Chromium
)

// Retransmission schedule: elapsed time at each attempt, followed by the wait
// before the next one.
// NOTE(review): the table starts at +500 ms while defaultRTO is 200 ms, and the
// backoff itself is implemented outside this file — confirm the table still
// matches the actual timing.
//
//              interval [msec]
// 0: 0 ms      +500
// 1: 500 ms    +1000
// 2: 1500 ms   +2000
// 3: 3500 ms   +4000
// 4: 7500 ms   +8000
// 5: 15500 ms  +16000
// 6: 31500 ms  +32000
// -: 63500 ms  failed
// ClientConfig is a bag of config parameters for Client.
// Only Conn is mandatory; NewClient rejects a nil Conn.
type ClientConfig struct {
	STUNServerAddr string                // STUN server address (e.g. "stun.abc.com:3478"); optional
	TURNServerAddr string                // TURN server address (e.g. "turn.abc.com:3478"); optional
	Username       string                // user name used to build the long-term credential in Allocate
	Password       string                // password used to build the long-term credential in Allocate
	Realm          string                // initial realm; Allocate replaces it with the realm sent by the server
	Software       string                // added to binding requests when non-empty
	RTO            time.Duration         // retransmission interval; non-positive values select the 200 ms default
	Conn           net.PacketConn        // Listening socket (net.PacketConn)
	LoggerFactory  logging.LoggerFactory // optional; NewClient falls back to logging.NewDefaultLoggerFactory
	Net            *vnet.Net             // optional; NewClient falls back to vnet.NewNet(nil) (native operation)
}
// Client is a STUN/TURN client: it sends STUN binding requests, performs TURN
// allocations and demultiplexes inbound packets received on conn.
type Client struct {
	conn          net.PacketConn         // read-only
	stunServ      net.Addr               // read-only; nil when no STUN server was configured
	turnServ      net.Addr               // read-only; nil when no TURN server was configured
	stunServStr   string                 // read-only, used for dmuxing
	turnServStr   string                 // read-only, used for dmuxing
	username      stun.Username          // read-only
	password      string                 // read-only
	realm         stun.Realm             // set by NewClient, overwritten by Allocate with the server's realm
	integrity     stun.MessageIntegrity  // written by Allocate (long-term credential), otherwise read-only
	software      stun.Software          // read-only
	trMap         *client.TransactionMap // thread-safe
	rto           time.Duration          // read-only
	relayedConn   *client.UDPConn        // protected by mutex ***
	allocTryLock  client.TryLock         // thread-safe; taken for the duration of Allocate
	listenTryLock client.TryLock         // thread-safe; held while the Listen read loop runs
	net           *vnet.Net              // read-only
	mutex         sync.RWMutex           // guards relayedConn
	mutexTrMap    sync.Mutex             // held around find/delete sequences on trMap
	log           logging.LeveledLogger  // read-only
}
  64. // NewClient returns a new Client instance. listeningAddress is the address and port to listen on, default "0.0.0.0:0"
  65. func NewClient(config *ClientConfig) (*Client, error) {
  66. loggerFactory := config.LoggerFactory
  67. if loggerFactory == nil {
  68. loggerFactory = logging.NewDefaultLoggerFactory()
  69. }
  70. log := loggerFactory.NewLogger("turnc")
  71. if config.Conn == nil {
  72. return nil, errNilConn
  73. }
  74. if config.Net == nil {
  75. config.Net = vnet.NewNet(nil) // defaults to native operation
  76. } else if config.Net.IsVirtual() {
  77. log.Warn("vnet is enabled")
  78. }
  79. var stunServ, turnServ net.Addr
  80. var stunServStr, turnServStr string
  81. var err error
  82. if len(config.STUNServerAddr) > 0 {
  83. log.Debugf("resolving %s", config.STUNServerAddr)
  84. stunServ, err = config.Net.ResolveUDPAddr("udp4", config.STUNServerAddr)
  85. if err != nil {
  86. return nil, err
  87. }
  88. stunServStr = stunServ.String()
  89. log.Debugf("stunServ: %s", stunServStr)
  90. }
  91. if len(config.TURNServerAddr) > 0 {
  92. log.Debugf("resolving %s", config.TURNServerAddr)
  93. turnServ, err = config.Net.ResolveUDPAddr("udp4", config.TURNServerAddr)
  94. if err != nil {
  95. return nil, err
  96. }
  97. turnServStr = turnServ.String()
  98. log.Debugf("turnServ: %s", turnServStr)
  99. }
  100. rto := defaultRTO
  101. if config.RTO > 0 {
  102. rto = config.RTO
  103. }
  104. c := &Client{
  105. conn: config.Conn,
  106. stunServ: stunServ,
  107. turnServ: turnServ,
  108. stunServStr: stunServStr,
  109. turnServStr: turnServStr,
  110. username: stun.NewUsername(config.Username),
  111. password: config.Password,
  112. realm: stun.NewRealm(config.Realm),
  113. software: stun.NewSoftware(config.Software),
  114. net: config.Net,
  115. trMap: client.NewTransactionMap(),
  116. rto: rto,
  117. log: log,
  118. }
  119. return c, nil
  120. }
// TURNServerAddr returns the TURN server address resolved by NewClient,
// or nil if no TURN server address was configured.
func (c *Client) TURNServerAddr() net.Addr {
	return c.turnServ
}
// STUNServerAddr returns the STUN server address resolved by NewClient,
// or nil if no STUN server address was configured.
func (c *Client) STUNServerAddr() net.Addr {
	return c.stunServ
}
// Username returns the username this client was configured with.
func (c *Client) Username() stun.Username {
	return c.username
}
// Realm returns the current realm. It starts out as the configured realm and
// is replaced by the realm received from the server during Allocate.
func (c *Client) Realm() stun.Realm {
	return c.realm
}
// WriteTo sends data to the specified destination using the base socket
// (the conn supplied via ClientConfig) and returns that socket's result.
func (c *Client) WriteTo(data []byte, to net.Addr) (int, error) {
	return c.conn.WriteTo(data, to)
}
  141. // Listen will have this client start listening on the conn provided via the config.
  142. // This is optional. If not used, you will need to call HandleInbound method
  143. // to supply incoming data, instead.
  144. func (c *Client) Listen() error {
  145. if err := c.listenTryLock.Lock(); err != nil {
  146. return fmt.Errorf("%w: %s", errAlreadyListening, err.Error())
  147. }
  148. go func() {
  149. buf := make([]byte, maxDataBufferSize)
  150. for {
  151. n, from, err := c.conn.ReadFrom(buf)
  152. if err != nil {
  153. c.log.Debugf("exiting read loop: %s", err.Error())
  154. break
  155. }
  156. _, err = c.HandleInbound(buf[:n], from)
  157. if err != nil {
  158. c.log.Debugf("exiting read loop: %s", err.Error())
  159. break
  160. }
  161. }
  162. c.listenTryLock.Unlock()
  163. }()
  164. return nil
  165. }
// Close closes this client by calling CloseAndDeleteAll on the transaction
// map while holding mutexTrMap. It does not touch the underlying conn or the
// relayed conn.
func (c *Client) Close() {
	c.mutexTrMap.Lock()
	defer c.mutexTrMap.Unlock()
	c.trMap.CloseAndDeleteAll()
}
  172. // TransactionID & Base64: https://play.golang.org/p/EEgmJDI971P
  173. // SendBindingRequestTo sends a new STUN request to the given transport address
  174. func (c *Client) SendBindingRequestTo(to net.Addr) (net.Addr, error) {
  175. attrs := []stun.Setter{stun.TransactionID, stun.BindingRequest}
  176. if len(c.software) > 0 {
  177. attrs = append(attrs, c.software)
  178. }
  179. msg, err := stun.Build(attrs...)
  180. if err != nil {
  181. return nil, err
  182. }
  183. trRes, err := c.PerformTransaction(msg, to, false)
  184. if err != nil {
  185. return nil, err
  186. }
  187. var reflAddr stun.XORMappedAddress
  188. if err := reflAddr.GetFrom(trRes.Msg); err != nil {
  189. return nil, err
  190. }
  191. return &net.UDPAddr{
  192. IP: reflAddr.IP,
  193. Port: reflAddr.Port,
  194. }, nil
  195. }
// SendBindingRequest sends a new STUN binding request to the configured STUN
// server. It returns errSTUNServerAddressNotSet when no STUN server address
// was configured; otherwise it behaves like SendBindingRequestTo.
func (c *Client) SendBindingRequest() (net.Addr, error) {
	if c.stunServ == nil {
		return nil, errSTUNServerAddressNotSet
	}
	return c.SendBindingRequestTo(c.stunServ)
}
  203. // Allocate sends a TURN allocation request to the given transport address
  204. func (c *Client) Allocate() (net.PacketConn, error) {
  205. if err := c.allocTryLock.Lock(); err != nil {
  206. return nil, fmt.Errorf("%w: %s", errOneAllocateOnly, err.Error())
  207. }
  208. defer c.allocTryLock.Unlock()
  209. relayedConn := c.relayedUDPConn()
  210. if relayedConn != nil {
  211. return nil, fmt.Errorf("%w: %s", errAlreadyAllocated, relayedConn.LocalAddr().String())
  212. }
  213. msg, err := stun.Build(
  214. stun.TransactionID,
  215. stun.NewType(stun.MethodAllocate, stun.ClassRequest),
  216. proto.RequestedTransport{Protocol: proto.ProtoUDP},
  217. stun.Fingerprint,
  218. )
  219. if err != nil {
  220. return nil, err
  221. }
  222. trRes, err := c.PerformTransaction(msg, c.turnServ, false)
  223. if err != nil {
  224. return nil, err
  225. }
  226. res := trRes.Msg
  227. // Anonymous allocate failed, trying to authenticate.
  228. var nonce stun.Nonce
  229. if err = nonce.GetFrom(res); err != nil {
  230. return nil, err
  231. }
  232. if err = c.realm.GetFrom(res); err != nil {
  233. return nil, err
  234. }
  235. c.realm = append([]byte(nil), c.realm...)
  236. c.integrity = stun.NewLongTermIntegrity(
  237. c.username.String(), c.realm.String(), c.password,
  238. )
  239. // Trying to authorize.
  240. msg, err = stun.Build(
  241. stun.TransactionID,
  242. stun.NewType(stun.MethodAllocate, stun.ClassRequest),
  243. proto.RequestedTransport{Protocol: proto.ProtoUDP},
  244. &c.username,
  245. &c.realm,
  246. &nonce,
  247. &c.integrity,
  248. stun.Fingerprint,
  249. )
  250. if err != nil {
  251. return nil, err
  252. }
  253. trRes, err = c.PerformTransaction(msg, c.turnServ, false)
  254. if err != nil {
  255. return nil, err
  256. }
  257. res = trRes.Msg
  258. if res.Type.Class == stun.ClassErrorResponse {
  259. var code stun.ErrorCodeAttribute
  260. if err = code.GetFrom(res); err == nil {
  261. return nil, fmt.Errorf("%s (error %s)", res.Type, code) //nolint:goerr113
  262. }
  263. return nil, fmt.Errorf("%s", res.Type) //nolint:goerr113
  264. }
  265. // Getting relayed addresses from response.
  266. var relayed proto.RelayedAddress
  267. if err := relayed.GetFrom(res); err != nil {
  268. return nil, err
  269. }
  270. relayedAddr := &net.UDPAddr{
  271. IP: relayed.IP,
  272. Port: relayed.Port,
  273. }
  274. // Getting lifetime from response
  275. var lifetime proto.Lifetime
  276. if err := lifetime.GetFrom(res); err != nil {
  277. return nil, err
  278. }
  279. relayedConn = client.NewUDPConn(&client.UDPConnConfig{
  280. Observer: c,
  281. RelayedAddr: relayedAddr,
  282. Integrity: c.integrity,
  283. Nonce: nonce,
  284. Lifetime: lifetime.Duration,
  285. Log: c.log,
  286. })
  287. c.setRelayedUDPConn(relayedConn)
  288. return relayedConn, nil
  289. }
  290. // CreatePermission Issues a CreatePermission request for the supplied addresses
  291. // as described in https://datatracker.ietf.org/doc/html/rfc5766#section-9
  292. func (c *Client) CreatePermission(addrs ...net.Addr) error {
  293. return c.relayedUDPConn().CreatePermissions(addrs...)
  294. }
  295. // PerformTransaction performs STUN transaction
  296. func (c *Client) PerformTransaction(msg *stun.Message, to net.Addr, ignoreResult bool) (client.TransactionResult,
  297. error) {
  298. trKey := b64.StdEncoding.EncodeToString(msg.TransactionID[:])
  299. raw := make([]byte, len(msg.Raw))
  300. copy(raw, msg.Raw)
  301. tr := client.NewTransaction(&client.TransactionConfig{
  302. Key: trKey,
  303. Raw: raw,
  304. To: to,
  305. Interval: c.rto,
  306. IgnoreResult: ignoreResult,
  307. })
  308. c.trMap.Insert(trKey, tr)
  309. c.log.Tracef("start %s transaction %s to %s", msg.Type, trKey, tr.To.String())
  310. _, err := c.conn.WriteTo(tr.Raw, to)
  311. if err != nil {
  312. return client.TransactionResult{}, err
  313. }
  314. tr.StartRtxTimer(c.onRtxTimeout)
  315. // If dontWait is true, get the transaction going and return immediately
  316. if ignoreResult {
  317. return client.TransactionResult{}, nil
  318. }
  319. res := tr.WaitForResult()
  320. if res.Err != nil {
  321. return res, res.Err
  322. }
  323. return res, nil
  324. }
// OnDeallocated is called when deallocation of relay address has been complete.
// (Called by UDPConn)
// It clears the client's relayed conn; relayedAddr is not used.
func (c *Client) OnDeallocated(relayedAddr net.Addr) {
	c.setRelayedUDPConn(nil)
}
  330. // HandleInbound handles data received.
  331. // This method handles incoming packet demultiplex it by the source address
  332. // and the types of the message.
  333. // This return a booleen (handled or not) and if there was an error.
  334. // Caller should check if the packet was handled by this client or not.
  335. // If not handled, it is assumed that the packet is application data.
  336. // If an error is returned, the caller should discard the packet regardless.
  337. func (c *Client) HandleInbound(data []byte, from net.Addr) (bool, error) {
  338. // +-------------------+-------------------------------+
  339. // | Return Values | |
  340. // +-------------------+ Meaning / Action |
  341. // | handled | error | |
  342. // |=========+=========+===============================+
  343. // | false | nil | Handle the packet as app data |
  344. // |---------+---------+-------------------------------+
  345. // | true | nil | Nothing to do |
  346. // |---------+---------+-------------------------------+
  347. // | false | error | (shouldn't happen) |
  348. // |---------+---------+-------------------------------+
  349. // | true | error | Error occurred while handling |
  350. // +---------+---------+-------------------------------+
  351. // Possible causes of the error:
  352. // - Malformed packet (parse error)
  353. // - STUN message was a request
  354. // - Non-STUN message from the STUN server
  355. switch {
  356. case stun.IsMessage(data):
  357. return true, c.handleSTUNMessage(data, from)
  358. case proto.IsChannelData(data):
  359. return true, c.handleChannelData(data)
  360. case len(c.stunServStr) != 0 && from.String() == c.stunServStr:
  361. // received from STUN server but it is not a STUN message
  362. return true, errNonSTUNMessage
  363. default:
  364. // assume, this is an application data
  365. c.log.Tracef("non-STUN/TURN packect, unhandled")
  366. }
  367. return false, nil
  368. }
  369. func (c *Client) handleSTUNMessage(data []byte, from net.Addr) error {
  370. raw := make([]byte, len(data))
  371. copy(raw, data)
  372. msg := &stun.Message{Raw: raw}
  373. if err := msg.Decode(); err != nil {
  374. return fmt.Errorf("%w: %s", errFailedToDecodeSTUN, err.Error())
  375. }
  376. if msg.Type.Class == stun.ClassRequest {
  377. return fmt.Errorf("%w : %s", errUnexpectedSTUNRequestMessage, msg.String())
  378. }
  379. if msg.Type.Class == stun.ClassIndication {
  380. if msg.Type.Method == stun.MethodData {
  381. var peerAddr proto.PeerAddress
  382. if err := peerAddr.GetFrom(msg); err != nil {
  383. return err
  384. }
  385. from = &net.UDPAddr{
  386. IP: peerAddr.IP,
  387. Port: peerAddr.Port,
  388. }
  389. var data proto.Data
  390. if err := data.GetFrom(msg); err != nil {
  391. return err
  392. }
  393. c.log.Debugf("data indication received from %s", from.String())
  394. relayedConn := c.relayedUDPConn()
  395. if relayedConn == nil {
  396. c.log.Debug("no relayed conn allocated")
  397. return nil // silently discard
  398. }
  399. relayedConn.HandleInbound(data, from)
  400. }
  401. return nil
  402. }
  403. // This is a STUN response message (transactional)
  404. // The type is either:
  405. // - stun.ClassSuccessResponse
  406. // - stun.ClassErrorResponse
  407. trKey := b64.StdEncoding.EncodeToString(msg.TransactionID[:])
  408. c.mutexTrMap.Lock()
  409. tr, ok := c.trMap.Find(trKey)
  410. if !ok {
  411. c.mutexTrMap.Unlock()
  412. // silently discard
  413. c.log.Debugf("no transaction for %s", msg.String())
  414. return nil
  415. }
  416. // End the transaction
  417. tr.StopRtxTimer()
  418. c.trMap.Delete(trKey)
  419. c.mutexTrMap.Unlock()
  420. if !tr.WriteResult(client.TransactionResult{
  421. Msg: msg,
  422. From: from,
  423. Retries: tr.Retries(),
  424. }) {
  425. c.log.Debugf("no listener for %s", msg.String())
  426. }
  427. return nil
  428. }
  429. func (c *Client) handleChannelData(data []byte) error {
  430. chData := &proto.ChannelData{
  431. Raw: make([]byte, len(data)),
  432. }
  433. copy(chData.Raw, data)
  434. if err := chData.Decode(); err != nil {
  435. return err
  436. }
  437. relayedConn := c.relayedUDPConn()
  438. if relayedConn == nil {
  439. c.log.Debug("no relayed conn allocated")
  440. return nil // silently discard
  441. }
  442. addr, ok := relayedConn.FindAddrByChannelNumber(uint16(chData.Number))
  443. if !ok {
  444. return fmt.Errorf("%w: %d", errChannelBindNotFound, int(chData.Number))
  445. }
  446. c.log.Tracef("channel data received from %s (ch=%d)", addr.String(), int(chData.Number))
  447. relayedConn.HandleInbound(chData.Data, addr)
  448. return nil
  449. }
  450. func (c *Client) onRtxTimeout(trKey string, nRtx int) {
  451. c.mutexTrMap.Lock()
  452. defer c.mutexTrMap.Unlock()
  453. tr, ok := c.trMap.Find(trKey)
  454. if !ok {
  455. return // already gone
  456. }
  457. if nRtx == maxRtxCount {
  458. // all retransmisstions failed
  459. c.trMap.Delete(trKey)
  460. if !tr.WriteResult(client.TransactionResult{
  461. Err: fmt.Errorf("%w %s", errAllRetransmissionsFailed, trKey),
  462. }) {
  463. c.log.Debug("no listener for transaction")
  464. }
  465. return
  466. }
  467. c.log.Tracef("retransmitting transaction %s to %s (nRtx=%d)",
  468. trKey, tr.To.String(), nRtx)
  469. _, err := c.conn.WriteTo(tr.Raw, tr.To)
  470. if err != nil {
  471. c.trMap.Delete(trKey)
  472. if !tr.WriteResult(client.TransactionResult{
  473. Err: fmt.Errorf("%w %s", errFailedToRetransmitTransaction, trKey),
  474. }) {
  475. c.log.Debug("no listener for transaction")
  476. }
  477. return
  478. }
  479. tr.StartRtxTimer(c.onRtxTimeout)
  480. }
// setRelayedUDPConn replaces the client's relayed conn under the write lock.
// Passing nil clears it.
func (c *Client) setRelayedUDPConn(conn *client.UDPConn) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.relayedConn = conn
}
// relayedUDPConn returns the client's current relayed conn under the read
// lock; the result is nil when no relayed conn is set.
func (c *Client) relayedUDPConn() *client.UDPConn {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	return c.relayedConn
}