| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- // Package allocation contains all CRUD operations for allocations
- package allocation
- import (
- "net"
- "sync"
- "sync/atomic"
- "time"
- "github.com/pion/logging"
- "github.com/pion/stun"
- "github.com/pion/turn/v2/internal/ipnet"
- "github.com/pion/turn/v2/internal/proto"
- )
// allocationResponse is the value kept in Allocation.responseCache: the
// transaction ID of an allocation request together with the attributes of
// the response that was produced for it.
type allocationResponse struct {
	// transactionID identifies the request/response pair that was cached.
	transactionID [stun.TransactionIDSize]byte
	// responseAttrs are the STUN setters stored alongside transactionID.
	responseAttrs []stun.Setter
}
// Allocation is tied to a FiveTuple and relays traffic.
// Use CreateAllocation and GetAllocation to operate on allocations.
type Allocation struct {
	// RelayAddr is the relayed address reported in log messages for this
	// allocation.
	RelayAddr net.Addr
	Protocol  Protocol
	// TurnSocket is the socket used to write relayed data back to the client
	// at fiveTuple.SrcAddr.
	TurnSocket net.PacketConn
	// RelaySocket is the socket peer datagrams are read from; it is closed by
	// Close.
	RelaySocket net.PacketConn
	fiveTuple   *FiveTuple

	// permissions is keyed by the IP fingerprint of the peer address (see
	// addr2IPFingerprint) and guarded by permissionsLock.
	permissionsLock sync.RWMutex
	permissions     map[string]*Permission

	// channelBindings is guarded by channelBindingsLock.
	channelBindingsLock sync.RWMutex
	channelBindings     []*ChannelBind

	// lifetimeTimer is reset by Refresh and stopped by Close.
	lifetimeTimer *time.Timer
	// closed is closed by Close to mark the allocation as shut down.
	closed chan interface{}
	log    logging.LeveledLogger

	// Some clients (Firefox, or others using resiprocate's nICE library) may
	// retry an allocation with the same 5-tuple after receiving a 413. To stay
	// compatible with these clients, the response is cached so that a lost
	// response followed by a client retry can be answered again, implementing
	// the 'stateless stack approach':
	// https://datatracker.ietf.org/doc/html/rfc5766#section-6.2
	responseCache atomic.Value // *allocationResponse
}
// addr2IPFingerprint returns the textual form of addr's IP, ignoring the
// port, for use as a key into the permissions map. Only *net.UDPAddr and
// *net.TCPAddr are recognised; any other address type yields "".
func addr2IPFingerprint(addr net.Addr) string {
	if udp, isUDP := addr.(*net.UDPAddr); isUDP {
		return udp.IP.String()
	}
	// Do we really need this case?
	if tcp, isTCP := addr.(*net.TCPAddr); isTCP {
		return tcp.IP.String()
	}
	return "" // should never happen
}
- // NewAllocation creates a new instance of NewAllocation.
- func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger) *Allocation {
- return &Allocation{
- TurnSocket: turnSocket,
- fiveTuple: fiveTuple,
- permissions: make(map[string]*Permission, 64),
- closed: make(chan interface{}),
- log: log,
- }
- }
- // GetPermission gets the Permission from the allocation
- func (a *Allocation) GetPermission(addr net.Addr) *Permission {
- a.permissionsLock.RLock()
- defer a.permissionsLock.RUnlock()
- return a.permissions[addr2IPFingerprint(addr)]
- }
- // AddPermission adds a new permission to the allocation
- func (a *Allocation) AddPermission(p *Permission) {
- fingerprint := addr2IPFingerprint(p.Addr)
- a.permissionsLock.RLock()
- existedPermission, ok := a.permissions[fingerprint]
- a.permissionsLock.RUnlock()
- if ok {
- existedPermission.refresh(permissionTimeout)
- return
- }
- p.allocation = a
- a.permissionsLock.Lock()
- a.permissions[fingerprint] = p
- a.permissionsLock.Unlock()
- p.start(permissionTimeout)
- }
- // RemovePermission removes the net.Addr's fingerprint from the allocation's permissions
- func (a *Allocation) RemovePermission(addr net.Addr) {
- a.permissionsLock.Lock()
- defer a.permissionsLock.Unlock()
- delete(a.permissions, addr2IPFingerprint(addr))
- }
- // AddChannelBind adds a new ChannelBind to the allocation, it also updates the
- // permissions needed for this ChannelBind
- func (a *Allocation) AddChannelBind(c *ChannelBind, lifetime time.Duration) error {
- // Check that this channel id isn't bound to another transport address, and
- // that this transport address isn't bound to another channel number.
- channelByNumber := a.GetChannelByNumber(c.Number)
- if channelByNumber != a.GetChannelByAddr(c.Peer) {
- return errSameChannelDifferentPeer
- }
- // Add or refresh this channel.
- if channelByNumber == nil {
- a.channelBindingsLock.Lock()
- defer a.channelBindingsLock.Unlock()
- c.allocation = a
- a.channelBindings = append(a.channelBindings, c)
- c.start(lifetime)
- // Channel binds also refresh permissions.
- a.AddPermission(NewPermission(c.Peer, a.log))
- } else {
- channelByNumber.refresh(lifetime)
- // Channel binds also refresh permissions.
- a.AddPermission(NewPermission(channelByNumber.Peer, a.log))
- }
- return nil
- }
- // RemoveChannelBind removes the ChannelBind from this allocation by id
- func (a *Allocation) RemoveChannelBind(number proto.ChannelNumber) bool {
- a.channelBindingsLock.Lock()
- defer a.channelBindingsLock.Unlock()
- for i := len(a.channelBindings) - 1; i >= 0; i-- {
- if a.channelBindings[i].Number == number {
- a.channelBindings = append(a.channelBindings[:i], a.channelBindings[i+1:]...)
- return true
- }
- }
- return false
- }
- // GetChannelByNumber gets the ChannelBind from this allocation by id
- func (a *Allocation) GetChannelByNumber(number proto.ChannelNumber) *ChannelBind {
- a.channelBindingsLock.RLock()
- defer a.channelBindingsLock.RUnlock()
- for _, cb := range a.channelBindings {
- if cb.Number == number {
- return cb
- }
- }
- return nil
- }
- // GetChannelByAddr gets the ChannelBind from this allocation by net.Addr
- func (a *Allocation) GetChannelByAddr(addr net.Addr) *ChannelBind {
- a.channelBindingsLock.RLock()
- defer a.channelBindingsLock.RUnlock()
- for _, cb := range a.channelBindings {
- if ipnet.AddrEqual(cb.Peer, addr) {
- return cb
- }
- }
- return nil
- }
- // Refresh updates the allocations lifetime
- func (a *Allocation) Refresh(lifetime time.Duration) {
- if !a.lifetimeTimer.Reset(lifetime) {
- a.log.Errorf("Failed to reset allocation timer for %v", a.fiveTuple)
- }
- }
- // SetResponseCache cache allocation response for retransmit allocation request
- func (a *Allocation) SetResponseCache(transactionID [stun.TransactionIDSize]byte, attrs []stun.Setter) {
- a.responseCache.Store(&allocationResponse{
- transactionID: transactionID,
- responseAttrs: attrs,
- })
- }
- // GetResponseCache return response cache for retransmit allocation request
- func (a *Allocation) GetResponseCache() (id [stun.TransactionIDSize]byte, attrs []stun.Setter) {
- if r := a.responseCache.Load(); r != nil {
- res := r.(*allocationResponse)
- id, attrs = res.transactionID, res.responseAttrs
- }
- return
- }
- // Close closes the allocation
- func (a *Allocation) Close() error {
- select {
- case <-a.closed:
- return nil
- default:
- }
- close(a.closed)
- a.lifetimeTimer.Stop()
- a.permissionsLock.RLock()
- for _, p := range a.permissions {
- p.lifetimeTimer.Stop()
- }
- a.permissionsLock.RUnlock()
- a.channelBindingsLock.RLock()
- for _, c := range a.channelBindings {
- c.lifetimeTimer.Stop()
- }
- a.channelBindingsLock.RUnlock()
- return a.RelaySocket.Close()
- }
- // https://tools.ietf.org/html/rfc5766#section-10.3
- // When the server receives a UDP datagram at a currently allocated
- // relayed transport address, the server looks up the allocation
- // associated with the relayed transport address. The server then
- // checks to see whether the set of permissions for the allocation allow
- // the relaying of the UDP datagram as described in Section 8.
- //
- // If relaying is permitted, then the server checks if there is a
- // channel bound to the peer that sent the UDP datagram (see
- // Section 11). If a channel is bound, then processing proceeds as
- // described in Section 11.7.
- //
- // If relaying is permitted but no channel is bound to the peer, then
- // the server forms and sends a Data indication. The Data indication
- // MUST contain both an XOR-PEER-ADDRESS and a DATA attribute. The DATA
- // attribute is set to the value of the 'data octets' field from the
- // datagram, and the XOR-PEER-ADDRESS attribute is set to the source
- // transport address of the received UDP datagram. The Data indication
- // is then sent on the 5-tuple associated with the allocation.
- const rtpMTU = 1600
- func (a *Allocation) packetHandler(m *Manager) {
- buffer := make([]byte, rtpMTU)
- for {
- n, srcAddr, err := a.RelaySocket.ReadFrom(buffer)
- if err != nil {
- m.DeleteAllocation(a.fiveTuple)
- return
- }
- a.log.Debugf("relay socket %s received %d bytes from %s",
- a.RelaySocket.LocalAddr().String(),
- n,
- srcAddr.String())
- if channel := a.GetChannelByAddr(srcAddr); channel != nil {
- channelData := &proto.ChannelData{
- Data: buffer[:n],
- Number: channel.Number,
- }
- channelData.Encode()
- if _, err = a.TurnSocket.WriteTo(channelData.Raw, a.fiveTuple.SrcAddr); err != nil {
- a.log.Errorf("Failed to send ChannelData from allocation %v %v", srcAddr, err)
- }
- } else if p := a.GetPermission(srcAddr); p != nil {
- udpAddr := srcAddr.(*net.UDPAddr)
- peerAddressAttr := proto.PeerAddress{IP: udpAddr.IP, Port: udpAddr.Port}
- dataAttr := proto.Data(buffer[:n])
- msg, err := stun.Build(stun.TransactionID, stun.NewType(stun.MethodData, stun.ClassIndication), peerAddressAttr, dataAttr)
- if err != nil {
- a.log.Errorf("Failed to send DataIndication from allocation %v %v", srcAddr, err)
- }
- a.log.Debugf("relaying message from %s to client at %s",
- srcAddr.String(),
- a.fiveTuple.SrcAddr.String())
- if _, err = a.TurnSocket.WriteTo(msg.Raw, a.fiveTuple.SrcAddr); err != nil {
- a.log.Errorf("Failed to send DataIndication from allocation %v %v", srcAddr, err)
- }
- } else {
- a.log.Infof("No Permission or Channel exists for %v on allocation %v", srcAddr, a.RelayAddr.String())
- }
- }
- }
|