piece.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package torrent
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "github.com/anacrolix/chansync"
  7. g "github.com/anacrolix/generics"
  8. "github.com/anacrolix/missinggo/v2/bitmap"
  9. "github.com/anacrolix/torrent/merkle"
  10. "github.com/anacrolix/torrent/metainfo"
  11. pp "github.com/anacrolix/torrent/peer_protocol"
  12. "github.com/anacrolix/torrent/storage"
  13. )
type Piece struct {
	// The completed piece SHA1 hash, from the metainfo "pieces" field. Nil if the info is not V1
	// compatible.
	hash *metainfo.Hash
	// The v2 (merkle root) hash for this piece, if known. Can be absent until piece layers are
	// obtained (see setV2Hash).
	hashV2 g.Option[[32]byte]
	// The owning torrent.
	t     *Torrent
	index pieceIndex
	// Files that overlap this piece. For v2 torrents this should always be a single file (see
	// mustGetOnlyFile).
	files []*File

	// Broadcast when chunk state changes so blocked readers can re-check availability.
	readerCond chansync.BroadcastCond

	// Count of completed hash verifications for this piece (read by VerifyData to wait for a
	// fresh verification). Presumably guarded by the Client lock — TODO confirm.
	numVerifies int64
	// True while the piece data is being hashed.
	hashing bool
	// True while the piece's completion state is being marked.
	marking bool
	// Whether the last completion query against storage succeeded.
	storageCompletionOk bool

	publicPieceState PieceState
	// Piece-specific priority; combined with file and reader priorities in purePriority.
	priority PiecePriority
	// Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This is
	// incremented for any piece a peer has when a peer has a piece, Torrent.haveInfo is true, and
	// the Peer isn't recorded in Torrent.connsWithAllPieces.
	relativeAvailability int

	// This can be locked when the Client lock is taken, but probably not vice versa.
	pendingWritesMutex sync.Mutex
	pendingWrites      int
	noPendingWrites    sync.Cond

	// Connections that have written data to this piece since its last check.
	// This can include connections that have closed.
	dirtiers map[*Peer]struct{}
}
  41. func (p *Piece) String() string {
  42. return fmt.Sprintf("%s/%d", p.t.canonicalShortInfohash().HexString(), p.index)
  43. }
  44. func (p *Piece) Info() metainfo.Piece {
  45. return p.t.info.Piece(p.index)
  46. }
  47. func (p *Piece) Storage() storage.Piece {
  48. var pieceHash g.Option[[]byte]
  49. if p.hash != nil {
  50. pieceHash.Set(p.hash.Bytes())
  51. } else if !p.hasPieceLayer() {
  52. pieceHash.Set(p.mustGetOnlyFile().piecesRoot.UnwrapPtr()[:])
  53. } else if p.hashV2.Ok {
  54. pieceHash.Set(p.hashV2.Value[:])
  55. }
  56. return p.t.storage.PieceWithHash(p.Info(), pieceHash)
  57. }
  58. func (p *Piece) Flush() {
  59. if p.t.storage.Flush != nil {
  60. _ = p.t.storage.Flush()
  61. }
  62. }
  63. func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
  64. return !p.chunkIndexDirty(chunkIndex)
  65. }
  66. func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
  67. return p.pendingChunkIndex(chunkIndexFromChunkSpec(cs, chunkSize))
  68. }
  69. func (p *Piece) hasDirtyChunks() bool {
  70. return p.numDirtyChunks() != 0
  71. }
  72. func (p *Piece) numDirtyChunks() chunkIndexType {
  73. return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex](
  74. &p.t.dirtyChunks,
  75. p.requestIndexOffset(),
  76. p.t.pieceRequestIndexOffset(p.index+1)))
  77. }
  78. func (p *Piece) unpendChunkIndex(i chunkIndexType) {
  79. p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
  80. p.t.updatePieceRequestOrderPiece(p.index)
  81. p.readerCond.Broadcast()
  82. }
  83. func (p *Piece) pendChunkIndex(i RequestIndex) {
  84. p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
  85. p.t.updatePieceRequestOrderPiece(p.index)
  86. }
  87. func (p *Piece) numChunks() chunkIndexType {
  88. return p.t.pieceNumChunks(p.index)
  89. }
  90. func (p *Piece) incrementPendingWrites() {
  91. p.pendingWritesMutex.Lock()
  92. p.pendingWrites++
  93. p.pendingWritesMutex.Unlock()
  94. }
  95. func (p *Piece) decrementPendingWrites() {
  96. p.pendingWritesMutex.Lock()
  97. if p.pendingWrites == 0 {
  98. panic("assertion")
  99. }
  100. p.pendingWrites--
  101. if p.pendingWrites == 0 {
  102. p.noPendingWrites.Broadcast()
  103. }
  104. p.pendingWritesMutex.Unlock()
  105. }
  106. func (p *Piece) waitNoPendingWrites() {
  107. p.pendingWritesMutex.Lock()
  108. for p.pendingWrites != 0 {
  109. p.noPendingWrites.Wait()
  110. }
  111. p.pendingWritesMutex.Unlock()
  112. }
  113. func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool {
  114. return p.t.dirtyChunks.Contains(p.requestIndexOffset() + chunk)
  115. }
  116. func (p *Piece) chunkIndexSpec(chunk chunkIndexType) ChunkSpec {
  117. return chunkIndexSpec(pp.Integer(chunk), p.length(), p.chunkSize())
  118. }
  119. func (p *Piece) numDirtyBytes() (ret pp.Integer) {
  120. // defer func() {
  121. // if ret > p.length() {
  122. // panic("too many dirty bytes")
  123. // }
  124. // }()
  125. numRegularDirtyChunks := p.numDirtyChunks()
  126. if p.chunkIndexDirty(p.numChunks() - 1) {
  127. numRegularDirtyChunks--
  128. ret += p.chunkIndexSpec(p.lastChunkIndex()).Length
  129. }
  130. ret += pp.Integer(numRegularDirtyChunks) * p.chunkSize()
  131. return
  132. }
  133. func (p *Piece) length() pp.Integer {
  134. return p.t.pieceLength(p.index)
  135. }
  136. func (p *Piece) chunkSize() pp.Integer {
  137. return p.t.chunkSize
  138. }
  139. func (p *Piece) lastChunkIndex() chunkIndexType {
  140. return p.numChunks() - 1
  141. }
  142. func (p *Piece) bytesLeft() (ret pp.Integer) {
  143. if p.t.pieceComplete(p.index) {
  144. return 0
  145. }
  146. return p.length() - p.numDirtyBytes()
  147. }
// Forces the piece data to be rehashed.
func (p *Piece) VerifyData() {
	p.t.cl.lock()
	defer p.t.cl.unlock()
	// Wait for one more verification than has been seen so far.
	target := p.numVerifies + 1
	if p.hashing {
		// A hash is already in flight; its result predates this call, so require
		// one additional verification beyond it.
		target++
	}
	// log.Printf("target: %d", target)
	p.t.queuePieceCheck(p.index)
	for {
		// log.Printf("got %d verifies", p.numVerifies)
		if p.numVerifies >= target {
			break
		}
		// Blocks until the client-wide event fires; presumably cl.event.L is the
		// client lock so it is released while waiting — TODO confirm.
		p.t.cl.event.Wait()
	}
	// log.Print("done")
}
  167. func (p *Piece) queuedForHash() bool {
  168. return p.t.piecesQueuedForHash.Get(bitmap.BitIndex(p.index))
  169. }
  170. func (p *Piece) torrentBeginOffset() int64 {
  171. return int64(p.index) * p.t.info.PieceLength
  172. }
  173. func (p *Piece) torrentEndOffset() int64 {
  174. return p.torrentBeginOffset() + int64(p.t.usualPieceSize())
  175. }
  176. func (p *Piece) SetPriority(prio PiecePriority) {
  177. p.t.cl.lock()
  178. defer p.t.cl.unlock()
  179. p.priority = prio
  180. p.t.updatePiecePriority(p.index, "Piece.SetPriority")
  181. }
  182. // This is priority based only on piece, file and reader priorities.
  183. func (p *Piece) purePriority() (ret PiecePriority) {
  184. for _, f := range p.files {
  185. ret.Raise(f.prio)
  186. }
  187. if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) {
  188. ret.Raise(PiecePriorityNow)
  189. }
  190. // if t._readerNowPieces.Contains(piece - 1) {
  191. // return PiecePriorityNext
  192. // }
  193. if p.t.readerReadaheadPieces().Contains(bitmap.BitIndex(p.index)) {
  194. ret.Raise(PiecePriorityReadahead)
  195. }
  196. ret.Raise(p.priority)
  197. return
  198. }
  199. func (p *Piece) ignoreForRequests() bool {
  200. return p.hashing || p.marking || !p.haveHash() || p.t.pieceComplete(p.index) || p.queuedForHash()
  201. }
  202. // This is the priority adjusted for piece state like completion, hashing etc.
  203. func (p *Piece) effectivePriority() (ret PiecePriority) {
  204. if p.ignoreForRequests() {
  205. return PiecePriorityNone
  206. }
  207. return p.purePriority()
  208. }
  209. // Tells the Client to refetch the completion status from storage, updating priority etc. if
  210. // necessary. Might be useful if you know the state of the piece data has changed externally.
  211. func (p *Piece) UpdateCompletion() {
  212. p.t.cl.lock()
  213. defer p.t.cl.unlock()
  214. p.t.updatePieceCompletion(p.index)
  215. }
  216. func (p *Piece) completion() (ret storage.Completion) {
  217. ret.Complete = p.t.pieceComplete(p.index)
  218. ret.Ok = p.storageCompletionOk
  219. return
  220. }
  221. func (p *Piece) allChunksDirty() bool {
  222. return p.numDirtyChunks() == p.numChunks()
  223. }
  224. func (p *Piece) State() PieceState {
  225. return p.t.PieceState(p.index)
  226. }
  227. func (p *Piece) requestIndexOffset() RequestIndex {
  228. return p.t.pieceRequestIndexOffset(p.index)
  229. }
  230. func (p *Piece) availability() int {
  231. return len(p.t.connsWithAllPieces) + p.relativeAvailability
  232. }
  233. // For v2 torrents, files are aligned to pieces so there should always only be a single file for a
  234. // given piece.
  235. func (p *Piece) mustGetOnlyFile() *File {
  236. if len(p.files) != 1 {
  237. panic(len(p.files))
  238. }
  239. return p.files[0]
  240. }
  241. // Sets the v2 piece hash, queuing initial piece checks if appropriate.
  242. func (p *Piece) setV2Hash(v2h [32]byte) {
  243. // See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we didn't yet
  244. // have a piece hash (can occur with v2 when we don't start with piece layers).
  245. p.t.storageLock.Lock()
  246. oldV2Hash := p.hashV2.Set(v2h)
  247. p.t.storageLock.Unlock()
  248. if !oldV2Hash.Ok && p.hash == nil {
  249. p.t.updatePieceCompletion(p.index)
  250. p.t.queueInitialPieceCheck(p.index)
  251. }
  252. }
  253. // Can't do certain things if we don't know the piece hash.
  254. func (p *Piece) haveHash() bool {
  255. if p.hash != nil {
  256. return true
  257. }
  258. if !p.hasPieceLayer() {
  259. return true
  260. }
  261. return p.hashV2.Ok
  262. }
  263. func (p *Piece) hasPieceLayer() bool {
  264. return len(p.files) == 1 && p.files[0].length > p.t.info.PieceLength
  265. }
  266. func (p *Piece) obtainHashV2() (hash [32]byte, err error) {
  267. if p.hashV2.Ok {
  268. hash = p.hashV2.Value
  269. return
  270. }
  271. if !p.hasPieceLayer() {
  272. hash = p.mustGetOnlyFile().piecesRoot.Unwrap()
  273. return
  274. }
  275. storage := p.Storage()
  276. if !storage.Completion().Complete {
  277. err = errors.New("piece incomplete")
  278. return
  279. }
  280. h := merkle.NewHash()
  281. if _, err = storage.WriteTo(h); err != nil {
  282. return
  283. }
  284. h.SumMinLength(hash[:0], int(p.t.info.PieceLength))
  285. return
  286. }