webseed-peer.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package torrent
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "sync"
  8. "time"
  9. "github.com/RoaringBitmap/roaring"
  10. "github.com/anacrolix/log"
  11. "github.com/anacrolix/torrent/metainfo"
  12. pp "github.com/anacrolix/torrent/peer_protocol"
  13. "github.com/anacrolix/torrent/webseed"
  14. )
const (
	// When true, any webseed request error that isn't specifically handled
	// (see requestResultHandler) closes the whole webseed peer instead of
	// just recording lastUnhandledErr. Kept off so one bad response doesn't
	// take down the source.
	webseedPeerCloseOnUnhandledError = false
)
// webseedPeer adapts an HTTP webseed source to the peerImpl interface so the
// rest of the torrent machinery can treat it like any other peer.
type webseedPeer struct {
	// First field for stats alignment.
	peer   Peer
	client webseed.Client
	// In-flight webseed requests, keyed by torrent-level Request. Guarded by
	// requesterCond.L (see requestIteratorLocked).
	activeRequests map[Request]webseed.Request
	// Signalled when a requester goroutine may have new work; requester()
	// waits on this when it finds nothing to do.
	requesterCond sync.Cond
	// Time of the most recent error that wasn't one of the specifically
	// handled kinds (see requestResultHandler).
	lastUnhandledErr time.Time
}

var _ peerImpl = (*webseedPeer)(nil)
  27. func (me *webseedPeer) peerImplStatusLines() []string {
  28. return []string{
  29. me.client.Url,
  30. fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)),
  31. }
  32. }
  33. func (ws *webseedPeer) String() string {
  34. return fmt.Sprintf("webseed peer for %q", ws.client.Url)
  35. }
  36. func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
  37. ws.client.SetInfo(info)
  38. // There should be probably be a callback in Client instead, so it can remove pieces at its whim
  39. // too.
  40. ws.client.Pieces.Iterate(func(x uint32) bool {
  41. ws.peer.t.incPieceAvailability(pieceIndex(x))
  42. return true
  43. })
  44. }
// writeInterested is a no-op for webseeds: there is no interested/choke
// wire state to update, so the write always "succeeds".
func (ws *webseedPeer) writeInterested(interested bool) bool {
	return true
}
  48. func (ws *webseedPeer) _cancel(r RequestIndex) bool {
  49. if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
  50. active.Cancel()
  51. // The requester is running and will handle the result.
  52. return true
  53. }
  54. // There should be no requester handling this, so no further events will occur.
  55. return false
  56. }
  57. func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
  58. return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
  59. }
// _request doesn't perform the HTTP request itself; it wakes one requester
// goroutine, which discovers pending requests via the peer's request state
// (see requester). Always reports success.
func (ws *webseedPeer) _request(r Request) bool {
	ws.requesterCond.Signal()
	return true
}
// Returns true if we should look for another request to start. Returns false if we handled this
// one.
//
// Called with requesterCond.L held; temporarily releases it around the
// blocking network wait and the error backoff.
func (ws *webseedPeer) requestIteratorLocked(requesterIndex int, x RequestIndex) bool {
	r := ws.peer.t.requestIndexToRequest(x)
	// Another requester goroutine already owns this request; skip it.
	if _, ok := ws.activeRequests[r]; ok {
		return true
	}
	webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r))
	ws.activeRequests[r] = webseedRequest
	locker := ws.requesterCond.L
	// Drop the lock while blocked on the network-bound result so other
	// requesters (and cancellation) can proceed.
	err := func() error {
		locker.Unlock()
		defer locker.Lock()
		return ws.requestResultHandler(r, webseedRequest)
	}()
	delete(ws.activeRequests, r)
	if err != nil {
		level := log.Warning
		if errors.Is(err, context.Canceled) {
			// Cancellation is routine; don't warn about it.
			level = log.Debug
		}
		ws.peer.logger.Levelf(level, "requester %v: error doing webseed request %v: %v", requesterIndex, r, err)
		// This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
		// kind of error. There are maxRequests (in Torrent.addWebSeed) requestors bouncing around
		// it doesn't hurt to slow a few down if there are issues.
		locker.Unlock()
		// Randomized backoff up to 10s, cut short if the peer closes.
		select {
		case <-ws.peer.closed.Done():
		case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
		}
		locker.Lock()
		ws.peer.updateRequests("webseedPeer request errored")
	}
	return false
}
// requester is the body of one long-lived worker goroutine (index i) that
// services this webseed peer's requests. It scans the peer's pending
// requests, handing each to requestIteratorLocked, and waits on
// requesterCond when there is nothing to do. Exits when the peer closes.
func (ws *webseedPeer) requester(i int) {
	ws.requesterCond.L.Lock()
	defer ws.requesterCond.L.Unlock()
start:
	for !ws.peer.closed.IsSet() {
		for reqIndex := range ws.peer.requestState.Requests.Iterator() {
			if !ws.requestIteratorLocked(i, reqIndex) {
				// A request was handled, during which the lock was released,
				// so the iterator may be stale: restart the scan.
				goto start
			}
		}
		// Found no requests to handle, so wait.
		ws.requesterCond.Wait()
	}
}
  113. func (ws *webseedPeer) connectionFlags() string {
  114. return "WS"
  115. }
// drop is intentionally a no-op for webseed peers.
// Maybe this should drop all existing connections, or something like that.
func (ws *webseedPeer) drop() {}
  118. func (cn *webseedPeer) ban() {
  119. cn.peer.close()
  120. }
// handleUpdateRequests defers the request-state update to a fresh goroutine
// that takes the client lock itself, rather than updating inline.
func (ws *webseedPeer) handleUpdateRequests() {
	// Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
	// pieces.
	go func() {
		ws.peer.t.cl.lock()
		defer ws.peer.t.cl.unlock()
		ws.peer.maybeUpdateActualRequestState()
	}()
}
// onClose tears down peer state: cancels all in-flight requests, prompts
// other peers that are low on requests to pick up the slack, and wakes all
// requester goroutines so they observe the closed state and exit.
func (ws *webseedPeer) onClose() {
	ws.peer.logger.Levelf(log.Debug, "closing")
	// Just deleting them means we would have to manually cancel active requests.
	ws.peer.cancelAllRequests()
	ws.peer.t.iterPeers(func(p *Peer) {
		if p.isLowOnRequests() {
			p.updateRequests("webseedPeer.onClose")
		}
	})
	ws.requesterCond.Broadcast()
}
// requestResultHandler blocks on the outcome of one webseed HTTP request
// and feeds it back into torrent state: a successful body is delivered as
// a piece message via receiveChunk; a failure is converted into a remote
// reject. Takes the client lock itself, so the caller must not hold it.
// Returns the request's error, if any.
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
	result := <-webseedRequest.Result
	close(webseedRequest.Result) // one-shot
	// We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
	// sure if we can divine which errors indicate cancellation on our end without hitting the
	// network though.
	if len(result.Bytes) != 0 || result.Err == nil {
		// Increment ChunksRead and friends
		ws.peer.doChunkReadStats(int64(len(result.Bytes)))
	}
	ws.peer.readBytes(int64(len(result.Bytes)))
	ws.peer.t.cl.lock()
	defer ws.peer.t.cl.unlock()
	// The torrent went away while we were waiting; nothing left to do.
	if ws.peer.t.closed.IsSet() {
		return nil
	}
	err := result.Err
	if err != nil {
		// Triage: cancellation, webseed rate-limiting, and our own peer
		// closing are expected and need no logging; anything else is
		// "unhandled" and recorded (or, if configured, closes the peer).
		switch {
		case errors.Is(err, context.Canceled):
		case errors.Is(err, webseed.ErrTooFast):
		case ws.peer.closed.IsSet():
		default:
			ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
			// // Here lies my attempt to extract something concrete from Go's error system. RIP.
			// cfg := spew.NewDefaultConfig()
			// cfg.DisableMethods = true
			// cfg.Dump(result.Err)
			if webseedPeerCloseOnUnhandledError {
				log.Printf("closing %v", ws)
				ws.peer.close()
			} else {
				ws.lastUnhandledErr = time.Now()
			}
		}
		// Surface the failure as a remote reject so the request can be
		// reissued. A false return means the request wasn't tracked, which
		// is an invariant violation.
		if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
			panic("invalid reject")
		}
		return err
	}
	err = ws.peer.receiveChunk(&pp.Message{
		Type:  pp.Piece,
		Index: r.Index,
		Begin: r.Begin,
		Piece: result.Bytes,
	})
	if err != nil {
		panic(err)
	}
	return err
}
  192. func (me *webseedPeer) peerPieces() *roaring.Bitmap {
  193. return &me.client.Pieces
  194. }
  195. func (cn *webseedPeer) peerHasAllPieces() (all, known bool) {
  196. if !cn.peer.t.haveInfo() {
  197. return true, false
  198. }
  199. return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true
  200. }