|
- // Package utp implements uTP, the micro transport protocol as used with
- // BitTorrent. It opts for simplicity and reliability over strict adherence to
- // the (poor) spec. It lets callers keep using the underlying OS-level
- // transport while uTP is dispatched on top of it, which allows, for example,
- // sharing a socket with DHT. Additionally, multiple uTP connections can share
- // the same OS socket, to truly realize uTP's claim to be light on system and
- // network switching resources.
- //
- // Socket is a wrapper of net.UDPConn, and performs dispatching of uTP packets
- // to attached uTP Conns. Dial and Accept are done via Socket. Conn implements
- // net.Conn over uTP, via the aforementioned Socket.
- package utp
- import (
- "context"
- "errors"
- "fmt"
- "net"
- "os"
- "strconv"
- "sync"
- "time"
- pprofsync "github.com/anacrolix/sync"
- )
- const (
- // Maximum received SYNs that haven't been accepted. If more SYNs are
- // received, a pseudo-randomly selected SYN is replied to with a reset to
- // make room.
- backlog = 50
- // IPv6 min MTU is 1280, -40 for IPv6 header, and ~8 for fragment header?
- minMTU = 1438 // Why?
- // uTP header of 20, +2 for the next extension, and an optional selective
- // ACK.
- maxHeaderSize = 20 + 2 + (((maxUnackedInbound+7)/8)+3)/4*4
- maxPayloadSize = minMTU - maxHeaderSize
- maxRecvSize = 0x2000
- // Maximum out-of-order packets to buffer.
- maxUnackedInbound = 256
- maxUnackedSends = 256
- readBufferLen = 1 << 20 // 1 MiB
- // How long to wait before sending a state packet, after one is required.
- // This prevents spamming a state packet for every packet received, and
- // non-state packets that are being sent also fill the role.
- pendingSendStateDelay = 500 * time.Microsecond
- )
- var (
- sendBufferPool = sync.Pool{
- New: func() interface{} { return make([]byte, minMTU) },
- }
- // This is the latency we assume on new connections. It should be higher
- // than the latency we expect on most connections to prevent excessive
- // resending to peers that take a long time to respond, before we've got a
- // better idea of their actual latency.
- initialLatency time.Duration
- // If a write isn't acked within this period, destroy the connection.
- writeTimeout time.Duration
- // Assume the connection has been closed by the peer after receiving no
- // packets of any kind for this long.
- packetReadTimeout time.Duration
- )
- func setDefaultDurations() {
- // An approximate upper bound for most connections across the world.
- initialLatency = 400 * time.Millisecond
- // If a packet gets no reply for this long, we can probably rule out
- // latency and client lag as the cause.
- writeTimeout = 15 * time.Second
- // Somewhere longer than the BitTorrent grace period (90-120s), and less
- // than default TCP reset (4min).
- packetReadTimeout = 2 * time.Minute
- }
- func init() {
- setDefaultDurations()
- }
- // Strongly-typed guarantee of a resolved network address.
- type resolvedAddrStr string
- type read struct {
- data []byte
- from net.Addr
- }
- type syn struct {
- seq_nr, conn_id uint16
- // net.Addr.String() of a Socket's real net.PacketConn.
- addr string
- }
- var (
- mu pprofsync.RWMutex
- sockets = map[*Socket]struct{}{}
- logLevel = 0
- artificialPacketDropChance = 0.0
- )
- func init() {
- logLevel, _ = strconv.Atoi(os.Getenv("GO_UTP_LOGGING"))
- fmt.Sscanf(os.Getenv("GO_UTP_PACKET_DROP"), "%f", &artificialPacketDropChance)
- }
- var (
- errClosed = errors.New("closed")
- errTimeout net.Error = timeoutError{"i/o timeout"}
- errAckTimeout = timeoutError{"timed out waiting for ack"}
- )
- type timeoutError struct {
- msg string
- }
- func (me timeoutError) Timeout() bool { return true }
- func (me timeoutError) Error() string { return me.msg }
- func (me timeoutError) Temporary() bool { return false }
- type st int
- func (me st) String() string {
- switch me {
- case stData:
- return "stData"
- case stFin:
- return "stFin"
- case stState:
- return "stState"
- case stReset:
- return "stReset"
- case stSyn:
- return "stSyn"
- default:
- panic(fmt.Sprintf("%d", me))
- }
- }
- const (
- stData st = 0
- stFin = 1
- stState = 2
- stReset = 3
- stSyn = 4
- // Used for validating packet headers.
- stMax = stSyn
- )
- type recv struct {
- seen bool
- data []byte
- Type st
- }
- // Attempt to connect to a remote uTP listener, creating a Socket just for
- // this connection.
- func Dial(addr string) (net.Conn, error) {
- return DialContext(context.Background(), addr)
- }
- // Same as Dial, but takes a context that can cancel or time out the dial.
- // Creates a Socket just for the connection, which will be closed when the
- // Conn is. To reuse another Socket, see Socket.Dial.
- func DialContext(ctx context.Context, addr string) (nc net.Conn, err error) {
- s, err := NewSocket("udp", ":0")
- if err != nil {
- return
- }
- defer s.Close()
- return s.DialContext(ctx, "", addr)
- }
- // Listen creates a listener Socket to accept incoming connections.
- func Listen(laddr string) (net.Listener, error) {
- return NewSocket("udp", laddr)
- }
- func nowTimestamp() uint32 {
- return uint32(time.Now().UnixNano() / int64(time.Microsecond))
- }
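- // seqLess reports whether a falls within the 0x8000 sequence numbers
- // immediately preceding b, allowing for 16-bit wraparound.
- func seqLess(a, b uint16) bool {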
- if b < 0x8000 {
- return a < b || a >= b-0x8000
- } else {
- return a < b && a >= b-0x8000
- }
- }
|