utp.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
// Package utp implements uTP, the micro transport protocol as used with
// BitTorrent. It opts for simplicity and reliability over strict adherence to
// the (poor) spec. It allows using the underlying OS-level transport despite
// dispatching uTP on top, to allow, for example, shared socket use with DHT.
// Additionally, multiple uTP connections can share the same OS socket, to
// truly realize uTP's claim to be light on system and network switching
// resources.
//
// Socket is a wrapper of net.UDPConn, and performs dispatching of uTP packets
// to attached uTP Conns. Dial and Accept are done via Socket. Conn implements
// net.Conn over uTP, via the aforementioned Socket.
  12. package utp
  13. import (
  14. "context"
  15. "errors"
  16. "fmt"
  17. "net"
  18. "os"
  19. "strconv"
  20. "sync"
  21. "time"
  22. pprofsync "github.com/anacrolix/sync"
  23. )
  24. const (
  25. // Maximum received SYNs that haven't been accepted. If more SYNs are
  26. // received, a pseudo randomly selected SYN is replied to with a reset to
  27. // make room.
  28. backlog = 50
  29. // IPv6 min MTU is 1280, -40 for IPv6 header, and ~8 for fragment header?
  30. minMTU = 1438 // Why?
  31. // uTP header of 20, +2 for the next extension, and an optional selective
  32. // ACK.
  33. maxHeaderSize = 20 + 2 + (((maxUnackedInbound+7)/8)+3)/4*4
  34. maxPayloadSize = minMTU - maxHeaderSize
  35. maxRecvSize = 0x2000
  36. // Maximum out-of-order packets to buffer.
  37. maxUnackedInbound = 256
  38. maxUnackedSends = 256
  39. readBufferLen = 1 << 20 // ~1MiB
  40. // How long to wait before sending a state packet, after one is required.
  41. // This prevents spamming a state packet for every packet received, and
  42. // non-state packets that are being sent also fill the role.
  43. pendingSendStateDelay = 500 * time.Microsecond
  44. )
  45. var (
  46. sendBufferPool = sync.Pool{
  47. New: func() interface{} { return make([]byte, minMTU) },
  48. }
  49. // This is the latency we assume on new connections. It should be higher
  50. // than the latency we expect on most connections to prevent excessive
  51. // resending to peers that take a long time to respond, before we've got a
  52. // better idea of their actual latency.
  53. initialLatency time.Duration
  54. // If a write isn't acked within this period, destroy the connection.
  55. writeTimeout time.Duration
  56. // Assume the connection has been closed by the peer getting no packets of
  57. // any kind for this time.
  58. packetReadTimeout time.Duration
  59. )
  60. func setDefaultDurations() {
  61. // An approximate upper bound for most connections across the world.
  62. initialLatency = 400 * time.Millisecond
  63. // Getting no reply for this period for a packet, we can probably rule out
  64. // latency and client lag.
  65. writeTimeout = 15 * time.Second
  66. // Somewhere longer than the BitTorrent grace period (90-120s), and less
  67. // than default TCP reset (4min).
  68. packetReadTimeout = 2 * time.Minute
  69. }
  70. func init() {
  71. setDefaultDurations()
  72. }
  73. // Strongly-type guarantee of resolved network address.
  74. type resolvedAddrStr string
  75. type read struct {
  76. data []byte
  77. from net.Addr
  78. }
  79. type syn struct {
  80. seq_nr, conn_id uint16
  81. // net.Addr.String() of a Socket's real net.PacketConn.
  82. addr string
  83. }
  84. var (
  85. mu pprofsync.RWMutex
  86. sockets = map[*Socket]struct{}{}
  87. logLevel = 0
  88. artificialPacketDropChance = 0.0
  89. )
  90. func init() {
  91. logLevel, _ = strconv.Atoi(os.Getenv("GO_UTP_LOGGING"))
  92. fmt.Sscanf(os.Getenv("GO_UTP_PACKET_DROP"), "%f", &artificialPacketDropChance)
  93. }
  94. var (
  95. errClosed = errors.New("closed")
  96. errTimeout net.Error = timeoutError{"i/o timeout"}
  97. errAckTimeout = timeoutError{"timed out waiting for ack"}
  98. )
  99. type timeoutError struct {
  100. msg string
  101. }
  102. func (me timeoutError) Timeout() bool { return true }
  103. func (me timeoutError) Error() string { return me.msg }
  104. func (me timeoutError) Temporary() bool { return false }
  105. type st int
  106. func (me st) String() string {
  107. switch me {
  108. case stData:
  109. return "stData"
  110. case stFin:
  111. return "stFin"
  112. case stState:
  113. return "stState"
  114. case stReset:
  115. return "stReset"
  116. case stSyn:
  117. return "stSyn"
  118. default:
  119. panic(fmt.Sprintf("%d", me))
  120. }
  121. }
  122. const (
  123. stData st = 0
  124. stFin = 1
  125. stState = 2
  126. stReset = 3
  127. stSyn = 4
  128. // Used for validating packet headers.
  129. stMax = stSyn
  130. )
  131. type recv struct {
  132. seen bool
  133. data []byte
  134. Type st
  135. }
  136. // Attempt to connect to a remote uTP listener, creating a Socket just for
  137. // this connection.
  138. func Dial(addr string) (net.Conn, error) {
  139. return DialContext(context.Background(), addr)
  140. }
  141. // Same as Dial with a timeout parameter. Creates a Socket just for the
  142. // connection, which will be closed with the Conn is. To reuse another Socket,
  143. // see Socket.Dial.
  144. func DialContext(ctx context.Context, addr string) (nc net.Conn, err error) {
  145. s, err := NewSocket("udp", ":0")
  146. if err != nil {
  147. return
  148. }
  149. defer s.Close()
  150. return s.DialContext(ctx, "", addr)
  151. }
  152. // Listen creates listener Socket to accept incoming connections.
  153. func Listen(laddr string) (net.Listener, error) {
  154. return NewSocket("udp", laddr)
  155. }
  156. func nowTimestamp() uint32 {
  157. return uint32(time.Now().UnixNano() / int64(time.Microsecond))
  158. }
  159. func seqLess(a, b uint16) bool {
  160. if b < 0x8000 {
  161. return a < b || a >= b-0x8000
  162. } else {
  163. return a < b && a >= b-0x8000
  164. }
  165. }