tracker_scraper.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package torrent
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "net/url"
  9. "sync"
  10. "time"
  11. "github.com/anacrolix/dht/v2/krpc"
  12. "github.com/anacrolix/log"
  13. "github.com/anacrolix/torrent/tracker"
  14. )
  15. // Announces a torrent to a tracker at regular intervals, when peers are
  16. // required.
  17. type trackerScraper struct {
  18. shortInfohash [20]byte
  19. u url.URL
  20. t *Torrent
  21. lastAnnounce trackerAnnounceResult
  22. lookupTrackerIp func(*url.URL) ([]net.IP, error)
  23. stopOnce sync.Once
  24. stopCh chan struct{}
  25. }
  // torrentTrackerAnnouncer is the method set shared by tracker announcers;
  // *trackerScraper provides all three methods.
  type torrentTrackerAnnouncer interface {
  	// URL returns the tracker URL the announcer talks to.
  	URL() *url.URL
  	// statusLine returns a one-line, human-readable summary of the
  	// announcer's state.
  	statusLine() string
  	// Stop tells the announcer to cease announcing.
  	Stop()
  }
  31. func (me trackerScraper) URL() *url.URL {
  32. return &me.u
  33. }
  34. func (ts *trackerScraper) statusLine() string {
  35. var w bytes.Buffer
  36. fmt.Fprintf(&w, "next ann: %v, last ann: %v",
  37. func() string {
  38. na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
  39. if na > 0 {
  40. na /= time.Second
  41. na *= time.Second
  42. return na.String()
  43. } else {
  44. return "anytime"
  45. }
  46. }(),
  47. func() string {
  48. if ts.lastAnnounce.Err != nil {
  49. return ts.lastAnnounce.Err.Error()
  50. }
  51. if ts.lastAnnounce.Completed.IsZero() {
  52. return "never"
  53. }
  54. return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers)
  55. }(),
  56. )
  57. return w.String()
  58. }
  // trackerAnnounceResult records how a single announce attempt turned out.
  type trackerAnnounceResult struct {
  	// Err is non-nil when the announce failed.
  	Err error
  	// NumPeers is the number of peers in the tracker's response.
  	NumPeers int
  	// Interval is how long to wait before announcing again.
  	Interval time.Duration
  	// Completed is the time at which the attempt finished.
  	Completed time.Time
  }
  65. func (me *trackerScraper) getIp() (ip net.IP, err error) {
  66. var ips []net.IP
  67. if me.lookupTrackerIp != nil {
  68. ips, err = me.lookupTrackerIp(&me.u)
  69. } else {
  70. // Do a regular dns lookup
  71. ips, err = net.LookupIP(me.u.Hostname())
  72. }
  73. if err != nil {
  74. return
  75. }
  76. if len(ips) == 0 {
  77. err = errors.New("no ips")
  78. return
  79. }
  80. me.t.cl.rLock()
  81. defer me.t.cl.rUnlock()
  82. if me.t.cl.closed.IsSet() {
  83. err = errors.New("client is closed")
  84. return
  85. }
  86. for _, ip = range ips {
  87. if me.t.cl.ipIsBlocked(ip) {
  88. continue
  89. }
  90. switch me.u.Scheme {
  91. case "udp4":
  92. if ip.To4() == nil {
  93. continue
  94. }
  95. case "udp6":
  96. if ip.To4() != nil {
  97. continue
  98. }
  99. }
  100. return
  101. }
  102. err = errors.New("no acceptable ips")
  103. return
  104. }
  105. func (me *trackerScraper) trackerUrl(ip net.IP) string {
  106. u := me.u
  107. if u.Port() != "" {
  108. u.Host = net.JoinHostPort(ip.String(), u.Port())
  109. }
  110. return u.String()
  111. }
  112. // Return how long to wait before trying again. For most errors, we return 5
  113. // minutes, a relatively quick turn around for DNS changes.
  114. func (me *trackerScraper) announce(
  115. ctx context.Context,
  116. event tracker.AnnounceEvent,
  117. ) (ret trackerAnnounceResult) {
  118. defer func() {
  119. ret.Completed = time.Now()
  120. }()
  121. ret.Interval = time.Minute
  122. // Limit concurrent use of the same tracker URL by the Client.
  123. ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
  124. defer ref.Drop()
  125. select {
  126. case <-ctx.Done():
  127. ret.Err = ctx.Err()
  128. return
  129. case ref.C() <- struct{}{}:
  130. }
  131. defer func() {
  132. select {
  133. case <-ref.C():
  134. default:
  135. panic("should return immediately")
  136. }
  137. }()
  138. ip, err := me.getIp()
  139. if err != nil {
  140. ret.Err = fmt.Errorf("error getting ip: %s", err)
  141. return
  142. }
  143. me.t.cl.rLock()
  144. req := me.t.announceRequest(event, me.shortInfohash)
  145. me.t.cl.rUnlock()
  146. // The default timeout works well as backpressure on concurrent access to the tracker. Since
  147. // we're passing our own Context now, we will include that timeout ourselves to maintain similar
  148. // behavior to previously, albeit with this context now being cancelled when the Torrent is
  149. // closed.
  150. ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout)
  151. defer cancel()
  152. me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req)
  153. res, err := tracker.Announce{
  154. Context: ctx,
  155. HttpProxy: me.t.cl.config.HTTPProxy,
  156. HttpRequestDirector: me.t.cl.config.HttpRequestDirector,
  157. DialContext: me.t.cl.config.TrackerDialContext,
  158. ListenPacket: me.t.cl.config.TrackerListenPacket,
  159. UserAgent: me.t.cl.config.HTTPUserAgent,
  160. TrackerUrl: me.trackerUrl(ip),
  161. Request: req,
  162. HostHeader: me.u.Host,
  163. ServerName: me.u.Hostname(),
  164. UdpNetwork: me.u.Scheme,
  165. ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
  166. ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
  167. Logger: me.t.logger,
  168. }.Do()
  169. me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err)
  170. if err != nil {
  171. ret.Err = fmt.Errorf("announcing: %w", err)
  172. return
  173. }
  174. me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
  175. ret.NumPeers = len(res.Peers)
  176. ret.Interval = time.Duration(res.Interval) * time.Second
  177. return
  178. }
  179. // Returns whether we can shorten the interval, and sets notify to a channel that receives when we
  180. // might change our mind, or leaves it if we won't.
  181. func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
  182. gotInfo := me.t.GotInfo()
  183. select {
  184. case <-gotInfo:
  185. // Private trackers really don't like us announcing more than they specify. They're also
  186. // tracking us very carefully, so it's best to comply.
  187. private := me.t.info.Private
  188. return private == nil || !*private
  189. default:
  190. *notify = gotInfo
  191. return false
  192. }
  193. }
  194. func (me *trackerScraper) Stop() {
  195. me.stopOnce.Do(func() {
  196. close(me.stopCh)
  197. })
  198. }
  199. func (me *trackerScraper) Run() {
  200. defer me.announceStopped()
  201. ctx, cancel := context.WithCancel(context.Background())
  202. defer cancel()
  203. go func() {
  204. defer cancel()
  205. select {
  206. case <-ctx.Done():
  207. case <-me.t.Closed():
  208. }
  209. }()
  210. // make sure first announce is a "started"
  211. e := tracker.Started
  212. for {
  213. ar := me.announce(ctx, e)
  214. // after first announce, get back to regular "none"
  215. e = tracker.None
  216. me.t.cl.lock()
  217. me.lastAnnounce = ar
  218. me.t.cl.unlock()
  219. recalculate:
  220. // Make sure we don't announce for at least a minute since the last one.
  221. interval := ar.Interval
  222. if interval < time.Minute {
  223. interval = time.Minute
  224. }
  225. me.t.cl.lock()
  226. wantPeers := me.t.wantPeersEvent.C()
  227. me.t.cl.unlock()
  228. // If we want peers, reduce the interval to the minimum if it's appropriate.
  229. // A channel that receives when we should reconsider our interval. Starts as nil since that
  230. // never receives.
  231. var reconsider <-chan struct{}
  232. select {
  233. case <-wantPeers:
  234. if interval > time.Minute && me.canIgnoreInterval(&reconsider) {
  235. interval = time.Minute
  236. }
  237. default:
  238. reconsider = wantPeers
  239. }
  240. select {
  241. case <-me.stopCh:
  242. return
  243. case <-me.t.closed.Done():
  244. return
  245. case <-reconsider:
  246. // Recalculate the interval.
  247. goto recalculate
  248. case <-time.After(time.Until(ar.Completed.Add(interval))):
  249. }
  250. }
  251. }
  252. func (me *trackerScraper) announceStopped() {
  253. ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout)
  254. defer cancel()
  255. me.announce(ctx, tracker.Stopped)
  256. }