piece-resource.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. package storage
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "io"
  8. "path"
  9. "sort"
  10. "strconv"
  11. g "github.com/anacrolix/generics"
  12. "github.com/anacrolix/missinggo/v2/resource"
  13. "github.com/anacrolix/sync"
  14. "github.com/anacrolix/torrent/metainfo"
  15. )
// piecePerResource stores each torrent piece as one or more resources
// obtained from the wrapped PieceProvider. Optional provider fast paths
// (MovePrefixer, ConsecutiveChunkReader, PrefixDeleter) are discovered by
// type assertion on rp.
type piecePerResource struct {
	rp   PieceProvider
	opts ResourcePiecesOpts
}
// ResourcePiecesOpts alters the behaviour of a resource-backed ClientImpl.
type ResourcePiecesOpts struct {
	// After marking a piece complete, don't bother deleting its incomplete blobs.
	LeaveIncompleteChunks bool
	// Sized puts require being able to stream from a statement executed on another connection.
	// Without them, we buffer the entire read and then put that.
	NoSizedPuts bool
	// NOTE(review): Capacity is not read anywhere in this file — presumably
	// consumed by the wider storage layer; confirm before relying on it.
	Capacity *int64
}
  28. func NewResourcePieces(p PieceProvider) ClientImpl {
  29. return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
  30. }
  31. func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
  32. return &piecePerResource{
  33. rp: p,
  34. opts: opts,
  35. }
  36. }
// piecePerResourceTorrentImpl is the per-torrent state: the shared client
// configuration plus one lock per piece.
type piecePerResourceTorrentImpl struct {
	piecePerResource
	// One RWMutex per piece, indexed by piece index. Each
	// piecePerResourcePiece holds a pointer into this slice.
	locks []sync.RWMutex
}
// Close implements TorrentImpl.Close. There's no torrent-level state to
// release, so it always succeeds.
func (piecePerResourceTorrentImpl) Close() error {
	return nil
}
  44. func (s piecePerResource) OpenTorrent(
  45. ctx context.Context,
  46. info *metainfo.Info,
  47. infoHash metainfo.Hash,
  48. ) (TorrentImpl, error) {
  49. t := piecePerResourceTorrentImpl{
  50. s,
  51. make([]sync.RWMutex, info.NumPieces()),
  52. }
  53. return TorrentImpl{PieceWithHash: t.Piece, Close: t.Close}, nil
  54. }
  55. func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl {
  56. return piecePerResourcePiece{
  57. mp: p,
  58. pieceHash: pieceHash,
  59. piecePerResource: s.piecePerResource,
  60. mu: &s.locks[p.Index()],
  61. }
  62. }
// PieceProvider supplies the resources pieces are stored in. Providers may
// additionally implement MovePrefixer, ConsecutiveChunkReader or
// PrefixDeleter to enable fast paths.
type PieceProvider interface {
	resource.Provider
}
  66. type MovePrefixer interface {
  67. MovePrefix(old, new string) error
  68. }
// ConsecutiveChunkReader is implemented by providers that can stream the
// chunks stored under a prefix as one contiguous reader.
type ConsecutiveChunkReader interface {
	ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
}
// PrefixDeleter is implemented by providers that can delete every resource
// under a prefix in a single call.
type PrefixDeleter interface {
	DeletePrefix(prefix string) error
}
// piecePerResourcePiece is the per-piece storage handle returned by
// piecePerResourceTorrentImpl.Piece.
type piecePerResourcePiece struct {
	mp metainfo.Piece
	// The piece hash if we have it. It could be 20 or 32 bytes depending on the info version.
	pieceHash g.Option[[]byte]
	piecePerResource
	// This protects operations that move complete/incomplete pieces around, which can trigger read
	// errors that may cause callers to do more drastic things.
	mu *sync.RWMutex
}
  84. var _ io.WriterTo = piecePerResourcePiece{}
  85. func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
  86. s.mu.RLock()
  87. defer s.mu.RUnlock()
  88. if s.mustIsComplete() {
  89. if s.hasMovePrefix() {
  90. if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
  91. return s.writeConsecutiveChunks(ccr, s.completedDirPath(), w)
  92. }
  93. }
  94. r, err := s.completedInstance().Get()
  95. if err != nil {
  96. return 0, fmt.Errorf("getting complete instance: %w", err)
  97. }
  98. defer r.Close()
  99. return io.Copy(w, r)
  100. }
  101. if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
  102. return s.writeConsecutiveChunks(ccr, s.incompleteDirPath(), w)
  103. }
  104. return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
  105. }
  106. func (s piecePerResourcePiece) writeConsecutiveChunks(
  107. ccw ConsecutiveChunkReader,
  108. dir string,
  109. w io.Writer,
  110. ) (int64, error) {
  111. r, err := ccw.ReadConsecutiveChunks(dir + "/")
  112. if err != nil {
  113. return 0, err
  114. }
  115. defer r.Close()
  116. return io.Copy(w, r)
  117. }
  118. // Returns if the piece is complete. Ok should be true, because we are the definitive source of
  119. // truth here.
  120. func (s piecePerResourcePiece) mustIsComplete() bool {
  121. completion := s.Completion()
  122. if !completion.Ok {
  123. panic("must know complete definitively")
  124. }
  125. return completion.Complete
  126. }
  127. func (s piecePerResourcePiece) Completion() (_ Completion) {
  128. if !s.pieceHash.Ok {
  129. return
  130. }
  131. s.mu.RLock()
  132. defer s.mu.RUnlock()
  133. fi, err := s.completedInstance().Stat()
  134. if s.hasMovePrefix() {
  135. return Completion{
  136. Complete: err == nil && fi.Size() != 0,
  137. Ok: true,
  138. }
  139. }
  140. return Completion{
  141. Complete: err == nil && fi.Size() == s.mp.Length(),
  142. Ok: true,
  143. }
  144. }
// SizedPutter stores a reader's contents when the total length is known up
// front, allowing the provider to stream rather than buffer.
type SizedPutter interface {
	PutSized(io.Reader, int64) error
}
// MarkComplete promotes the piece's incomplete data to completed storage.
// With a MovePrefixer provider the incomplete chunk dir is renamed in one
// step; otherwise the chunks are streamed into a single completed resource
// and then — unless opts.LeaveIncompleteChunks — deleted.
func (s piecePerResourcePiece) MarkComplete() (err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if mp, ok := s.rp.(MovePrefixer); ok {
		// Fast path: let the provider move everything under the incomplete
		// prefix to the completed prefix.
		err = mp.MovePrefix(s.incompleteDirPath()+"/", s.completedDirPath()+"/")
		if err != nil {
			err = fmt.Errorf("moving incomplete to complete: %w", err)
		}
		return
	}
	incompleteChunks := s.getChunks(s.incompleteDirPath())
	// Prefer the provider's streaming reader; otherwise read through the
	// chunks' ReadAt, bounded to the piece length.
	r, err := func() (io.ReadCloser, error) {
		if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
			return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
		}
		return io.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
	}()
	if err != nil {
		return fmt.Errorf("getting incomplete chunks reader: %w", err)
	}
	defer r.Close()
	completedInstance := s.completedInstance()
	// Use a sized put when the instance supports it and it isn't disabled.
	err = func() error {
		if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
			return sp.PutSized(r, s.mp.Length())
		} else {
			return completedInstance.Put(r)
		}
	}()
	if err != nil || s.opts.LeaveIncompleteChunks {
		return
	}
	// I think we do this synchronously here since we don't want callers to act on the completed
	// piece if we're concurrently still deleting chunks. The caller may decide to start
	// downloading chunks again and won't expect us to delete them. It seems to be much faster
	// to let the resource provider do this if possible.
	if pd, ok := s.rp.(PrefixDeleter); ok {
		err = pd.DeletePrefix(s.incompleteDirPath() + "/")
		if err != nil {
			err = fmt.Errorf("deleting incomplete prefix: %w", err)
		}
	} else {
		// Fallback: delete each chunk concurrently; individual delete
		// errors are ignored (best effort).
		var wg sync.WaitGroup
		for _, c := range incompleteChunks {
			wg.Add(1)
			go func(c chunk) {
				defer wg.Done()
				c.instance.Delete()
			}(c)
		}
		wg.Wait()
	}
	return err
}
  202. func (s piecePerResourcePiece) MarkNotComplete() error {
  203. s.mu.Lock()
  204. defer s.mu.Unlock()
  205. return s.completedInstance().Delete()
  206. }
  207. func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
  208. s.mu.RLock()
  209. defer s.mu.RUnlock()
  210. if s.mustIsComplete() {
  211. if s.hasMovePrefix() {
  212. chunks := s.getChunks(s.completedDirPath())
  213. return chunks.ReadAt(b, off)
  214. }
  215. return s.completedInstance().ReadAt(b, off)
  216. }
  217. return s.getChunks(s.incompleteDirPath()).ReadAt(b, off)
  218. }
  219. func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
  220. s.mu.RLock()
  221. defer s.mu.RUnlock()
  222. i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
  223. if err != nil {
  224. panic(err)
  225. }
  226. r := bytes.NewReader(b)
  227. if sp, ok := i.(SizedPutter); ok {
  228. err = sp.PutSized(r, r.Size())
  229. } else {
  230. err = i.Put(r)
  231. }
  232. n = len(b) - r.Len()
  233. return
  234. }
// chunk is a single stored blob of piece data starting at offset within the
// piece.
type chunk struct {
	offset   int64
	instance resource.Instance
}

// chunks is a set of chunk, sorted by offset by getChunks.
type chunks []chunk
  240. func (me chunks) ReadAt(b []byte, off int64) (n int, err error) {
  241. i := sort.Search(len(me), func(i int) bool {
  242. return me[i].offset > off
  243. }) - 1
  244. if i == -1 {
  245. err = io.EOF
  246. return
  247. }
  248. chunk := me[i]
  249. // Go made me do this with it's bullshit named return values and := operator.
  250. again:
  251. n1, err := chunk.instance.ReadAt(b, off-chunk.offset)
  252. b = b[n1:]
  253. n += n1
  254. // Should we check here that we're not io.EOF or nil, per ReadAt's contract? That way we know we
  255. // don't have an error anymore for the rest of the block.
  256. if len(b) == 0 {
  257. // err = nil, so we don't send io.EOF on chunk boundaries?
  258. return
  259. }
  260. off += int64(n1)
  261. i++
  262. if i >= len(me) {
  263. if err == nil {
  264. err = io.EOF
  265. }
  266. return
  267. }
  268. chunk = me[i]
  269. if chunk.offset > off {
  270. if err == nil {
  271. err = io.ErrUnexpectedEOF
  272. }
  273. return
  274. }
  275. goto again
  276. }
  277. func (s piecePerResourcePiece) getChunks(dir string) (chunks chunks) {
  278. names, err := s.dirInstance(dir).Readdirnames()
  279. if err != nil {
  280. return
  281. }
  282. for _, n := range names {
  283. offset, err := strconv.ParseInt(n, 10, 64)
  284. if err != nil {
  285. panic(err)
  286. }
  287. i, err := s.rp.NewInstance(path.Join(dir, n))
  288. if err != nil {
  289. panic(err)
  290. }
  291. chunks = append(chunks, chunk{offset, i})
  292. }
  293. sort.Slice(chunks, func(i, j int) bool {
  294. return chunks[i].offset < chunks[j].offset
  295. })
  296. return
  297. }
  298. func (s piecePerResourcePiece) completedDirPath() string {
  299. if !s.hasMovePrefix() {
  300. panic("not move prefixing")
  301. }
  302. return path.Join("completed", s.hashHex())
  303. }
  304. func (s piecePerResourcePiece) completedInstancePath() string {
  305. if s.hasMovePrefix() {
  306. return s.completedDirPath() + "/0"
  307. }
  308. return path.Join("completed", s.hashHex())
  309. }
  310. func (s piecePerResourcePiece) completedInstance() resource.Instance {
  311. i, err := s.rp.NewInstance(s.completedInstancePath())
  312. if err != nil {
  313. panic(err)
  314. }
  315. return i
  316. }
  317. func (s piecePerResourcePiece) incompleteDirPath() string {
  318. return path.Join("incompleted", s.hashHex())
  319. }
  320. func (s piecePerResourcePiece) dirInstance(path string) resource.DirInstance {
  321. i, err := s.rp.NewInstance(path)
  322. if err != nil {
  323. panic(err)
  324. }
  325. return i.(resource.DirInstance)
  326. }
  327. func (me piecePerResourcePiece) hashHex() string {
  328. return hex.EncodeToString(me.pieceHash.Unwrap())
  329. }
  330. func (me piecePerResourcePiece) hasMovePrefix() bool {
  331. _, ok := me.rp.(MovePrefixer)
  332. return ok
  333. }