| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615 |
- // Package client implements the API for a TURN client
- package client
- import (
- "errors"
- "fmt"
- "io"
- "math"
- "net"
- "sync"
- "time"
- "github.com/pion/logging"
- "github.com/pion/stun"
- "github.com/pion/turn/v2/internal/proto"
- )
- const (
- maxReadQueueSize = 1024
- permRefreshInterval = 120 * time.Second
- maxRetryAttempts = 3
- )
- const (
- timerIDRefreshAlloc int = iota
- timerIDRefreshPerms
- )
- func noDeadline() time.Time {
- return time.Time{}
- }
- type inboundData struct {
- data []byte
- from net.Addr
- }
- // UDPConnObserver is an interface to UDPConn observer
- type UDPConnObserver interface {
- TURNServerAddr() net.Addr
- Username() stun.Username
- Realm() stun.Realm
- WriteTo(data []byte, to net.Addr) (int, error)
- PerformTransaction(msg *stun.Message, to net.Addr, dontWait bool) (TransactionResult, error)
- OnDeallocated(relayedAddr net.Addr)
- }
- // UDPConnConfig is a set of configuration params use by NewUDPConn
- type UDPConnConfig struct {
- Observer UDPConnObserver
- RelayedAddr net.Addr
- Integrity stun.MessageIntegrity
- Nonce stun.Nonce
- Lifetime time.Duration
- Log logging.LeveledLogger
- }
- // UDPConn is the implementation of the Conn and PacketConn interfaces for UDP network connections.
- // comatible with net.PacketConn and net.Conn
- type UDPConn struct {
- obs UDPConnObserver // read-only
- relayedAddr net.Addr // read-only
- permMap *permissionMap // thread-safe
- bindingMgr *bindingManager // thread-safe
- integrity stun.MessageIntegrity // read-only
- _nonce stun.Nonce // needs mutex x
- _lifetime time.Duration // needs mutex x
- readCh chan *inboundData // thread-safe
- closeCh chan struct{} // thread-safe
- readTimer *time.Timer // thread-safe
- refreshAllocTimer *PeriodicTimer // thread-safe
- refreshPermsTimer *PeriodicTimer // thread-safe
- mutex sync.RWMutex // thread-safe
- log logging.LeveledLogger // read-only
- }
- // NewUDPConn creates a new instance of UDPConn
- func NewUDPConn(config *UDPConnConfig) *UDPConn {
- c := &UDPConn{
- obs: config.Observer,
- relayedAddr: config.RelayedAddr,
- permMap: newPermissionMap(),
- bindingMgr: newBindingManager(),
- integrity: config.Integrity,
- _nonce: config.Nonce,
- _lifetime: config.Lifetime,
- readCh: make(chan *inboundData, maxReadQueueSize),
- closeCh: make(chan struct{}),
- readTimer: time.NewTimer(time.Duration(math.MaxInt64)),
- log: config.Log,
- }
- c.log.Debugf("initial lifetime: %d seconds", int(c.lifetime().Seconds()))
- c.refreshAllocTimer = NewPeriodicTimer(
- timerIDRefreshAlloc,
- c.onRefreshTimers,
- c.lifetime()/2,
- )
- c.refreshPermsTimer = NewPeriodicTimer(
- timerIDRefreshPerms,
- c.onRefreshTimers,
- permRefreshInterval,
- )
- if c.refreshAllocTimer.Start() {
- c.log.Debugf("refreshAllocTimer started")
- }
- if c.refreshPermsTimer.Start() {
- c.log.Debugf("refreshPermsTimer started")
- }
- return c
- }
- // ReadFrom reads a packet from the connection,
- // copying the payload into p. It returns the number of
- // bytes copied into p and the return address that
- // was on the packet.
- // It returns the number of bytes read (0 <= n <= len(p))
- // and any error encountered. Callers should always process
- // the n > 0 bytes returned before considering the error err.
- // ReadFrom can be made to time out and return
- // an Error with Timeout() == true after a fixed time limit;
- // see SetDeadline and SetReadDeadline.
- func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
- for {
- select {
- case ibData := <-c.readCh:
- n := copy(p, ibData.data)
- if n < len(ibData.data) {
- return 0, nil, io.ErrShortBuffer
- }
- return n, ibData.from, nil
- case <-c.readTimer.C:
- return 0, nil, &net.OpError{
- Op: "read",
- Net: c.LocalAddr().Network(),
- Addr: c.LocalAddr(),
- Err: newTimeoutError("i/o timeout"),
- }
- case <-c.closeCh:
- return 0, nil, &net.OpError{
- Op: "read",
- Net: c.LocalAddr().Network(),
- Addr: c.LocalAddr(),
- Err: errClosed,
- }
- }
- }
- }
- // WriteTo writes a packet with payload p to addr.
- // WriteTo can be made to time out and return
- // an Error with Timeout() == true after a fixed time limit;
- // see SetDeadline and SetWriteDeadline.
- // On packet-oriented connections, write timeouts are rare.
- func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (int, error) { //nolint: gocognit
- var err error
- _, ok := addr.(*net.UDPAddr)
- if !ok {
- return 0, errUDPAddrCast
- }
- // check if we have a permission for the destination IP addr
- perm, ok := c.permMap.find(addr)
- if !ok {
- perm = &permission{}
- c.permMap.insert(addr, perm)
- }
- // This func-block would block, per destination IP (, or perm), until
- // the perm state becomes "requested". Purpose of this is to guarantee
- // the order of packets (within the same perm).
- // Note that CreatePermission transaction may not be complete before
- // all the data transmission. This is done assuming that the request
- // will be mostly likely successful and we can tolerate some loss of
- // UDP packet (or reorder), inorder to minimize the latency in most cases.
- createPermission := func() error {
- perm.mutex.Lock()
- defer perm.mutex.Unlock()
- if perm.state() == permStateIdle {
- // punch a hole! (this would block a bit..)
- if err = c.CreatePermissions(addr); err != nil {
- c.permMap.delete(addr)
- return err
- }
- perm.setState(permStatePermitted)
- }
- return nil
- }
- for i := 0; i < maxRetryAttempts; i++ {
- if err = createPermission(); !errors.Is(err, errTryAgain) {
- break
- }
- }
- if err != nil {
- return 0, err
- }
- // bind channel
- b, ok := c.bindingMgr.findByAddr(addr)
- if !ok {
- b = c.bindingMgr.create(addr)
- }
- bindSt := b.state()
- if bindSt == bindingStateIdle || bindSt == bindingStateRequest || bindSt == bindingStateFailed {
- func() {
- // block only callers with the same binding until
- // the binding transaction has been complete
- b.muBind.Lock()
- defer b.muBind.Unlock()
- // binding state may have been changed while waiting. check again.
- if b.state() == bindingStateIdle {
- b.setState(bindingStateRequest)
- go func() {
- err2 := c.bind(b)
- if err2 != nil {
- c.log.Warnf("bind() failed: %s", err2.Error())
- b.setState(bindingStateFailed)
- // keep going...
- } else {
- b.setState(bindingStateReady)
- }
- }()
- }
- }()
- // send data using SendIndication
- peerAddr := addr2PeerAddress(addr)
- var msg *stun.Message
- msg, err = stun.Build(
- stun.TransactionID,
- stun.NewType(stun.MethodSend, stun.ClassIndication),
- proto.Data(p),
- peerAddr,
- stun.Fingerprint,
- )
- if err != nil {
- return 0, err
- }
- // indication has no transaction (fire-and-forget)
- return c.obs.WriteTo(msg.Raw, c.obs.TURNServerAddr())
- }
- // binding is either ready
- // check if the binding needs a refresh
- func() {
- b.muBind.Lock()
- defer b.muBind.Unlock()
- if b.state() == bindingStateReady && time.Since(b.refreshedAt()) > 5*time.Minute {
- b.setState(bindingStateRefresh)
- go func() {
- err = c.bind(b)
- if err != nil {
- c.log.Warnf("bind() for refresh failed: %s", err.Error())
- b.setState(bindingStateFailed)
- // keep going...
- } else {
- b.setRefreshedAt(time.Now())
- b.setState(bindingStateReady)
- }
- }()
- }
- }()
- // send via ChannelData
- return c.sendChannelData(p, b.number)
- }
- // Close closes the connection.
- // Any blocked ReadFrom or WriteTo operations will be unblocked and return errors.
- func (c *UDPConn) Close() error {
- c.refreshAllocTimer.Stop()
- c.refreshPermsTimer.Stop()
- select {
- case <-c.closeCh:
- return errAlreadyClosed
- default:
- close(c.closeCh)
- }
- c.obs.OnDeallocated(c.relayedAddr)
- return c.refreshAllocation(0, true /* dontWait=true */)
- }
- // LocalAddr returns the local network address.
- func (c *UDPConn) LocalAddr() net.Addr {
- return c.relayedAddr
- }
- // SetDeadline sets the read and write deadlines associated
- // with the connection. It is equivalent to calling both
- // SetReadDeadline and SetWriteDeadline.
- //
- // A deadline is an absolute time after which I/O operations
- // fail with a timeout (see type Error) instead of
- // blocking. The deadline applies to all future and pending
- // I/O, not just the immediately following call to ReadFrom or
- // WriteTo. After a deadline has been exceeded, the connection
- // can be refreshed by setting a deadline in the future.
- //
- // An idle timeout can be implemented by repeatedly extending
- // the deadline after successful ReadFrom or WriteTo calls.
- //
- // A zero value for t means I/O operations will not time out.
- func (c *UDPConn) SetDeadline(t time.Time) error {
- return c.SetReadDeadline(t)
- }
- // SetReadDeadline sets the deadline for future ReadFrom calls
- // and any currently-blocked ReadFrom call.
- // A zero value for t means ReadFrom will not time out.
- func (c *UDPConn) SetReadDeadline(t time.Time) error {
- var d time.Duration
- if t == noDeadline() {
- d = time.Duration(math.MaxInt64)
- } else {
- d = time.Until(t)
- }
- c.readTimer.Reset(d)
- return nil
- }
- // SetWriteDeadline sets the deadline for future WriteTo calls
- // and any currently-blocked WriteTo call.
- // Even if write times out, it may return n > 0, indicating that
- // some of the data was successfully written.
- // A zero value for t means WriteTo will not time out.
- func (c *UDPConn) SetWriteDeadline(t time.Time) error {
- // Write never blocks.
- return nil
- }
- func addr2PeerAddress(addr net.Addr) proto.PeerAddress {
- var peerAddr proto.PeerAddress
- switch a := addr.(type) {
- case *net.UDPAddr:
- peerAddr.IP = a.IP
- peerAddr.Port = a.Port
- case *net.TCPAddr:
- peerAddr.IP = a.IP
- peerAddr.Port = a.Port
- }
- return peerAddr
- }
- // CreatePermissions Issues a CreatePermission request for the supplied addresses
- // as described in https://datatracker.ietf.org/doc/html/rfc5766#section-9
- func (c *UDPConn) CreatePermissions(addrs ...net.Addr) error {
- setters := []stun.Setter{
- stun.TransactionID,
- stun.NewType(stun.MethodCreatePermission, stun.ClassRequest),
- }
- for _, addr := range addrs {
- setters = append(setters, addr2PeerAddress(addr))
- }
- setters = append(setters,
- c.obs.Username(),
- c.obs.Realm(),
- c.nonce(),
- c.integrity,
- stun.Fingerprint)
- msg, err := stun.Build(setters...)
- if err != nil {
- return err
- }
- trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), false)
- if err != nil {
- return err
- }
- res := trRes.Msg
- if res.Type.Class == stun.ClassErrorResponse {
- var code stun.ErrorCodeAttribute
- if err = code.GetFrom(res); err == nil {
- if code.Code == stun.CodeStaleNonce {
- c.setNonceFromMsg(res)
- return errTryAgain
- }
- return fmt.Errorf("%s (error %s)", res.Type, code) //nolint:goerr113
- }
- return fmt.Errorf("%s", res.Type) //nolint:goerr113
- }
- return nil
- }
- // HandleInbound passes inbound data in UDPConn
- func (c *UDPConn) HandleInbound(data []byte, from net.Addr) {
- // copy data
- copied := make([]byte, len(data))
- copy(copied, data)
- select {
- case c.readCh <- &inboundData{data: copied, from: from}:
- default:
- c.log.Warnf("receive buffer full")
- }
- }
- // FindAddrByChannelNumber returns a peer address associated with the
- // channel number on this UDPConn
- func (c *UDPConn) FindAddrByChannelNumber(chNum uint16) (net.Addr, bool) {
- b, ok := c.bindingMgr.findByNumber(chNum)
- if !ok {
- return nil, false
- }
- return b.addr, true
- }
- func (c *UDPConn) setNonceFromMsg(msg *stun.Message) {
- // Update nonce
- var nonce stun.Nonce
- if err := nonce.GetFrom(msg); err == nil {
- c.setNonce(nonce)
- c.log.Debug("refresh allocation: 438, got new nonce.")
- } else {
- c.log.Warn("refresh allocation: 438 but no nonce.")
- }
- }
- func (c *UDPConn) refreshAllocation(lifetime time.Duration, dontWait bool) error {
- msg, err := stun.Build(
- stun.TransactionID,
- stun.NewType(stun.MethodRefresh, stun.ClassRequest),
- proto.Lifetime{Duration: lifetime},
- c.obs.Username(),
- c.obs.Realm(),
- c.nonce(),
- c.integrity,
- stun.Fingerprint,
- )
- if err != nil {
- return fmt.Errorf("%w: %s", errFailedToBuildRefreshRequest, err.Error())
- }
- c.log.Debugf("send refresh request (dontWait=%v)", dontWait)
- trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), dontWait)
- if err != nil {
- return fmt.Errorf("%w: %s", errFailedToRefreshAllocation, err.Error())
- }
- if dontWait {
- c.log.Debug("refresh request sent")
- return nil
- }
- c.log.Debug("refresh request sent, and waiting response")
- res := trRes.Msg
- if res.Type.Class == stun.ClassErrorResponse {
- var code stun.ErrorCodeAttribute
- if err = code.GetFrom(res); err == nil {
- if code.Code == stun.CodeStaleNonce {
- c.setNonceFromMsg(res)
- return errTryAgain
- }
- return err
- }
- return fmt.Errorf("%s", res.Type) //nolint:goerr113
- }
- // Getting lifetime from response
- var updatedLifetime proto.Lifetime
- if err := updatedLifetime.GetFrom(res); err != nil {
- return fmt.Errorf("%w: %s", errFailedToGetLifetime, err.Error())
- }
- c.setLifetime(updatedLifetime.Duration)
- c.log.Debugf("updated lifetime: %d seconds", int(c.lifetime().Seconds()))
- return nil
- }
- func (c *UDPConn) refreshPermissions() error {
- addrs := c.permMap.addrs()
- if len(addrs) == 0 {
- c.log.Debug("no permission to refresh")
- return nil
- }
- if err := c.CreatePermissions(addrs...); err != nil {
- if errors.Is(err, errTryAgain) {
- return errTryAgain
- }
- c.log.Errorf("fail to refresh permissions: %s", err.Error())
- return err
- }
- c.log.Debug("refresh permissions successful")
- return nil
- }
- func (c *UDPConn) bind(b *binding) error {
- setters := []stun.Setter{
- stun.TransactionID,
- stun.NewType(stun.MethodChannelBind, stun.ClassRequest),
- addr2PeerAddress(b.addr),
- proto.ChannelNumber(b.number),
- c.obs.Username(),
- c.obs.Realm(),
- c.nonce(),
- c.integrity,
- stun.Fingerprint,
- }
- msg, err := stun.Build(setters...)
- if err != nil {
- return err
- }
- trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), false)
- if err != nil {
- c.bindingMgr.deleteByAddr(b.addr)
- return err
- }
- res := trRes.Msg
- if res.Type != stun.NewType(stun.MethodChannelBind, stun.ClassSuccessResponse) {
- return fmt.Errorf("unexpected response type %s", res.Type) //nolint:goerr113
- }
- c.log.Debugf("channel binding successful: %s %d", b.addr.String(), b.number)
- // Success.
- return nil
- }
- func (c *UDPConn) sendChannelData(data []byte, chNum uint16) (int, error) {
- chData := &proto.ChannelData{
- Data: data,
- Number: proto.ChannelNumber(chNum),
- }
- chData.Encode()
- return c.obs.WriteTo(chData.Raw, c.obs.TURNServerAddr())
- }
- func (c *UDPConn) onRefreshTimers(id int) {
- c.log.Debugf("refresh timer %d expired", id)
- switch id {
- case timerIDRefreshAlloc:
- var err error
- lifetime := c.lifetime()
- // limit the max retries on errTryAgain to 3
- // when stale nonce returns, sencond retry should succeed
- for i := 0; i < maxRetryAttempts; i++ {
- err = c.refreshAllocation(lifetime, false)
- if !errors.Is(err, errTryAgain) {
- break
- }
- }
- if err != nil {
- c.log.Warnf("refresh allocation failed")
- }
- case timerIDRefreshPerms:
- var err error
- for i := 0; i < maxRetryAttempts; i++ {
- err = c.refreshPermissions()
- if !errors.Is(err, errTryAgain) {
- break
- }
- }
- if err != nil {
- c.log.Warnf("refresh permissions failed")
- }
- }
- }
- func (c *UDPConn) nonce() stun.Nonce {
- c.mutex.RLock()
- defer c.mutex.RUnlock()
- return c._nonce
- }
- func (c *UDPConn) setNonce(nonce stun.Nonce) {
- c.mutex.Lock()
- defer c.mutex.Unlock()
- c.log.Debugf("set new nonce with %d bytes", len(nonce))
- c._nonce = nonce
- }
- func (c *UDPConn) lifetime() time.Duration {
- c.mutex.RLock()
- defer c.mutex.RUnlock()
- return c._lifetime
- }
- func (c *UDPConn) setLifetime(lifetime time.Duration) {
- c.mutex.Lock()
- defer c.mutex.Unlock()
- c._lifetime = lifetime
- }
|