pex.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package torrent
  2. import (
  3. "net"
  4. "net/netip"
  5. "sync"
  6. "time"
  7. pp "github.com/anacrolix/torrent/peer_protocol"
  8. )
  9. type pexEventType int
  10. const (
  11. pexAdd pexEventType = iota
  12. pexDrop
  13. )
  14. // internal, based on BEP11
  15. const (
  16. pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
  17. pexMaxHold = 25 // length of the drop hold-back buffer
  18. pexMaxDelta = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
  19. )
  20. // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
  21. type pexEvent struct {
  22. t pexEventType
  23. addr netip.AddrPort
  24. f pp.PexPeerFlags
  25. next *pexEvent // event feed list
  26. }
  27. // facilitates efficient de-duplication while generating PEX messages
  28. type pexMsgFactory struct {
  29. msg pp.PexMsg
  30. added map[netip.AddrPort]struct{}
  31. dropped map[netip.AddrPort]struct{}
  32. }
  33. func (me *pexMsgFactory) DeltaLen() int {
  34. return int(max(
  35. int64(len(me.added)),
  36. int64(len(me.dropped))))
  37. }
  38. type addrKey = netip.AddrPort
  39. // Returns the key to use to identify a given addr in the factory.
  40. func (me *pexMsgFactory) addrKey(addr netip.AddrPort) addrKey {
  41. return addr
  42. }
  43. // Returns whether the entry was added (we can check if we're cancelling out another entry and so
  44. // won't hit the limit consuming this event).
  45. func (me *pexMsgFactory) add(e pexEvent) {
  46. key := me.addrKey(e.addr)
  47. if _, ok := me.added[key]; ok {
  48. return
  49. }
  50. if me.added == nil {
  51. me.added = make(map[addrKey]struct{}, pexMaxDelta)
  52. }
  53. addr := krpcNodeAddrFromAddrPort(e.addr)
  54. m := &me.msg
  55. switch {
  56. case addr.IP.To4() != nil:
  57. if _, ok := me.dropped[key]; ok {
  58. if i := m.Dropped.Index(addr); i >= 0 {
  59. m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
  60. }
  61. delete(me.dropped, key)
  62. return
  63. }
  64. m.Added = append(m.Added, addr)
  65. m.AddedFlags = append(m.AddedFlags, e.f)
  66. case len(addr.IP) == net.IPv6len:
  67. if _, ok := me.dropped[key]; ok {
  68. if i := m.Dropped6.Index(addr); i >= 0 {
  69. m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
  70. }
  71. delete(me.dropped, key)
  72. return
  73. }
  74. m.Added6 = append(m.Added6, addr)
  75. m.Added6Flags = append(m.Added6Flags, e.f)
  76. default:
  77. panic(addr)
  78. }
  79. me.added[key] = struct{}{}
  80. }
  81. // Returns whether the entry was added (we can check if we're cancelling out another entry and so
  82. // won't hit the limit consuming this event).
  83. func (me *pexMsgFactory) drop(e pexEvent) {
  84. addr := krpcNodeAddrFromAddrPort(e.addr)
  85. key := me.addrKey(e.addr)
  86. if me.dropped == nil {
  87. me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
  88. }
  89. if _, ok := me.dropped[key]; ok {
  90. return
  91. }
  92. m := &me.msg
  93. switch {
  94. case addr.IP.To4() != nil:
  95. if _, ok := me.added[key]; ok {
  96. if i := m.Added.Index(addr); i >= 0 {
  97. m.Added = append(m.Added[:i], m.Added[i+1:]...)
  98. m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
  99. }
  100. delete(me.added, key)
  101. return
  102. }
  103. m.Dropped = append(m.Dropped, addr)
  104. case len(addr.IP) == net.IPv6len:
  105. if _, ok := me.added[key]; ok {
  106. if i := m.Added6.Index(addr); i >= 0 {
  107. m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
  108. m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
  109. }
  110. delete(me.added, key)
  111. return
  112. }
  113. m.Dropped6 = append(m.Dropped6, addr)
  114. }
  115. me.dropped[key] = struct{}{}
  116. }
  117. func (me *pexMsgFactory) append(event pexEvent) {
  118. switch event.t {
  119. case pexAdd:
  120. me.add(event)
  121. case pexDrop:
  122. me.drop(event)
  123. default:
  124. panic(event.t)
  125. }
  126. }
  127. func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
  128. return &me.msg
  129. }
  130. // Per-torrent PEX state
  131. type pexState struct {
  132. sync.RWMutex
  133. tail *pexEvent // event feed list
  134. hold []pexEvent // delayed drops
  135. // Torrent-wide cooldown deadline on inbound. This exists to prevent PEX from drowning out other
  136. // peer address sources, until that is fixed.
  137. rest time.Time
  138. nc int // net number of alive conns
  139. msg0 pexMsgFactory // initial message
  140. }
  141. // Reset wipes the state clean, releasing resources. Called from Torrent.Close().
  142. func (s *pexState) Reset() {
  143. s.Lock()
  144. defer s.Unlock()
  145. s.tail = nil
  146. s.hold = nil
  147. s.nc = 0
  148. s.rest = time.Time{}
  149. s.msg0 = pexMsgFactory{}
  150. }
  151. func (s *pexState) append(e *pexEvent) {
  152. if s.tail != nil {
  153. s.tail.next = e
  154. }
  155. s.tail = e
  156. s.msg0.append(*e)
  157. }
  158. func (s *pexState) Add(c *PeerConn) {
  159. e, err := c.pexEvent(pexAdd)
  160. if err != nil {
  161. return
  162. }
  163. s.Lock()
  164. defer s.Unlock()
  165. s.nc++
  166. if s.nc >= pexTargAdded {
  167. for _, e := range s.hold {
  168. ne := e
  169. s.append(&ne)
  170. }
  171. s.hold = s.hold[:0]
  172. }
  173. c.pex.Listed = true
  174. s.append(&e)
  175. }
  176. func (s *pexState) Drop(c *PeerConn) {
  177. if !c.pex.Listed {
  178. // skip connections which were not previously Added
  179. return
  180. }
  181. e, err := c.pexEvent(pexDrop)
  182. if err != nil {
  183. return
  184. }
  185. s.Lock()
  186. defer s.Unlock()
  187. s.nc--
  188. if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
  189. s.hold = append(s.hold, e)
  190. } else {
  191. s.append(&e)
  192. }
  193. }
  194. // Generate a PEX message based on the event feed.
  195. // Also returns a pointer to pass to the subsequent calls
  196. // to produce incremental deltas.
  197. func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
  198. s.RLock()
  199. defer s.RUnlock()
  200. if start == nil {
  201. return *s.msg0.PexMsg(), s.tail
  202. }
  203. var msg pexMsgFactory
  204. last := start
  205. for e := start.next; e != nil; e = e.next {
  206. if msg.DeltaLen() >= pexMaxDelta {
  207. break
  208. }
  209. msg.append(*e)
  210. last = e
  211. }
  212. return *msg.PexMsg(), last
  213. }
  214. // The same as Genmsg but just counts up the distinct events that haven't been sent.
  215. func (s *pexState) numPending(start *pexEvent) (num int) {
  216. s.RLock()
  217. defer s.RUnlock()
  218. if start == nil {
  219. return s.msg0.PexMsg().Len()
  220. }
  221. for e := start.next; e != nil; e = e.next {
  222. num++
  223. }
  224. return
  225. }