| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- package torrent
- import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "net"
- "net/url"
- "sync"
- "time"
- "github.com/anacrolix/dht/v2/krpc"
- "github.com/anacrolix/log"
- "github.com/anacrolix/torrent/tracker"
- )
- // Announces a torrent to a tracker at regular intervals, when peers are
- // required.
- type trackerScraper struct {
- shortInfohash [20]byte
- u url.URL
- t *Torrent
- lastAnnounce trackerAnnounceResult
- lookupTrackerIp func(*url.URL) ([]net.IP, error)
- stopOnce sync.Once
- stopCh chan struct{}
- }
- type torrentTrackerAnnouncer interface {
- statusLine() string
- URL() *url.URL
- Stop()
- }
- func (me trackerScraper) URL() *url.URL {
- return &me.u
- }
- func (ts *trackerScraper) statusLine() string {
- var w bytes.Buffer
- fmt.Fprintf(&w, "next ann: %v, last ann: %v",
- func() string {
- na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
- if na > 0 {
- na /= time.Second
- na *= time.Second
- return na.String()
- } else {
- return "anytime"
- }
- }(),
- func() string {
- if ts.lastAnnounce.Err != nil {
- return ts.lastAnnounce.Err.Error()
- }
- if ts.lastAnnounce.Completed.IsZero() {
- return "never"
- }
- return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers)
- }(),
- )
- return w.String()
- }
- type trackerAnnounceResult struct {
- Err error
- NumPeers int
- Interval time.Duration
- Completed time.Time
- }
- func (me *trackerScraper) getIp() (ip net.IP, err error) {
- var ips []net.IP
- if me.lookupTrackerIp != nil {
- ips, err = me.lookupTrackerIp(&me.u)
- } else {
- // Do a regular dns lookup
- ips, err = net.LookupIP(me.u.Hostname())
- }
- if err != nil {
- return
- }
- if len(ips) == 0 {
- err = errors.New("no ips")
- return
- }
- me.t.cl.rLock()
- defer me.t.cl.rUnlock()
- if me.t.cl.closed.IsSet() {
- err = errors.New("client is closed")
- return
- }
- for _, ip = range ips {
- if me.t.cl.ipIsBlocked(ip) {
- continue
- }
- switch me.u.Scheme {
- case "udp4":
- if ip.To4() == nil {
- continue
- }
- case "udp6":
- if ip.To4() != nil {
- continue
- }
- }
- return
- }
- err = errors.New("no acceptable ips")
- return
- }
- func (me *trackerScraper) trackerUrl(ip net.IP) string {
- u := me.u
- if u.Port() != "" {
- u.Host = net.JoinHostPort(ip.String(), u.Port())
- }
- return u.String()
- }
- // Return how long to wait before trying again. For most errors, we return 5
- // minutes, a relatively quick turn around for DNS changes.
- func (me *trackerScraper) announce(
- ctx context.Context,
- event tracker.AnnounceEvent,
- ) (ret trackerAnnounceResult) {
- defer func() {
- ret.Completed = time.Now()
- }()
- ret.Interval = time.Minute
- // Limit concurrent use of the same tracker URL by the Client.
- ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
- defer ref.Drop()
- select {
- case <-ctx.Done():
- ret.Err = ctx.Err()
- return
- case ref.C() <- struct{}{}:
- }
- defer func() {
- select {
- case <-ref.C():
- default:
- panic("should return immediately")
- }
- }()
- ip, err := me.getIp()
- if err != nil {
- ret.Err = fmt.Errorf("error getting ip: %s", err)
- return
- }
- me.t.cl.rLock()
- req := me.t.announceRequest(event, me.shortInfohash)
- me.t.cl.rUnlock()
- // The default timeout works well as backpressure on concurrent access to the tracker. Since
- // we're passing our own Context now, we will include that timeout ourselves to maintain similar
- // behavior to previously, albeit with this context now being cancelled when the Torrent is
- // closed.
- ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout)
- defer cancel()
- me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req)
- res, err := tracker.Announce{
- Context: ctx,
- HttpProxy: me.t.cl.config.HTTPProxy,
- HttpRequestDirector: me.t.cl.config.HttpRequestDirector,
- DialContext: me.t.cl.config.TrackerDialContext,
- ListenPacket: me.t.cl.config.TrackerListenPacket,
- UserAgent: me.t.cl.config.HTTPUserAgent,
- TrackerUrl: me.trackerUrl(ip),
- Request: req,
- HostHeader: me.u.Host,
- ServerName: me.u.Hostname(),
- UdpNetwork: me.u.Scheme,
- ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
- ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
- Logger: me.t.logger,
- }.Do()
- me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err)
- if err != nil {
- ret.Err = fmt.Errorf("announcing: %w", err)
- return
- }
- me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
- ret.NumPeers = len(res.Peers)
- ret.Interval = time.Duration(res.Interval) * time.Second
- return
- }
- // Returns whether we can shorten the interval, and sets notify to a channel that receives when we
- // might change our mind, or leaves it if we won't.
- func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
- gotInfo := me.t.GotInfo()
- select {
- case <-gotInfo:
- // Private trackers really don't like us announcing more than they specify. They're also
- // tracking us very carefully, so it's best to comply.
- private := me.t.info.Private
- return private == nil || !*private
- default:
- *notify = gotInfo
- return false
- }
- }
- func (me *trackerScraper) Stop() {
- me.stopOnce.Do(func() {
- close(me.stopCh)
- })
- }
- func (me *trackerScraper) Run() {
- defer me.announceStopped()
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go func() {
- defer cancel()
- select {
- case <-ctx.Done():
- case <-me.t.Closed():
- }
- }()
- // make sure first announce is a "started"
- e := tracker.Started
- for {
- ar := me.announce(ctx, e)
- // after first announce, get back to regular "none"
- e = tracker.None
- me.t.cl.lock()
- me.lastAnnounce = ar
- me.t.cl.unlock()
- recalculate:
- // Make sure we don't announce for at least a minute since the last one.
- interval := ar.Interval
- if interval < time.Minute {
- interval = time.Minute
- }
- me.t.cl.lock()
- wantPeers := me.t.wantPeersEvent.C()
- me.t.cl.unlock()
- // If we want peers, reduce the interval to the minimum if it's appropriate.
- // A channel that receives when we should reconsider our interval. Starts as nil since that
- // never receives.
- var reconsider <-chan struct{}
- select {
- case <-wantPeers:
- if interval > time.Minute && me.canIgnoreInterval(&reconsider) {
- interval = time.Minute
- }
- default:
- reconsider = wantPeers
- }
- select {
- case <-me.stopCh:
- return
- case <-me.t.closed.Done():
- return
- case <-reconsider:
- // Recalculate the interval.
- goto recalculate
- case <-time.After(time.Until(ar.Completed.Add(interval))):
- }
- }
- }
- func (me *trackerScraper) announceStopped() {
- ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout)
- defer cancel()
- me.announce(ctx, tracker.Stopped)
- }
|