reader.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package torrent
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "github.com/anacrolix/log"
  9. "github.com/anacrolix/missinggo/v2"
  10. )
  11. // Accesses Torrent data via a Client. Reads block until the data is available. Seeks and readahead
  12. // also drive Client behaviour. Not safe for concurrent use.
  13. type Reader interface {
  14. io.ReadSeekCloser
  15. missinggo.ReadContexter
  16. // Configure the number of bytes ahead of a read that should also be prioritized in preparation
  17. // for further reads. Overridden by non-nil readahead func, see SetReadaheadFunc.
  18. SetReadahead(int64)
  19. // If non-nil, the provided function is called when the implementation needs to know the
  20. // readahead for the current reader. Calls occur during Reads and Seeks, and while the Client is
  21. // locked.
  22. SetReadaheadFunc(ReadaheadFunc)
  23. // Don't wait for pieces to complete and be verified. Read calls return as soon as they can when
  24. // the underlying chunks become available.
  25. SetResponsive()
  26. }
  27. // Piece range by piece index, [begin, end).
  28. type pieceRange struct {
  29. begin, end pieceIndex
  30. }
  type (
  	// ReadaheadContext describes a Reader's position when its readahead is being computed.
  	// Both positions are relative to the start of the Reader's own extent.
  	ReadaheadContext struct {
  		// ContiguousReadStartPos is the position that reads have continued contiguously from;
  		// a seek to a new position resets it.
  		ContiguousReadStartPos int64
  		// CurrentPos is the Reader's current position.
  		CurrentPos int64
  	}

  	// ReadaheadFunc returns the desired readahead, in bytes, for a Reader.
  	ReadaheadFunc func(ReadaheadContext) int64
  )
  37. type reader struct {
  38. t *Torrent
  39. // Adjust the read/seek window to handle Readers locked to File extents and the like.
  40. offset, length int64
  41. // Function to dynamically calculate readahead. If nil, readahead is static.
  42. readaheadFunc ReadaheadFunc
  43. // Required when modifying pos and readahead.
  44. mu sync.Locker
  45. readahead, pos int64
  46. // Position that reads have continued contiguously from.
  47. contiguousReadStartPos int64
  48. // The cached piece range this reader wants downloaded. The zero value corresponds to nothing.
  49. // We cache this so that changes can be detected, and bubbled up to the Torrent only as
  50. // required.
  51. pieces pieceRange
  52. // Reads have been initiated since the last seek. This is used to prevent readaheads occurring
  53. // after a seek or with a new reader at the starting position.
  54. reading bool
  55. responsive bool
  56. }
  57. var _ io.ReadSeekCloser = (*reader)(nil)
  58. func (r *reader) SetResponsive() {
  59. r.responsive = true
  60. r.t.cl.event.Broadcast()
  61. }
  62. // Disable responsive mode. TODO: Remove?
  63. func (r *reader) SetNonResponsive() {
  64. r.responsive = false
  65. r.t.cl.event.Broadcast()
  66. }
  67. func (r *reader) SetReadahead(readahead int64) {
  68. r.mu.Lock()
  69. r.readahead = readahead
  70. r.readaheadFunc = nil
  71. r.posChanged()
  72. r.mu.Unlock()
  73. }
  74. func (r *reader) SetReadaheadFunc(f ReadaheadFunc) {
  75. r.mu.Lock()
  76. r.readaheadFunc = f
  77. r.posChanged()
  78. r.mu.Unlock()
  79. }
  80. // How many bytes are available to read. Max is the most we could require.
  81. func (r *reader) available(off, max int64) (ret int64) {
  82. off += r.offset
  83. for max > 0 {
  84. req, ok := r.t.offsetRequest(off)
  85. if !ok {
  86. break
  87. }
  88. if !r.responsive && !r.t.pieceComplete(pieceIndex(req.Index)) {
  89. break
  90. }
  91. if !r.t.haveChunk(req) {
  92. break
  93. }
  94. len1 := int64(req.Length) - (off - r.t.requestOffset(req))
  95. max -= len1
  96. ret += len1
  97. off += len1
  98. }
  99. // Ensure that ret hasn't exceeded our original max.
  100. if max < 0 {
  101. ret += max
  102. }
  103. return
  104. }
  105. // Calculates the pieces this reader wants downloaded, ignoring the cached value at r.pieces.
  106. func (r *reader) piecesUncached() (ret pieceRange) {
  107. ra := r.readahead
  108. if r.readaheadFunc != nil {
  109. ra = r.readaheadFunc(ReadaheadContext{
  110. ContiguousReadStartPos: r.contiguousReadStartPos,
  111. CurrentPos: r.pos,
  112. })
  113. }
  114. if ra < 1 {
  115. // Needs to be at least 1, because [x, x) means we don't want
  116. // anything.
  117. ra = 1
  118. }
  119. if !r.reading {
  120. ra = 0
  121. }
  122. if ra > r.length-r.pos {
  123. ra = r.length - r.pos
  124. }
  125. ret.begin, ret.end = r.t.byteRegionPieces(r.torrentOffset(r.pos), ra)
  126. return
  127. }
  // Read is ReadContext with a background context, so it blocks until data is available or the
  // read fails.
  func (r *reader) Read(b []byte) (n int, err error) {
  	ctx := context.Background()
  	return r.ReadContext(ctx, b)
  }
  // ReadContext reads into b from the current position, blocking until data is available or ctx
  // is done. A non-empty b marks the reader as reading, which enables readahead. The wanted piece
  // range is refreshed both before the read and after the position advances. io.EOF is returned
  // together with the final bytes once the reader's extent is exhausted.
  func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
  	if len(b) != 0 {
  		r.reading = true
  		// TODO: Rework reader piece priorities so we don't have to push updates in to the Client
  		// and take the lock here.
  		r.mu.Lock()
  		r.posChanged()
  		r.mu.Unlock()
  	}
  	n, err = r.readOnceAt(ctx, b, r.pos)
  	if n == 0 {
  		// A non-empty read that produced nothing must explain itself with an error.
  		if err == nil && len(b) != 0 {
  			panic("expected error")
  		}
  		return n, err
  	}
  	r.mu.Lock()
  	r.pos += int64(n)
  	r.posChanged()
  	r.mu.Unlock()
  	switch {
  	case r.pos >= r.length:
  		err = io.EOF
  	case err == io.EOF:
  		// Storage ran out before the reader's declared length.
  		err = io.ErrUnexpectedEOF
  	}
  	return n, err
  }
  // closedChan is already closed, so a receive from it never blocks. Selecting on it turns a
  // blocking select into a non-blocking one. It is closed by its own initializer rather than an
  // init func.
  var closedChan = func() chan struct{} {
  	ch := make(chan struct{})
  	close(ch)
  	return ch
  }()
  // waitAvailable waits until some data at reader position pos should be available to read. It
  // returns how many of the wanted bytes can be read without blocking.
  //
  // If wait is false or wanted is 0, a nil error and 0 bytes are returned when nothing is ready.
  // Otherwise it blocks on the piece's reader condition and re-checks each time that fires. It
  // returns an error if, while nothing is available, the torrent is closed, ctx is done, data
  // downloading is disallowed, or networking is disabled. A non-zero avail is always returned with
  // a nil error.
  func (r *reader) waitAvailable(ctx context.Context, pos, wanted int64, wait bool) (avail int64, err error) {
  	t := r.t
  	// A nil channel is never ready, so by default the select below blocks. The already-closed
  	// channel makes it return immediately instead.
  	var dontWait <-chan struct{}
  	if !wait || wanted == 0 {
  		dontWait = closedChan
  	}
  	for {
  		t.cl.rLock()
  		avail = r.available(pos, wanted)
  		readerCond := t.piece(int(r.torrentOffset(pos) / t.info.PieceLength)).readerCond.Signaled()
  		t.cl.rUnlock()
  		if avail != 0 {
  			return avail, nil
  		}
  		select {
  		case <-t.closed.Done():
  			return 0, errors.New("torrent closed")
  		case <-ctx.Done():
  			return 0, ctx.Err()
  		case <-t.dataDownloadDisallowed.On():
  			// Must return: this channel presumably stays ready while downloading is
  			// disallowed, so falling through would busy-spin on the client lock.
  			return 0, errors.New("torrent data downloading disabled")
  		case <-t.networkingEnabled.Off():
  			return 0, errors.New("torrent networking disabled")
  		case <-dontWait:
  			return 0, nil
  		case <-readerCond:
  			// The piece's reader condition fired; re-check availability.
  		}
  	}
  }
  197. // Adds the reader's torrent offset to the reader object offset (for example the reader might be
  198. // constrainted to a particular file within the torrent).
  199. func (r *reader) torrentOffset(readerPos int64) int64 {
  200. return r.offset + readerPos
  201. }
  // readOnceAt reads into b from reader position pos, performing at most one successful read from
  // torrent storage. It returns io.EOF when pos is at or past the reader's length.
  //
  // When storage returns no data on an open torrent, piece completion is re-checked and the wait
  // is retried. Errors from a read that did return data are dropped. A failed read on a closed
  // torrent is reported wrapped.
  func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, err error) {
  	if pos >= r.length {
  		return 0, io.EOF
  	}
  	torrentPos := r.torrentOffset(pos)
  	pieceLen := r.t.info.PieceLength
  	firstPieceIndex := pieceIndex(torrentPos / pieceLen)
  	firstPieceOffset := torrentPos % pieceLen
  	for {
  		var avail int64
  		avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
  		if avail == 0 {
  			return n, err
  		}
  		limited := missinggo.LimitLen(b, avail)
  		n, err = r.t.readAt(limited, torrentPos)
  		if n != 0 {
  			return n, nil
  		}
  		if r.t.closed.IsSet() {
  			return n, fmt.Errorf("reading from closed torrent: %w", err)
  		}
  		r.recheckPiecesAfterReadError(firstPieceIndex, firstPieceOffset, len(limited), err)
  	}
  }

  // recheckPiecesAfterReadError runs under the client lock after a storage read at the given
  // piece/offset returned no data. Unless the torrent was closed meanwhile, it logs readErr and
  // refreshes completion for each piece in r.pieces. It panics if r.pieces does not start at
  // firstPieceIndex.
  func (r *reader) recheckPiecesAfterReadError(firstPieceIndex pieceIndex, firstPieceOffset int64, readLen int, readErr error) {
  	// NOTE(review): a panic has been suspected here when the Client is closed before this lock
  	// is obtained; TestDropTorrentWithMmapStorageWhileHashing seems to tickle it in CI.
  	r.t.cl.lock()
  	defer r.t.cl.unlock()
  	if r.t.closed.IsSet() {
  		// Can't update because Torrent's piece order is removed from Client.
  		return
  	}
  	// TODO: Just reset pieces in the readahead window. This might help prevent thrashing with
  	// small caches and file and piece priorities.
  	r.log(log.Fstr("error reading piece %d offset %d, %d bytes: %v",
  		firstPieceIndex, firstPieceOffset, readLen, readErr))
  	if !r.t.updatePieceCompletion(firstPieceIndex) {
  		r.log(log.Fstr("piece %d completion unchanged", firstPieceIndex))
  	}
  	// Only the first piece could have produced the read error being handled. The remaining
  	// pieces of the readahead window are refreshed without reporting their result.
  	if r.pieces.begin != firstPieceIndex {
  		panic(fmt.Sprint(r.pieces.begin, firstPieceIndex))
  	}
  	for idx := r.pieces.begin + 1; idx < r.pieces.end; idx++ {
  		r.t.updatePieceCompletion(idx)
  	}
  }
  255. // Hodor
  256. func (r *reader) Close() error {
  257. r.t.cl.lock()
  258. r.t.deleteReader(r)
  259. r.t.cl.unlock()
  260. return nil
  261. }
  262. func (r *reader) posChanged() {
  263. to := r.piecesUncached()
  264. from := r.pieces
  265. if to == from {
  266. return
  267. }
  268. r.pieces = to
  269. // log.Printf("reader pos changed %v->%v", from, to)
  270. r.t.readerPosChanged(from, to)
  271. }
  // Seek implements io.Seeker over the reader's own extent. Positions are relative to the
  // reader's offset, and io.SeekEnd is relative to its length.
  //
  // Moving to a different position clears the reading flag, so readahead is suppressed until the
  // next Read. It also restarts the contiguous-read run seen by the readahead func and recomputes
  // the wanted pieces. An unknown whence, or a resulting position below zero, returns an error
  // and leaves the position unchanged.
  func (r *reader) Seek(off int64, whence int) (newPos int64, err error) {
  	switch whence {
  	case io.SeekStart, io.SeekCurrent, io.SeekEnd:
  	default:
  		return 0, errors.New("bad whence")
  	}
  	r.mu.Lock()
  	defer r.mu.Unlock()
  	switch whence {
  	case io.SeekStart:
  		newPos = off
  	case io.SeekCurrent:
  		newPos = r.pos + off
  	case io.SeekEnd:
  		newPos = r.length + off
  	}
  	// io.Seeker treats a position before the start as an error. readOnceAt only guards the
  	// upper bound, so a negative pos would address torrent bytes before r.offset. This check
  	// also catches overflow of r.pos+off.
  	if newPos < 0 {
  		return 0, errors.New("seek to negative position")
  	}
  	if newPos != r.pos {
  		r.reading = false
  		r.pos = newPos
  		r.contiguousReadStartPos = newPos
  		r.posChanged()
  	}
  	return newPos, nil
  }
  295. func (r *reader) log(m log.Msg) {
  296. r.t.logger.LogLevel(log.Debug, m.Skip(1))
  297. }
  298. // Implementation inspired by https://news.ycombinator.com/item?id=27019613.
  299. func defaultReadaheadFunc(r ReadaheadContext) int64 {
  300. return r.CurrentPos - r.ContiguousReadStartPos
  301. }