pexconn.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package torrent
  2. import (
  3. "fmt"
  4. "net/netip"
  5. "time"
  6. g "github.com/anacrolix/generics"
  7. "github.com/anacrolix/log"
  8. pp "github.com/anacrolix/torrent/peer_protocol"
  9. )
  10. const (
  11. pexRetryDelay = 10 * time.Second
  12. pexInterval = 1 * time.Minute
  13. )
  14. // per-connection PEX state
  15. type pexConnState struct {
  16. enabled bool
  17. xid pp.ExtensionNumber
  18. last *pexEvent
  19. timer *time.Timer
  20. gate chan struct{}
  21. readyfn func()
  22. torrent *Torrent
  23. Listed bool
  24. logger log.Logger
  25. // Running record of live connections the remote end of the connection purports to have.
  26. remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
  27. lastRecv time.Time
  28. }
  29. func (s *pexConnState) IsEnabled() bool {
  30. return s.enabled
  31. }
  32. // Init is called from the reader goroutine upon the extended handshake completion
  33. func (s *pexConnState) Init(c *PeerConn) {
  34. xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
  35. if !ok || xid == 0 || c.t.cl.config.DisablePEX {
  36. return
  37. }
  38. s.xid = xid
  39. s.last = nil
  40. s.torrent = c.t
  41. s.logger = c.logger.WithDefaultLevel(log.Debug).WithNames("pex")
  42. s.readyfn = c.tickleWriter
  43. s.gate = make(chan struct{}, 1)
  44. s.timer = time.AfterFunc(0, func() {
  45. s.gate <- struct{}{}
  46. s.readyfn() // wake up the writer
  47. })
  48. s.enabled = true
  49. }
  50. // schedule next PEX message
  51. func (s *pexConnState) sched(delay time.Duration) {
  52. s.timer.Reset(delay)
  53. }
  54. // generate next PEX message for the peer; returns nil if nothing yet to send
  55. func (s *pexConnState) genmsg() *pp.PexMsg {
  56. tx, last := s.torrent.pex.Genmsg(s.last)
  57. if tx.Len() == 0 {
  58. return nil
  59. }
  60. s.last = last
  61. return &tx
  62. }
  63. func (s *pexConnState) numPending() int {
  64. if s.torrent == nil {
  65. return 0
  66. }
  67. return s.torrent.pex.numPending(s.last)
  68. }
  69. // Share is called from the writer goroutine if when it is woken up with the write buffers empty
  70. // Returns whether there's more room on the send buffer to write to.
  71. func (s *pexConnState) Share(postfn messageWriter) bool {
  72. select {
  73. case <-s.gate:
  74. if tx := s.genmsg(); tx != nil {
  75. s.logger.Print("sending PEX message: ", tx)
  76. flow := postfn(tx.Message(s.xid))
  77. s.sched(pexInterval)
  78. return flow
  79. } else {
  80. // no PEX to send this time - try again shortly
  81. s.sched(pexRetryDelay)
  82. }
  83. default:
  84. }
  85. return true
  86. }
  87. func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
  88. for _, dropped := range rx.Dropped {
  89. addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
  90. delete(s.remoteLiveConns, addrPort)
  91. }
  92. for _, dropped := range rx.Dropped6 {
  93. addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
  94. delete(s.remoteLiveConns, addrPort)
  95. }
  96. for i, added := range rx.Added {
  97. addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
  98. addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
  99. flags := g.SliceGet(rx.AddedFlags, i)
  100. g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
  101. }
  102. for i, added := range rx.Added6 {
  103. addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
  104. addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
  105. flags := g.SliceGet(rx.Added6Flags, i)
  106. g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
  107. }
  108. return
  109. }
  110. // Recv is called from the reader goroutine
  111. func (s *pexConnState) Recv(payload []byte) error {
  112. rx, err := pp.LoadPexMsg(payload)
  113. if err != nil {
  114. return fmt.Errorf("unmarshalling pex message: %w", err)
  115. }
  116. s.logger.Printf("received pex message: %v", rx)
  117. torrent.Add("pex added peers received", int64(len(rx.Added)))
  118. torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
  119. // "Clients must batch updates to send no more than 1 PEX message per minute."
  120. timeSinceLastRecv := time.Since(s.lastRecv)
  121. if timeSinceLastRecv < 45*time.Second {
  122. return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
  123. }
  124. s.lastRecv = time.Now()
  125. s.updateRemoteLiveConns(rx)
  126. var peers peerInfos
  127. peers.AppendFromPex(rx.Added6, rx.Added6Flags)
  128. peers.AppendFromPex(rx.Added, rx.AddedFlags)
  129. if time.Now().Before(s.torrent.pex.rest) {
  130. s.logger.Printf("in cooldown period, incoming PEX discarded")
  131. return nil
  132. }
  133. added := s.torrent.addPeers(peers)
  134. s.logger.Printf("got %v peers over pex, added %v", len(peers), added)
  135. if len(peers) > 0 {
  136. s.torrent.pex.rest = time.Now().Add(pexInterval)
  137. }
  138. // one day we may also want to:
  139. // - handle drops somehow
  140. // - detect malicious peers
  141. return nil
  142. }
  143. func (s *pexConnState) Close() {
  144. if s.timer != nil {
  145. s.timer.Stop()
  146. }
  147. }