client.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package webseed
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net/http"
  10. "strings"
  11. "github.com/RoaringBitmap/roaring"
  12. "github.com/anacrolix/torrent/common"
  13. "github.com/anacrolix/torrent/metainfo"
  14. "github.com/anacrolix/torrent/segments"
  15. )
// RequestSpec identifies a contiguous byte extent of the torrent's data to
// fetch from the webseed.
type RequestSpec = segments.Extent

// requestPart is one HTTP request covering a single file's slice of a larger
// RequestSpec. A spec that spans multiple files maps to multiple parts.
type requestPart struct {
	req *http.Request
	// The extent of torrent data this part covers.
	e segments.Extent
	// do issues the HTTP request for this part.
	do func() (*http.Response, error)
	// Wrap http response bodies for such things as download rate limiting.
	responseBodyWrapper ResponseBodyWrapper
}
// Request is a handle to an in-flight webseed transfer started by
// Client.StartNewRequest.
type Request struct {
	cancel func()
	// Result receives exactly one RequestResult when the transfer finishes or
	// fails. The channel is buffered (capacity 1) so the sender never blocks.
	Result chan RequestResult
}

// Cancel aborts the request by cancelling its underlying context.
func (r Request) Cancel() {
	r.cancel()
}
// Client fetches torrent data from a single webseed URL.
type Client struct {
	HttpClient *http.Client
	Url        string
	// Maps byte extents of the torrent to the files they fall in.
	fileIndex segments.Index
	info      *metainfo.Info
	// The pieces we can request with the Url. We're more likely to ban/block at the file-level
	// given that's how requests are mapped to webseeds, but the torrent.Client works at the piece
	// level. We can map our file-level adjustments to the pieces here. This probably need to be
	// private in the future, if Client ever starts removing pieces.
	Pieces roaring.Bitmap
	// Optional wrapper applied to every response body (e.g. rate limiting).
	ResponseBodyWrapper ResponseBodyWrapper
	PathEscaper         PathEscaper
}

// ResponseBodyWrapper wraps HTTP response bodies, for such things as download
// rate limiting.
type ResponseBodyWrapper func(io.Reader) io.Reader
  45. func (me *Client) SetInfo(info *metainfo.Info) {
  46. if !strings.HasSuffix(me.Url, "/") && info.IsDir() {
  47. // In my experience, this is a non-conforming webseed. For example the
  48. // http://ia600500.us.archive.org/1/items URLs in archive.org torrents.
  49. return
  50. }
  51. me.fileIndex = segments.NewIndexFromSegments(common.TorrentOffsetFileSegments(info))
  52. me.info = info
  53. me.Pieces.AddRange(0, uint64(info.NumPieces()))
  54. }
// RequestResult is the outcome of a webseed Request. When Err is non-nil,
// Bytes may still hold the data accumulated before the failure.
type RequestResult struct {
	Bytes []byte
	Err   error
}
  59. func (ws *Client) StartNewRequest(r RequestSpec) Request {
  60. ctx, cancel := context.WithCancel(context.TODO())
  61. var requestParts []requestPart
  62. if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
  63. req, err := newRequest(
  64. ctx,
  65. ws.Url, i, ws.info, e.Start, e.Length,
  66. ws.PathEscaper,
  67. )
  68. if err != nil {
  69. panic(err)
  70. }
  71. part := requestPart{
  72. req: req,
  73. e: e,
  74. responseBodyWrapper: ws.ResponseBodyWrapper,
  75. }
  76. part.do = func() (*http.Response, error) {
  77. return ws.HttpClient.Do(req)
  78. }
  79. requestParts = append(requestParts, part)
  80. return true
  81. }) {
  82. panic("request out of file bounds")
  83. }
  84. req := Request{
  85. cancel: cancel,
  86. Result: make(chan RequestResult, 1),
  87. }
  88. go func() {
  89. b, err := readRequestPartResponses(ctx, requestParts)
  90. req.Result <- RequestResult{
  91. Bytes: b,
  92. Err: err,
  93. }
  94. }()
  95. return req
  96. }
// ErrBadResponse reports an HTTP response the webseed client could not use.
// The offending response is retained for inspection by callers.
type ErrBadResponse struct {
	Msg      string
	Response *http.Response
}

// Error returns only the message; details live in the Response field.
func (me ErrBadResponse) Error() string {
	return me.Msg
}
// recvPartResult consumes the HTTP response for a single request part,
// appending exactly part.e.Length bytes to buf. It always closes the response
// body. 206 Partial Content is the conforming reply; a plain 200 OK is
// tolerated for ranges starting within the first 48 KiB (some servers refuse
// range requests on small files), in which case the unwanted prefix is
// discarded. 503 maps to ErrTooFast; anything else is an ErrBadResponse.
func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *http.Response) error {
	defer resp.Body.Close()
	var body io.Reader = resp.Body
	// Wrap before nilling resp.Body below, so the wrapper sees the real body.
	if part.responseBodyWrapper != nil {
		body = part.responseBodyWrapper(body)
	}
	// Prevent further accidental use
	resp.Body = nil
	// Bail out early if the request was cancelled before/while the response
	// arrived.
	if ctx.Err() != nil {
		return ctx.Err()
	}
	switch resp.StatusCode {
	case http.StatusPartialContent:
		copied, err := io.Copy(buf, body)
		if err != nil {
			return err
		}
		// A short (or long) range response is a protocol violation.
		if copied != part.e.Length {
			return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
		}
		return nil
	case http.StatusOK:
		// This number is based on
		// https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that
		// archive.org might be using a webserver implementation that refuses to do partial
		// responses to small files.
		if part.e.Start < 48<<10 {
			if part.e.Start != 0 {
				log.Printf("resp status ok but requested range [url=%q, range=%q]",
					part.req.URL,
					part.req.Header.Get("Range"))
			}
			// Instead of discarding, we could try receiving all the chunks present in the response
			// body. I don't know how one would handle multiple chunk requests resulting in an OK
			// response for the same file. The request algorithm might be need to be smarter for
			// that.
			// Best-effort skip of the prefix; any error here will surface from
			// the CopyN below, which would come up short.
			discarded, _ := io.CopyN(io.Discard, body, part.e.Start)
			if discarded != 0 {
				log.Printf("discarded %v bytes in webseed request response part", discarded)
			}
			_, err := io.CopyN(buf, body, part.e.Length)
			return err
		} else {
			return ErrBadResponse{"resp status ok but requested range", resp}
		}
	case http.StatusServiceUnavailable:
		return ErrTooFast
	default:
		return ErrBadResponse{
			fmt.Sprintf("unhandled response status code (%v)", resp.StatusCode),
			resp,
		}
	}
}
// ErrTooFast is returned when the webseed replies 503 Service Unavailable,
// taken as a signal that we're hitting it too hard.
var ErrTooFast = errors.New("making requests too fast")
  159. func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
  160. var buf bytes.Buffer
  161. for _, part := range parts {
  162. var resp *http.Response
  163. resp, err = part.do()
  164. if err == nil {
  165. err = recvPartResult(ctx, &buf, part, resp)
  166. }
  167. if err != nil {
  168. err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
  169. break
  170. }
  171. }
  172. return buf.Bytes(), err
  173. }