announce.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package dht
  2. // get_peers and announce_peers.
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/anacrolix/chansync"
  9. "github.com/anacrolix/chansync/events"
  10. "github.com/anacrolix/log"
  11. "github.com/anacrolix/dht/v2/int160"
  12. dhtutil "github.com/anacrolix/dht/v2/k-nearest-nodes"
  13. "github.com/anacrolix/dht/v2/krpc"
  14. "github.com/anacrolix/dht/v2/traversal"
  15. )
  16. // Maintains state for an ongoing Announce operation. An Announce is started by calling
  17. // Server.Announce.
  18. type Announce struct {
  19. Peers chan PeersValues
  20. server *Server
  21. infoHash int160.T // Target
  22. announcePeerOpts *AnnouncePeerOpts
  23. scrape bool
  24. peerAnnounced chansync.SetOnce
  25. traversal *traversal.Operation
  26. closed chansync.SetOnce
  27. }
  28. func (a *Announce) String() string {
  29. return fmt.Sprintf("%[1]T %[1]p of %v on %v", a, a.infoHash, a.server)
  30. }
  31. // Returns the number of distinct remote addresses the announce has queried.
  32. func (a *Announce) NumContacted() uint32 {
  33. return atomic.LoadUint32(&a.traversal.Stats().NumAddrsTried)
  34. }
  35. func (a *Announce) TraversalStats() TraversalStats {
  36. return *a.traversal.Stats()
  37. }
  38. // Server.Announce option
  39. type AnnounceOpt func(a *Announce)
  40. // Scrape BEP 33 bloom filters in queries.
  41. func Scrape() AnnounceOpt {
  42. return func(a *Announce) {
  43. a.scrape = true
  44. }
  45. }
  46. // Arguments for announce_peer from a Server.Announce.
  47. type AnnouncePeerOpts struct {
  48. // The peer port that we're announcing.
  49. Port int
  50. // The peer port should be determined by the receiver to be the source port of the query packet.
  51. ImpliedPort bool
  52. }
  53. // Finish an Announce get_peers traversal with an announce of a local peer.
  54. func AnnouncePeer(opts AnnouncePeerOpts) AnnounceOpt {
  55. return func(a *Announce) {
  56. a.announcePeerOpts = &opts
  57. }
  58. }
  59. // Deprecated: Use Server.AnnounceTraversal.
  60. // Traverses the DHT graph toward nodes that store peers for the infohash, streaming them to the
  61. // caller, and announcing the local node to each responding node if port is non-zero or impliedPort
  62. // is true.
  63. func (s *Server) Announce(infoHash [20]byte, port int, impliedPort bool, opts ...AnnounceOpt) (_ *Announce, err error) {
  64. if port != 0 || impliedPort {
  65. opts = append([]AnnounceOpt{AnnouncePeer(AnnouncePeerOpts{
  66. Port: port,
  67. ImpliedPort: impliedPort,
  68. })}, opts...)
  69. }
  70. return s.AnnounceTraversal(infoHash, opts...)
  71. }
  72. // Traverses the DHT graph toward nodes that store peers for the infohash, streaming them to the
  73. // caller.
  74. func (s *Server) AnnounceTraversal(infoHash [20]byte, opts ...AnnounceOpt) (_ *Announce, err error) {
  75. infoHashInt160 := int160.FromByteArray(infoHash)
  76. a := &Announce{
  77. Peers: make(chan PeersValues),
  78. server: s,
  79. infoHash: infoHashInt160,
  80. }
  81. for _, opt := range opts {
  82. opt(a)
  83. }
  84. a.traversal = traversal.Start(traversal.OperationInput{
  85. Target: infoHash,
  86. DoQuery: a.getPeers,
  87. NodeFilter: s.TraversalNodeFilter,
  88. })
  89. nodes, err := s.TraversalStartingNodes()
  90. if err != nil {
  91. a.traversal.Stop()
  92. return
  93. }
  94. a.traversal.AddNodes(nodes)
  95. go func() {
  96. <-a.traversal.Stalled()
  97. a.traversal.Stop()
  98. <-a.traversal.Stopped()
  99. if a.announcePeerOpts != nil {
  100. a.announceClosest()
  101. }
  102. a.peerAnnounced.Set()
  103. close(a.Peers)
  104. }()
  105. return a, nil
  106. }
  107. func (a *Announce) announceClosest() {
  108. var wg sync.WaitGroup
  109. a.traversal.Closest().Range(func(elem dhtutil.Elem) {
  110. wg.Add(1)
  111. go func() {
  112. a.logger().Levelf(log.Debug,
  113. "announce_peer to %v: %v",
  114. elem, a.announcePeer(elem),
  115. )
  116. wg.Done()
  117. }()
  118. })
  119. wg.Wait()
  120. }
  121. func (a *Announce) announcePeer(peer dhtutil.Elem) error {
  122. ctx, cancel := context.WithCancel(context.Background())
  123. defer cancel()
  124. go func() {
  125. select {
  126. case <-a.closed.Done():
  127. cancel()
  128. case <-ctx.Done():
  129. }
  130. }()
  131. return a.server.announcePeer(
  132. ctx,
  133. NewAddr(peer.Addr.UDP()),
  134. a.infoHash,
  135. a.announcePeerOpts.Port,
  136. peer.Data.(string),
  137. a.announcePeerOpts.ImpliedPort,
  138. QueryRateLimiting{},
  139. ).Err
  140. }
  141. func (a *Announce) getPeers(ctx context.Context, addr krpc.NodeAddr) (tqr traversal.QueryResult) {
  142. res := a.server.GetPeers(ctx, NewAddr(addr.UDP()), a.infoHash, a.scrape, QueryRateLimiting{})
  143. if r := res.Reply.R; r != nil {
  144. peersValues := PeersValues{
  145. Peers: r.Values,
  146. NodeInfo: krpc.NodeInfo{
  147. Addr: addr,
  148. ID: r.ID,
  149. },
  150. Return: *r,
  151. }
  152. select {
  153. case a.Peers <- peersValues:
  154. case <-a.traversal.Stopped():
  155. }
  156. if r.Token != nil {
  157. tqr.ClosestData = *r.Token
  158. tqr.ResponseFrom = &krpc.NodeInfo{
  159. ID: r.ID,
  160. Addr: addr,
  161. }
  162. }
  163. tqr.Nodes = r.Nodes
  164. tqr.Nodes6 = r.Nodes6
  165. }
  166. return
  167. }
  168. // Corresponds to the "values" key in a get_peers KRPC response. A list of
  169. // peers that a node has reported as being in the swarm for a queried info
  170. // hash.
  171. type PeersValues struct {
  172. Peers []Peer // Peers given in get_peers response.
  173. krpc.NodeInfo // The node that gave the response.
  174. krpc.Return
  175. }
  176. // Stop the announce.
  177. func (a *Announce) Close() {
  178. a.StopTraversing()
  179. // This will prevent peer announces from proceeding.
  180. a.closed.Set()
  181. }
  182. func (a *Announce) logger() log.Logger {
  183. return a.server.logger()
  184. }
  185. // Halts traversal, but won't block peer announcing.
  186. func (a *Announce) StopTraversing() {
  187. a.traversal.Stop()
  188. }
  189. // Traversal and peer announcing steps are done.
  190. func (a *Announce) Finished() events.Done {
  191. // This is the last step in an announce.
  192. return a.peerAnnounced.Done()
  193. }