torrent.go 85 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267
  1. package torrent
  2. import (
  3. "bytes"
  4. "container/heap"
  5. "context"
  6. "crypto/sha1"
  7. "errors"
  8. "fmt"
  9. "hash"
  10. "io"
  11. "math/rand"
  12. "net/netip"
  13. "net/url"
  14. "sort"
  15. "strings"
  16. "text/tabwriter"
  17. "time"
  18. "unsafe"
  19. "github.com/RoaringBitmap/roaring"
  20. "github.com/anacrolix/chansync"
  21. "github.com/anacrolix/chansync/events"
  22. "github.com/anacrolix/dht/v2"
  23. . "github.com/anacrolix/generics"
  24. g "github.com/anacrolix/generics"
  25. "github.com/anacrolix/log"
  26. "github.com/anacrolix/missinggo/slices"
  27. "github.com/anacrolix/missinggo/v2"
  28. "github.com/anacrolix/missinggo/v2/bitmap"
  29. "github.com/anacrolix/missinggo/v2/pubsub"
  30. "github.com/anacrolix/multiless"
  31. "github.com/anacrolix/sync"
  32. "github.com/pion/datachannel"
  33. "golang.org/x/exp/maps"
  34. "golang.org/x/sync/errgroup"
  35. "github.com/anacrolix/torrent/bencode"
  36. "github.com/anacrolix/torrent/internal/check"
  37. "github.com/anacrolix/torrent/internal/nestedmaps"
  38. "github.com/anacrolix/torrent/merkle"
  39. "github.com/anacrolix/torrent/metainfo"
  40. pp "github.com/anacrolix/torrent/peer_protocol"
  41. utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
  42. request_strategy "github.com/anacrolix/torrent/request-strategy"
  43. "github.com/anacrolix/torrent/storage"
  44. "github.com/anacrolix/torrent/tracker"
  45. typedRoaring "github.com/anacrolix/torrent/typed-roaring"
  46. "github.com/anacrolix/torrent/types/infohash"
  47. infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
  48. "github.com/anacrolix/torrent/webseed"
  49. "github.com/anacrolix/torrent/webtorrent"
  50. )
  51. // Maintains state of torrent within a Client. Many methods should not be called before the info is
  52. // available, see .Info and .GotInfo.
  53. type Torrent struct {
  54. // Torrent-level aggregate statistics. First in struct to ensure 64-bit
  55. // alignment. See #262.
  56. stats ConnStats
  57. cl *Client
  58. logger log.Logger
  59. networkingEnabled chansync.Flag
  60. dataDownloadDisallowed chansync.Flag
  61. dataUploadDisallowed bool
  62. userOnWriteChunkErr func(error)
  63. closed chansync.SetOnce
  64. onClose []func()
  65. infoHash g.Option[metainfo.Hash]
  66. infoHashV2 g.Option[infohash_v2.T]
  67. pieces []Piece
  68. // The order pieces are requested if there's no stronger reason like availability or priority.
  69. pieceRequestOrder []int
  70. // Values are the piece indices that changed.
  71. pieceStateChanges pubsub.PubSub[PieceStateChange]
  72. // The size of chunks to request from peers over the wire. This is
  73. // normally 16KiB by convention these days.
  74. chunkSize pp.Integer
  75. chunkPool sync.Pool
  76. // Total length of the torrent in bytes. Stored because it's not O(1) to
  77. // get this from the info dict.
  78. _length Option[int64]
  79. // The storage to open when the info dict becomes available.
  80. storageOpener *storage.Client
  81. // Storage for torrent data.
  82. storage *storage.Torrent
  83. // Read-locked for using storage, and write-locked for Closing.
  84. storageLock sync.RWMutex
  85. // TODO: Only announce stuff is used?
  86. metainfo metainfo.MetaInfo
  87. // The info dict. nil if we don't have it (yet).
  88. info *metainfo.Info
  89. files *[]*File
  90. _chunksPerRegularPiece chunkIndexType
  91. webSeeds map[string]*Peer
  92. // Active peer connections, running message stream loops. TODO: Make this
  93. // open (not-closed) connections only.
  94. conns map[*PeerConn]struct{}
  95. maxEstablishedConns int
  96. // Set of addrs to which we're attempting to connect. Connections are
  97. // half-open until all handshakes are completed.
  98. halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo
  99. // Reserve of peers to connect to. A peer can be both here and in the
  100. // active connections if were told about the peer after connecting with
  101. // them. That encourages us to reconnect to peers that are well known in
  102. // the swarm.
  103. peers prioritizedPeers
  104. // Whether we want to know more peers.
  105. wantPeersEvent missinggo.Event
  106. // An announcer for each tracker URL.
  107. trackerAnnouncers map[torrentTrackerAnnouncerKey]torrentTrackerAnnouncer
  108. // How many times we've initiated a DHT announce. TODO: Move into stats.
  109. numDHTAnnounces int
  110. // Name used if the info name isn't available. Should be cleared when the
  111. // Info does become available.
  112. nameMu sync.RWMutex
  113. displayName string
  114. // The bencoded bytes of the info dict. This is actively manipulated if
  115. // the info bytes aren't initially available, and we try to fetch them
  116. // from peers.
  117. metadataBytes []byte
  118. // Each element corresponds to the 16KiB metadata pieces. If true, we have
  119. // received that piece.
  120. metadataCompletedChunks []bool
  121. metadataChanged sync.Cond
  122. // Closed when .Info is obtained.
  123. gotMetainfoC chan struct{}
  124. readers map[*reader]struct{}
  125. _readerNowPieces bitmap.Bitmap
  126. _readerReadaheadPieces bitmap.Bitmap
  127. // A cache of pieces we need to get. Calculated from various piece and file priorities and
  128. // completion states elsewhere. Includes piece data and piece v2 hashes.
  129. _pendingPieces roaring.Bitmap
  130. // A cache of completed piece indices.
  131. _completedPieces roaring.Bitmap
  132. // Pieces that need to be hashed.
  133. piecesQueuedForHash bitmap.Bitmap
  134. activePieceHashes int
  135. initialPieceCheckDisabled bool
  136. connsWithAllPieces map[*Peer]struct{}
  137. requestState map[RequestIndex]requestState
  138. // Chunks we've written to since the corresponding piece was last checked.
  139. dirtyChunks typedRoaring.Bitmap[RequestIndex]
  140. pex pexState
  141. // Is On when all pieces are complete.
  142. complete chansync.Flag
  143. // Torrent sources in use keyed by the source string.
  144. activeSources sync.Map
  145. sourcesLogger log.Logger
  146. smartBanCache smartBanCache
  147. // Large allocations reused between request state updates.
  148. requestPieceStates []g.Option[request_strategy.PieceRequestOrderState]
  149. requestIndexes []RequestIndex
  150. disableTriggers bool
  151. }
  152. type torrentTrackerAnnouncerKey struct {
  153. shortInfohash [20]byte
  154. url string
  155. }
  156. type outgoingConnAttemptKey = *PeerInfo
  157. func (t *Torrent) length() int64 {
  158. return t._length.Value
  159. }
  160. func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
  161. // This could be done with roaring.BitSliceIndexing.
  162. t.iterPeers(func(peer *Peer) {
  163. if _, ok := t.connsWithAllPieces[peer]; ok {
  164. return
  165. }
  166. if peer.peerHasPiece(i) {
  167. count++
  168. }
  169. })
  170. return
  171. }
  172. func (t *Torrent) decPieceAvailability(i pieceIndex) {
  173. if !t.haveInfo() {
  174. return
  175. }
  176. p := t.piece(i)
  177. if p.relativeAvailability <= 0 {
  178. panic(p.relativeAvailability)
  179. }
  180. p.relativeAvailability--
  181. t.updatePieceRequestOrderPiece(i)
  182. }
  183. func (t *Torrent) incPieceAvailability(i pieceIndex) {
  184. // If we don't the info, this should be reconciled when we do.
  185. if t.haveInfo() {
  186. p := t.piece(i)
  187. p.relativeAvailability++
  188. t.updatePieceRequestOrderPiece(i)
  189. }
  190. }
  191. func (t *Torrent) readerNowPieces() bitmap.Bitmap {
  192. return t._readerNowPieces
  193. }
  194. func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
  195. return t._readerReadaheadPieces
  196. }
  197. func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
  198. return t.piece(i).ignoreForRequests()
  199. }
  200. // Returns a channel that is closed when the Torrent is closed.
  201. func (t *Torrent) Closed() events.Done {
  202. return t.closed.Done()
  203. }
  204. // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
  205. // pending, and half-open peers.
  206. func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
  207. // Add pending peers to the list
  208. t.peers.Each(func(peer PeerInfo) {
  209. ks = append(ks, peer)
  210. })
  211. // Add half-open peers to the list
  212. for _, attempts := range t.halfOpen {
  213. for _, peer := range attempts {
  214. ks = append(ks, *peer)
  215. }
  216. }
  217. // Add active peers to the list
  218. t.cl.rLock()
  219. defer t.cl.rUnlock()
  220. for conn := range t.conns {
  221. ks = append(ks, PeerInfo{
  222. Id: conn.PeerID,
  223. Addr: conn.RemoteAddr,
  224. Source: conn.Discovery,
  225. // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
  226. // > But if we're not connected to them with an encrypted connection, I couldn't say
  227. // > what's appropriate. We can carry forward the SupportsEncryption value as we
  228. // > received it from trackers/DHT/PEX, or just use the encryption state for the
  229. // > connection. It's probably easiest to do the latter for now.
  230. // https://github.com/anacrolix/torrent/pull/188
  231. SupportsEncryption: conn.headerEncrypted,
  232. })
  233. }
  234. return
  235. }
  236. func (t *Torrent) setChunkSize(size pp.Integer) {
  237. t.chunkSize = size
  238. t.chunkPool = sync.Pool{
  239. New: func() interface{} {
  240. b := make([]byte, size)
  241. return &b
  242. },
  243. }
  244. }
  245. func (t *Torrent) pieceComplete(piece pieceIndex) bool {
  246. return t._completedPieces.Contains(bitmap.BitIndex(piece))
  247. }
  248. func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
  249. if t.storage == nil {
  250. return storage.Completion{Complete: false, Ok: true}
  251. }
  252. return t.pieces[piece].Storage().Completion()
  253. }
  254. // There's a connection to that address already.
  255. func (t *Torrent) addrActive(addr string) bool {
  256. if _, ok := t.halfOpen[addr]; ok {
  257. return true
  258. }
  259. for c := range t.conns {
  260. ra := c.RemoteAddr
  261. if ra.String() == addr {
  262. return true
  263. }
  264. }
  265. return false
  266. }
  267. func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
  268. return t.appendConns(ret, func(conn *PeerConn) bool {
  269. return !conn.closed.IsSet()
  270. })
  271. }
  272. func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
  273. for c := range t.conns {
  274. if f(c) {
  275. ret = append(ret, c)
  276. }
  277. }
  278. return ret
  279. }
  280. func (t *Torrent) addPeer(p PeerInfo) (added bool) {
  281. cl := t.cl
  282. torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
  283. if t.closed.IsSet() {
  284. return false
  285. }
  286. if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
  287. if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
  288. torrent.Add("peers not added because of bad addr", 1)
  289. // cl.logger.Printf("peers not added because of bad addr: %v", p)
  290. return false
  291. }
  292. }
  293. if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
  294. torrent.Add("peers replaced", 1)
  295. if !replaced.equal(p) {
  296. t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
  297. added = true
  298. }
  299. } else {
  300. added = true
  301. }
  302. t.openNewConns()
  303. for t.peers.Len() > cl.config.TorrentPeersHighWater {
  304. _, ok := t.peers.DeleteMin()
  305. if ok {
  306. torrent.Add("excess reserve peers discarded", 1)
  307. }
  308. }
  309. return
  310. }
  311. func (t *Torrent) invalidateMetadata() {
  312. for i := 0; i < len(t.metadataCompletedChunks); i++ {
  313. t.metadataCompletedChunks[i] = false
  314. }
  315. t.nameMu.Lock()
  316. t.info = nil
  317. t.nameMu.Unlock()
  318. }
  319. func (t *Torrent) saveMetadataPiece(index int, data []byte) {
  320. if t.haveInfo() {
  321. return
  322. }
  323. if index >= len(t.metadataCompletedChunks) {
  324. t.logger.Printf("%s: ignoring metadata piece %d", t, index)
  325. return
  326. }
  327. copy(t.metadataBytes[(1<<14)*index:], data)
  328. t.metadataCompletedChunks[index] = true
  329. }
  330. func (t *Torrent) metadataPieceCount() int {
  331. return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
  332. }
  333. func (t *Torrent) haveMetadataPiece(piece int) bool {
  334. if t.haveInfo() {
  335. return (1<<14)*piece < len(t.metadataBytes)
  336. } else {
  337. return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
  338. }
  339. }
  340. func (t *Torrent) metadataSize() int {
  341. return len(t.metadataBytes)
  342. }
  343. func (t *Torrent) makePieces() {
  344. t.pieces = make([]Piece, t.info.NumPieces())
  345. for i := range t.pieces {
  346. piece := &t.pieces[i]
  347. piece.t = t
  348. piece.index = i
  349. piece.noPendingWrites.L = &piece.pendingWritesMutex
  350. if t.info.HasV1() {
  351. piece.hash = (*metainfo.Hash)(unsafe.Pointer(
  352. unsafe.SliceData(t.info.Pieces[i*sha1.Size : (i+1)*sha1.Size])))
  353. }
  354. files := *t.files
  355. beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
  356. endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
  357. piece.files = files[beginFile:endFile]
  358. }
  359. }
  360. func (t *Torrent) addPieceLayersLocked(layers map[string]string) (errs []error) {
  361. if layers == nil {
  362. return
  363. }
  364. files:
  365. for _, f := range *t.files {
  366. if f.numPieces() <= 1 {
  367. continue
  368. }
  369. if !f.piecesRoot.Ok {
  370. err := fmt.Errorf("no piece root set for file %v", f)
  371. errs = append(errs, err)
  372. continue files
  373. }
  374. compactLayer, ok := layers[string(f.piecesRoot.Value[:])]
  375. var hashes [][32]byte
  376. if ok {
  377. var err error
  378. hashes, err = merkle.CompactLayerToSliceHashes(compactLayer)
  379. if err != nil {
  380. err = fmt.Errorf("bad piece layers for file %q: %w", f, err)
  381. errs = append(errs, err)
  382. continue files
  383. }
  384. } else {
  385. if f.length > t.info.PieceLength {
  386. // BEP 52 is pretty strongly worded about this, even though we should be able to
  387. // recover: If a v2 torrent is added by magnet link or infohash, we need to fetch
  388. // piece layers ourselves anyway, and that's how we can recover from this.
  389. t.logger.Levelf(log.Warning, "no piece layers for file %q", f)
  390. }
  391. continue files
  392. }
  393. if len(hashes) != f.numPieces() {
  394. errs = append(
  395. errs,
  396. fmt.Errorf("file %q: got %v hashes expected %v", f, len(hashes), f.numPieces()),
  397. )
  398. continue files
  399. }
  400. root := merkle.RootWithPadHash(hashes, metainfo.HashForPiecePad(t.info.PieceLength))
  401. if root != f.piecesRoot.Value {
  402. errs = append(errs, fmt.Errorf("%v: expected hash %x got %x", f, f.piecesRoot.Value, root))
  403. continue files
  404. }
  405. for i := range f.numPieces() {
  406. pi := f.BeginPieceIndex() + i
  407. p := t.piece(pi)
  408. p.setV2Hash(hashes[i])
  409. }
  410. }
  411. return
  412. }
  413. func (t *Torrent) AddPieceLayers(layers map[string]string) (errs []error) {
  414. t.cl.lock()
  415. defer t.cl.unlock()
  416. return t.addPieceLayersLocked(layers)
  417. }
  418. // Returns the index of the first file containing the piece. files must be
  419. // ordered by offset.
  420. func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
  421. for i, f := range files {
  422. if f.offset+f.length > pieceOffset {
  423. return i
  424. }
  425. }
  426. return 0
  427. }
  428. // Returns the index after the last file containing the piece. files must be
  429. // ordered by offset.
  430. func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
  431. for i, f := range files {
  432. if f.offset >= pieceEndOffset {
  433. return i
  434. }
  435. }
  436. return len(files)
  437. }
  438. func (t *Torrent) cacheLength() {
  439. var l int64
  440. for _, f := range t.info.UpvertedFiles() {
  441. l += f.Length
  442. }
  443. t._length = Some(l)
  444. }
  445. // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
  446. // separately.
  447. func (t *Torrent) setInfo(info *metainfo.Info) error {
  448. if err := validateInfo(info); err != nil {
  449. return fmt.Errorf("bad info: %s", err)
  450. }
  451. if t.storageOpener != nil {
  452. var err error
  453. ctx := log.ContextWithLogger(context.Background(), t.logger)
  454. t.storage, err = t.storageOpener.OpenTorrent(ctx, info, *t.canonicalShortInfohash())
  455. if err != nil {
  456. return fmt.Errorf("error opening torrent storage: %s", err)
  457. }
  458. }
  459. t.nameMu.Lock()
  460. t.info = info
  461. t.nameMu.Unlock()
  462. t._chunksPerRegularPiece = chunkIndexType(
  463. (pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
  464. t.updateComplete()
  465. t.displayName = "" // Save a few bytes lol.
  466. t.initFiles()
  467. t.cacheLength()
  468. t.makePieces()
  469. return nil
  470. }
  471. func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
  472. return request_strategy.PieceRequestOrderKey{
  473. InfoHash: *t.canonicalShortInfohash(),
  474. Index: i,
  475. }
  476. }
  477. // This seems to be all the follow-up tasks after info is set, that can't fail.
  478. func (t *Torrent) onSetInfo() {
  479. t.pieceRequestOrder = rand.Perm(t.numPieces())
  480. t.initPieceRequestOrder()
  481. MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
  482. for i := range t.pieces {
  483. p := &t.pieces[i]
  484. // Need to add relativeAvailability before updating piece completion, as that may result in conns
  485. // being dropped.
  486. if p.relativeAvailability != 0 {
  487. panic(p.relativeAvailability)
  488. }
  489. p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
  490. t.addRequestOrderPiece(i)
  491. t.updatePieceCompletion(i)
  492. t.queueInitialPieceCheck(i)
  493. }
  494. t.cl.event.Broadcast()
  495. close(t.gotMetainfoC)
  496. t.updateWantPeersEvent()
  497. t.requestState = make(map[RequestIndex]requestState)
  498. t.tryCreateMorePieceHashers()
  499. t.iterPeers(func(p *Peer) {
  500. p.onGotInfo(t.info)
  501. p.updateRequests("onSetInfo")
  502. })
  503. }
  504. // Checks the info bytes hash to expected values. Fills in any missing infohashes.
  505. func (t *Torrent) hashInfoBytes(b []byte, info *metainfo.Info) error {
  506. v1Hash := infohash.HashBytes(b)
  507. v2Hash := infohash_v2.HashBytes(b)
  508. cl := t.cl
  509. if t.infoHash.Ok && !t.infoHashV2.Ok {
  510. if v1Hash == t.infoHash.Value {
  511. if info.HasV2() {
  512. t.infoHashV2.Set(v2Hash)
  513. cl.torrentsByShortHash[*v2Hash.ToShort()] = t
  514. }
  515. } else if *v2Hash.ToShort() == t.infoHash.Value {
  516. if !info.HasV2() {
  517. return errors.New("invalid v2 info")
  518. }
  519. t.infoHashV2.Set(v2Hash)
  520. t.infoHash.SetNone()
  521. if info.HasV1() {
  522. cl.torrentsByShortHash[v1Hash] = t
  523. t.infoHash.Set(v1Hash)
  524. }
  525. }
  526. } else if t.infoHash.Ok && t.infoHashV2.Ok {
  527. if v1Hash != t.infoHash.Value {
  528. return errors.New("incorrect v1 infohash")
  529. }
  530. if v2Hash != t.infoHashV2.Value {
  531. return errors.New("incorrect v2 infohash")
  532. }
  533. } else if !t.infoHash.Ok && t.infoHashV2.Ok {
  534. if v2Hash != t.infoHashV2.Value {
  535. return errors.New("incorrect v2 infohash")
  536. }
  537. if info.HasV1() {
  538. t.infoHash.Set(v1Hash)
  539. cl.torrentsByShortHash[v1Hash] = t
  540. }
  541. } else {
  542. panic("no expected infohashes")
  543. }
  544. return nil
  545. }
  546. // Called when metadata for a torrent becomes available.
  547. func (t *Torrent) setInfoBytesLocked(b []byte) (err error) {
  548. var info metainfo.Info
  549. err = bencode.Unmarshal(b, &info)
  550. if err != nil {
  551. err = fmt.Errorf("unmarshalling info bytes: %w", err)
  552. return
  553. }
  554. err = t.hashInfoBytes(b, &info)
  555. if err != nil {
  556. return
  557. }
  558. t.metadataBytes = b
  559. t.metadataCompletedChunks = nil
  560. if t.info != nil {
  561. return nil
  562. }
  563. if err := t.setInfo(&info); err != nil {
  564. return err
  565. }
  566. t.onSetInfo()
  567. return nil
  568. }
  569. func (t *Torrent) haveAllMetadataPieces() bool {
  570. if t.haveInfo() {
  571. return true
  572. }
  573. if t.metadataCompletedChunks == nil {
  574. return false
  575. }
  576. for _, have := range t.metadataCompletedChunks {
  577. if !have {
  578. return false
  579. }
  580. }
  581. return true
  582. }
  583. // TODO: Propagate errors to disconnect peer.
  584. func (t *Torrent) setMetadataSize(size int) (err error) {
  585. if t.haveInfo() {
  586. // We already know the correct metadata size.
  587. return
  588. }
  589. if uint32(size) > maxMetadataSize {
  590. return log.WithLevel(log.Warning, errors.New("bad size"))
  591. }
  592. if len(t.metadataBytes) == size {
  593. return
  594. }
  595. t.metadataBytes = make([]byte, size)
  596. t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
  597. t.metadataChanged.Broadcast()
  598. for c := range t.conns {
  599. c.requestPendingMetadata()
  600. }
  601. return
  602. }
  603. // The current working name for the torrent. Either the name in the info dict,
  604. // or a display name given such as by the dn value in a magnet link, or "".
  605. func (t *Torrent) name() string {
  606. t.nameMu.RLock()
  607. defer t.nameMu.RUnlock()
  608. if t.haveInfo() {
  609. return t.info.BestName()
  610. }
  611. if t.displayName != "" {
  612. return t.displayName
  613. }
  614. return "infohash:" + t.canonicalShortInfohash().HexString()
  615. }
  616. func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
  617. p := &t.pieces[index]
  618. ret.Priority = p.effectivePriority()
  619. ret.Completion = p.completion()
  620. ret.QueuedForHash = p.queuedForHash()
  621. ret.Hashing = p.hashing
  622. ret.Checking = ret.QueuedForHash || ret.Hashing
  623. ret.Marking = p.marking
  624. if !ret.Complete && t.piecePartiallyDownloaded(index) {
  625. ret.Partial = true
  626. }
  627. if t.info.HasV2() && !p.hashV2.Ok && p.hasPieceLayer() {
  628. ret.MissingPieceLayerHash = true
  629. }
  630. return
  631. }
  632. func (t *Torrent) metadataPieceSize(piece int) int {
  633. return metadataPieceSize(len(t.metadataBytes), piece)
  634. }
  635. func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
  636. return pp.Message{
  637. Type: pp.Extended,
  638. ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
  639. ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
  640. Piece: piece,
  641. TotalSize: len(t.metadataBytes),
  642. Type: msgType,
  643. }), data...),
  644. }
  645. }
  646. type pieceAvailabilityRun struct {
  647. Count pieceIndex
  648. Availability int
  649. }
  650. func (me pieceAvailabilityRun) String() string {
  651. return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
  652. }
  653. func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
  654. rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
  655. ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
  656. })
  657. for i := range t.pieces {
  658. rle.Append(t.pieces[i].availability(), 1)
  659. }
  660. rle.Flush()
  661. return
  662. }
  663. func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
  664. freqs = make([]int, t.numActivePeers()+1)
  665. for i := range t.pieces {
  666. freqs[t.piece(i).availability()]++
  667. }
  668. return
  669. }
  670. func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
  671. rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
  672. ret = append(ret, PieceStateRun{
  673. PieceState: el.(PieceState),
  674. Length: int(count),
  675. })
  676. })
  677. for index := range t.pieces {
  678. rle.Append(t.pieceState(index), 1)
  679. }
  680. rle.Flush()
  681. return
  682. }
  683. // Produces a small string representing a PieceStateRun.
  684. func (psr PieceStateRun) String() (ret string) {
  685. ret = fmt.Sprintf("%d", psr.Length)
  686. ret += func() string {
  687. switch psr.Priority {
  688. case PiecePriorityNext:
  689. return "N"
  690. case PiecePriorityNormal:
  691. return "."
  692. case PiecePriorityReadahead:
  693. return "R"
  694. case PiecePriorityNow:
  695. return "!"
  696. case PiecePriorityHigh:
  697. return "H"
  698. default:
  699. return ""
  700. }
  701. }()
  702. if psr.Hashing {
  703. ret += "H"
  704. }
  705. if psr.QueuedForHash {
  706. ret += "Q"
  707. }
  708. if psr.Marking {
  709. ret += "M"
  710. }
  711. if psr.Partial {
  712. ret += "P"
  713. }
  714. if psr.Complete {
  715. ret += "C"
  716. }
  717. if !psr.Ok {
  718. ret += "?"
  719. }
  720. if psr.MissingPieceLayerHash {
  721. ret += "h"
  722. }
  723. return
  724. }
  725. func (t *Torrent) writeStatus(w io.Writer) {
  726. if t.infoHash.Ok {
  727. fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.Value.HexString())
  728. }
  729. if t.infoHashV2.Ok {
  730. fmt.Fprintf(w, "Infohash v2: %s\n", t.infoHashV2.Value.HexString())
  731. }
  732. fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
  733. if !t.haveInfo() {
  734. fmt.Fprintf(w, "Metadata have: ")
  735. for _, h := range t.metadataCompletedChunks {
  736. fmt.Fprintf(w, "%c", func() rune {
  737. if h {
  738. return 'H'
  739. } else {
  740. return '.'
  741. }
  742. }())
  743. }
  744. fmt.Fprintln(w)
  745. }
  746. fmt.Fprintf(w, "Piece length: %s\n",
  747. func() string {
  748. if t.haveInfo() {
  749. return fmt.Sprintf("%v (%v chunks)",
  750. t.usualPieceSize(),
  751. float64(t.usualPieceSize())/float64(t.chunkSize))
  752. } else {
  753. return "no info"
  754. }
  755. }(),
  756. )
  757. if t.info != nil {
  758. fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
  759. fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
  760. // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
  761. // availability frequencies instead.
  762. if false {
  763. fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
  764. for _, run := range t.pieceAvailabilityRuns() {
  765. ret = append(ret, run.String())
  766. }
  767. return
  768. }(), " "))
  769. }
  770. fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
  771. func() (ret []string) {
  772. for avail, freq := range t.pieceAvailabilityFrequencies() {
  773. if freq == 0 {
  774. continue
  775. }
  776. ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
  777. }
  778. return
  779. }(),
  780. ", "))
  781. }
  782. fmt.Fprintf(w, "Reader Pieces:")
  783. t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
  784. fmt.Fprintf(w, " %d:%d", begin, end)
  785. return true
  786. })
  787. fmt.Fprintln(w)
  788. fmt.Fprintf(w, "Enabled trackers:\n")
  789. func() {
  790. tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
  791. fmt.Fprintf(tw, " URL\tExtra\n")
  792. for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
  793. lu := l.URL()
  794. ru := r.URL()
  795. var luns, runs url.URL = *lu, *ru
  796. luns.Scheme = ""
  797. runs.Scheme = ""
  798. var ml missinggo.MultiLess
  799. ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
  800. ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
  801. return ml.Less()
  802. }).([]torrentTrackerAnnouncer) {
  803. fmt.Fprintf(tw, " %q\t%v\n", ta.URL(), ta.statusLine())
  804. }
  805. tw.Flush()
  806. }()
  807. fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
  808. dumpStats(w, t.statsLocked())
  809. fmt.Fprintf(w, "webseeds:\n")
  810. t.writePeerStatuses(w, maps.Values(t.webSeeds))
  811. peerConns := maps.Keys(t.conns)
  812. // Peers without priorities first, then those with. I'm undecided about how to order peers
  813. // without priorities.
  814. sort.Slice(peerConns, func(li, ri int) bool {
  815. l := peerConns[li]
  816. r := peerConns[ri]
  817. ml := multiless.New()
  818. lpp := g.ResultFromTuple(l.peerPriority()).ToOption()
  819. rpp := g.ResultFromTuple(r.peerPriority()).ToOption()
  820. ml = ml.Bool(lpp.Ok, rpp.Ok)
  821. ml = ml.Uint32(rpp.Value, lpp.Value)
  822. return ml.Less()
  823. })
  824. fmt.Fprintf(w, "%v peer conns:\n", len(peerConns))
  825. t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer {
  826. return &pc.Peer
  827. }))
  828. }
  829. func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) {
  830. var buf bytes.Buffer
  831. for _, c := range peers {
  832. fmt.Fprintf(w, "- ")
  833. buf.Reset()
  834. c.writeStatus(&buf)
  835. w.Write(bytes.TrimRight(
  836. bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")),
  837. " "))
  838. }
  839. }
  840. func (t *Torrent) haveInfo() bool {
  841. return t.info != nil
  842. }
  843. // Returns a run-time generated MetaInfo that includes the info bytes and
  844. // announce-list as currently known to the client.
  845. func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
  846. return metainfo.MetaInfo{
  847. CreationDate: time.Now().Unix(),
  848. Comment: "dynamic metainfo from client",
  849. CreatedBy: "https://github.com/anacrolix/torrent",
  850. AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
  851. InfoBytes: func() []byte {
  852. if t.haveInfo() {
  853. return t.metadataBytes
  854. } else {
  855. return nil
  856. }
  857. }(),
  858. UrlList: func() []string {
  859. ret := make([]string, 0, len(t.webSeeds))
  860. for url := range t.webSeeds {
  861. ret = append(ret, url)
  862. }
  863. return ret
  864. }(),
  865. PieceLayers: t.pieceLayers(),
  866. }
  867. }
  868. // Returns a count of bytes that are not complete in storage, and not pending being written to
  869. // storage. This value is from the perspective of the download manager, and may not agree with the
  870. // actual state in storage. If you want read data synchronously you should use a Reader. See
  871. // https://github.com/anacrolix/torrent/issues/828.
  872. func (t *Torrent) BytesMissing() (n int64) {
  873. t.cl.rLock()
  874. n = t.bytesMissingLocked()
  875. t.cl.rUnlock()
  876. return
  877. }
  878. func (t *Torrent) bytesMissingLocked() int64 {
  879. return t.bytesLeft()
  880. }
  881. func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
  882. roaring.Flip(b, 0, end).Iterate(cb)
  883. }
  884. func (t *Torrent) bytesLeft() (left int64) {
  885. iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
  886. p := t.piece(pieceIndex(x))
  887. left += int64(p.length() - p.numDirtyBytes())
  888. return true
  889. })
  890. return
  891. }
  892. // Bytes left to give in tracker announces.
  893. func (t *Torrent) bytesLeftAnnounce() int64 {
  894. if t.haveInfo() {
  895. return t.bytesLeft()
  896. } else {
  897. return -1
  898. }
  899. }
  900. func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
  901. if t.pieceComplete(piece) {
  902. return false
  903. }
  904. if t.pieceAllDirty(piece) {
  905. return false
  906. }
  907. return t.pieces[piece].hasDirtyChunks()
  908. }
  909. func (t *Torrent) usualPieceSize() int {
  910. return int(t.info.PieceLength)
  911. }
  912. func (t *Torrent) numPieces() pieceIndex {
  913. return t.info.NumPieces()
  914. }
  915. func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
  916. return pieceIndex(t._completedPieces.GetCardinality())
  917. }
  918. func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
  919. if !t.closed.Set() {
  920. err = errors.New("already closed")
  921. return
  922. }
  923. for _, f := range t.onClose {
  924. f()
  925. }
  926. if t.storage != nil {
  927. wg.Add(1)
  928. go func() {
  929. defer wg.Done()
  930. t.storageLock.Lock()
  931. defer t.storageLock.Unlock()
  932. if f := t.storage.Close; f != nil {
  933. err1 := f()
  934. if err1 != nil {
  935. t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
  936. }
  937. }
  938. }()
  939. }
  940. t.iterPeers(func(p *Peer) {
  941. p.close()
  942. })
  943. if t.storage != nil {
  944. t.deletePieceRequestOrder()
  945. }
  946. t.assertAllPiecesRelativeAvailabilityZero()
  947. t.pex.Reset()
  948. t.cl.event.Broadcast()
  949. t.pieceStateChanges.Close()
  950. t.updateWantPeersEvent()
  951. return
  952. }
  953. func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
  954. for i := range t.pieces {
  955. p := t.piece(i)
  956. if p.relativeAvailability != 0 {
  957. panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
  958. }
  959. }
  960. }
  961. func (t *Torrent) requestOffset(r Request) int64 {
  962. return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
  963. }
  964. // Return the request that would include the given offset into the torrent data. Returns !ok if
  965. // there is no such request.
  966. func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
  967. return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
  968. }
  969. func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
  970. //defer perf.ScopeTimerErr(&err)()
  971. n, err := t.pieces[piece].Storage().WriteAt(data, begin)
  972. if err == nil && n != len(data) {
  973. err = io.ErrShortWrite
  974. }
  975. return err
  976. }
  977. func (t *Torrent) bitfield() (bf []bool) {
  978. bf = make([]bool, t.numPieces())
  979. t._completedPieces.Iterate(func(piece uint32) (again bool) {
  980. bf[piece] = true
  981. return true
  982. })
  983. return
  984. }
  985. func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
  986. return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
  987. }
  988. func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
  989. return t._chunksPerRegularPiece
  990. }
  991. func (t *Torrent) numChunks() RequestIndex {
  992. if t.numPieces() == 0 {
  993. return 0
  994. }
  995. return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
  996. }
  997. func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
  998. t.dirtyChunks.RemoveRange(
  999. uint64(t.pieceRequestIndexOffset(pieceIndex)),
  1000. uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
  1001. }
  1002. func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
  1003. if t.info.PieceLength == 0 {
  1004. // There will be no variance amongst pieces. Only pain.
  1005. return 0
  1006. }
  1007. if t.info.FilesArePieceAligned() {
  1008. p := t.piece(piece)
  1009. file := p.mustGetOnlyFile()
  1010. if piece == file.EndPieceIndex()-1 {
  1011. return pp.Integer(file.length - (p.torrentBeginOffset() - file.offset))
  1012. }
  1013. return pp.Integer(t.usualPieceSize())
  1014. }
  1015. if piece == t.numPieces()-1 {
  1016. ret := pp.Integer(t.length() % t.info.PieceLength)
  1017. if ret != 0 {
  1018. return ret
  1019. }
  1020. }
  1021. return pp.Integer(t.info.PieceLength)
  1022. }
  1023. func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
  1024. return &blockCheckingWriter{
  1025. cache: &t.smartBanCache,
  1026. requestIndex: t.pieceRequestIndexOffset(piece),
  1027. chunkSize: t.chunkSize.Int(),
  1028. }
  1029. }
  1030. func (t *Torrent) hashPiece(piece pieceIndex) (
  1031. correct bool,
  1032. // These are peers that sent us blocks that differ from what we hash here.
  1033. differingPeers map[bannableAddr]struct{},
  1034. err error,
  1035. ) {
  1036. p := t.piece(piece)
  1037. p.waitNoPendingWrites()
  1038. storagePiece := p.Storage()
  1039. if p.hash != nil {
  1040. // Does the backend want to do its own hashing?
  1041. if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
  1042. var sum metainfo.Hash
  1043. // log.Printf("A piece decided to self-hash: %d", piece)
  1044. sum, err = i.SelfHash()
  1045. correct = sum == *p.hash
  1046. // Can't do smart banning without reading the piece. The smartBanCache is still cleared
  1047. // in pieceHasher regardless.
  1048. return
  1049. }
  1050. h := pieceHash.New()
  1051. differingPeers, err = t.hashPieceWithSpecificHash(piece, h)
  1052. // For a hybrid torrent, we work with the v2 files, but if we use a v1 hash, we can assume
  1053. // that the pieces are padded with zeroes.
  1054. if t.info.FilesArePieceAligned() {
  1055. paddingLen := p.Info().V1Length() - p.Info().Length()
  1056. written, err := io.CopyN(h, zeroReader, paddingLen)
  1057. if written != paddingLen {
  1058. panic(fmt.Sprintf(
  1059. "piece %v: wrote %v bytes of padding, expected %v, error: %v",
  1060. piece,
  1061. written,
  1062. paddingLen,
  1063. err,
  1064. ))
  1065. }
  1066. }
  1067. var sum [20]byte
  1068. sumExactly(sum[:], h.Sum)
  1069. correct = sum == *p.hash
  1070. } else if p.hashV2.Ok {
  1071. h := merkle.NewHash()
  1072. differingPeers, err = t.hashPieceWithSpecificHash(piece, h)
  1073. var sum [32]byte
  1074. // What about the final piece in a torrent? From BEP 52: "The layer is chosen so that one
  1075. // hash covers piece length bytes.". Note that if a piece doesn't have a hash in piece
  1076. // layers it's because it's not larger than the piece length.
  1077. sumExactly(sum[:], func(b []byte) []byte {
  1078. return h.SumMinLength(b, int(t.info.PieceLength))
  1079. })
  1080. correct = sum == p.hashV2.Value
  1081. } else {
  1082. expected := p.mustGetOnlyFile().piecesRoot.Unwrap()
  1083. h := merkle.NewHash()
  1084. differingPeers, err = t.hashPieceWithSpecificHash(piece, h)
  1085. var sum [32]byte
  1086. // This is *not* padded to piece length.
  1087. sumExactly(sum[:], h.Sum)
  1088. correct = sum == expected
  1089. }
  1090. return
  1091. }
  1092. func sumExactly(dst []byte, sum func(b []byte) []byte) {
  1093. n := len(sum(dst[:0]))
  1094. if n != len(dst) {
  1095. panic(n)
  1096. }
  1097. }
  1098. func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) (
  1099. // These are peers that sent us blocks that differ from what we hash here.
  1100. differingPeers map[bannableAddr]struct{},
  1101. err error,
  1102. ) {
  1103. p := t.piece(piece)
  1104. storagePiece := p.Storage()
  1105. smartBanWriter := t.smartBanBlockCheckingWriter(piece)
  1106. multiWriter := io.MultiWriter(h, smartBanWriter)
  1107. {
  1108. var written int64
  1109. written, err = storagePiece.WriteTo(multiWriter)
  1110. if err == nil && written != int64(p.length()) {
  1111. err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length())
  1112. // Skip smart banning since we can't blame them for storage issues. A short write would
  1113. // ban peers for all recorded blocks that weren't just written.
  1114. return
  1115. }
  1116. }
  1117. // Flush before writing padding, since we would not have recorded the padding blocks.
  1118. smartBanWriter.Flush()
  1119. differingPeers = smartBanWriter.badPeers
  1120. return
  1121. }
  1122. func (t *Torrent) haveAnyPieces() bool {
  1123. return !t._completedPieces.IsEmpty()
  1124. }
  1125. func (t *Torrent) haveAllPieces() bool {
  1126. if !t.haveInfo() {
  1127. return false
  1128. }
  1129. return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
  1130. }
  1131. func (t *Torrent) havePiece(index pieceIndex) bool {
  1132. return t.haveInfo() && t.pieceComplete(index)
  1133. }
  1134. func (t *Torrent) maybeDropMutuallyCompletePeer(
  1135. // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
  1136. // okay?
  1137. p *PeerConn,
  1138. ) {
  1139. if !t.cl.config.DropMutuallyCompletePeers {
  1140. return
  1141. }
  1142. if !t.haveAllPieces() {
  1143. return
  1144. }
  1145. if all, known := p.peerHasAllPieces(); !(known && all) {
  1146. return
  1147. }
  1148. if p.useful() {
  1149. return
  1150. }
  1151. p.logger.Levelf(log.Debug, "is mutually complete; dropping")
  1152. p.drop()
  1153. }
  1154. func (t *Torrent) haveChunk(r Request) (ret bool) {
  1155. // defer func() {
  1156. // log.Println("have chunk", r, ret)
  1157. // }()
  1158. if !t.haveInfo() {
  1159. return false
  1160. }
  1161. if t.pieceComplete(pieceIndex(r.Index)) {
  1162. return true
  1163. }
  1164. p := &t.pieces[r.Index]
  1165. return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
  1166. }
  1167. func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
  1168. return chunkIndexType(cs.Begin / chunkSize)
  1169. }
  1170. func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
  1171. return !t._pendingPieces.IsEmpty() && t._pendingPieces.Contains(uint32(index))
  1172. }
  1173. // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
  1174. // conns (which is a map).
  1175. var peerConnSlices sync.Pool
  1176. func getPeerConnSlice(cap int) []*PeerConn {
  1177. getInterface := peerConnSlices.Get()
  1178. if getInterface == nil {
  1179. return make([]*PeerConn, 0, cap)
  1180. } else {
  1181. return getInterface.([]*PeerConn)[:0]
  1182. }
  1183. }
  1184. // Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
  1185. // this is a frequent occurrence.
  1186. func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
  1187. sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
  1188. f(sl)
  1189. peerConnSlices.Put(sl)
  1190. }
  1191. func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
  1192. wcs := worseConnSlice{conns: sl}
  1193. wcs.initKeys(opts)
  1194. heap.Init(&wcs)
  1195. for wcs.Len() != 0 {
  1196. c := heap.Pop(&wcs).(*PeerConn)
  1197. if opts.incomingIsBad && !c.outgoing {
  1198. return c
  1199. }
  1200. if opts.outgoingIsBad && c.outgoing {
  1201. return c
  1202. }
  1203. if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
  1204. return c
  1205. }
  1206. // If the connection is in the worst half of the established
  1207. // connection quota and is older than a minute.
  1208. if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
  1209. // Give connections 1 minute to prove themselves.
  1210. if time.Since(c.completedHandshake) > time.Minute {
  1211. return c
  1212. }
  1213. }
  1214. }
  1215. return nil
  1216. }
  1217. // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
  1218. // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
  1219. // established connections for more than a minute. This is O(n log n). If there was a way to not
  1220. // consider the position of a conn relative to the total number, it could be reduced to O(n).
  1221. func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
  1222. t.withUnclosedConns(func(ucs []*PeerConn) {
  1223. ret = t.worstBadConnFromSlice(opts, ucs)
  1224. })
  1225. return
  1226. }
  1227. type PieceStateChange struct {
  1228. Index int
  1229. PieceState
  1230. }
  1231. func (t *Torrent) publishPieceStateChange(piece pieceIndex) {
  1232. t.cl._mu.Defer(func() {
  1233. cur := t.pieceState(piece)
  1234. p := &t.pieces[piece]
  1235. if cur != p.publicPieceState {
  1236. p.publicPieceState = cur
  1237. t.pieceStateChanges.Publish(PieceStateChange{
  1238. int(piece),
  1239. cur,
  1240. })
  1241. }
  1242. })
  1243. }
  1244. func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
  1245. if t.pieceComplete(piece) {
  1246. return 0
  1247. }
  1248. return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
  1249. }
  1250. func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
  1251. return t.pieces[piece].allChunksDirty()
  1252. }
  1253. func (t *Torrent) readersChanged() {
  1254. t.updateReaderPieces()
  1255. t.updateAllPiecePriorities("Torrent.readersChanged")
  1256. }
  1257. func (t *Torrent) updateReaderPieces() {
  1258. t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
  1259. }
  1260. func (t *Torrent) readerPosChanged(from, to pieceRange) {
  1261. if from == to {
  1262. return
  1263. }
  1264. t.updateReaderPieces()
  1265. // Order the ranges, high and low.
  1266. l, h := from, to
  1267. if l.begin > h.begin {
  1268. l, h = h, l
  1269. }
  1270. if l.end < h.begin {
  1271. // Two distinct ranges.
  1272. t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
  1273. t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
  1274. } else {
  1275. // Ranges overlap.
  1276. end := l.end
  1277. if h.end > end {
  1278. end = h.end
  1279. }
  1280. t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
  1281. }
  1282. }
  1283. func (t *Torrent) maybeNewConns() {
  1284. // Tickle the accept routine.
  1285. t.cl.event.Broadcast()
  1286. t.openNewConns()
  1287. }
  1288. func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason updateRequestReason) {
  1289. if t._pendingPieces.Contains(uint32(piece)) {
  1290. t.iterPeers(func(c *Peer) {
  1291. // if c.requestState.Interested {
  1292. // return
  1293. // }
  1294. if !c.isLowOnRequests() {
  1295. return
  1296. }
  1297. if !c.peerHasPiece(piece) {
  1298. return
  1299. }
  1300. if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
  1301. return
  1302. }
  1303. c.updateRequests(reason)
  1304. })
  1305. }
  1306. t.maybeNewConns()
  1307. t.publishPieceStateChange(piece)
  1308. }
  1309. func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) {
  1310. if !t.closed.IsSet() {
  1311. // It would be possible to filter on pure-priority changes here to avoid churning the piece
  1312. // request order.
  1313. t.updatePieceRequestOrderPiece(piece)
  1314. }
  1315. p := t.piece(piece)
  1316. newPrio := p.effectivePriority()
  1317. // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
  1318. if newPrio == PiecePriorityNone && p.haveHash() {
  1319. return t._pendingPieces.CheckedRemove(uint32(piece))
  1320. } else {
  1321. return t._pendingPieces.CheckedAdd(uint32(piece))
  1322. }
  1323. }
  1324. func (t *Torrent) updatePiecePriority(piece pieceIndex, reason updateRequestReason) {
  1325. if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
  1326. t.onPiecePendingTriggers(piece, reason)
  1327. }
  1328. t.updatePieceRequestOrderPiece(piece)
  1329. }
  1330. func (t *Torrent) updateAllPiecePriorities(reason updateRequestReason) {
  1331. t.updatePiecePriorities(0, t.numPieces(), reason)
  1332. }
  1333. // Update all piece priorities in one hit. This function should have the same
  1334. // output as updatePiecePriority, but across all pieces.
  1335. func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason updateRequestReason) {
  1336. for i := begin; i < end; i++ {
  1337. t.updatePiecePriority(i, reason)
  1338. }
  1339. }
  1340. // Returns the range of pieces [begin, end) that contains the extent of bytes.
  1341. func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
  1342. if off >= t.length() {
  1343. return
  1344. }
  1345. if off < 0 {
  1346. size += off
  1347. off = 0
  1348. }
  1349. if size <= 0 {
  1350. return
  1351. }
  1352. begin = pieceIndex(off / t.info.PieceLength)
  1353. end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
  1354. if end > pieceIndex(t.info.NumPieces()) {
  1355. end = pieceIndex(t.info.NumPieces())
  1356. }
  1357. return
  1358. }
  1359. // Returns true if all iterations complete without breaking. Returns the read regions for all
  1360. // readers. The reader regions should not be merged as some callers depend on this method to
  1361. // enumerate readers.
  1362. func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
  1363. for r := range t.readers {
  1364. p := r.pieces
  1365. if p.begin >= p.end {
  1366. continue
  1367. }
  1368. if !f(p.begin, p.end) {
  1369. return false
  1370. }
  1371. }
  1372. return true
  1373. }
  1374. func (t *Torrent) pendRequest(req RequestIndex) {
  1375. t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
  1376. }
  1377. func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason updateRequestReason) {
  1378. t.cl.event.Broadcast()
  1379. if t.pieceComplete(piece) {
  1380. t.onPieceCompleted(piece)
  1381. } else {
  1382. t.onIncompletePiece(piece)
  1383. }
  1384. t.updatePiecePriority(piece, reason)
  1385. }
  1386. func (t *Torrent) numReceivedConns() (ret int) {
  1387. for c := range t.conns {
  1388. if c.Discovery == PeerSourceIncoming {
  1389. ret++
  1390. }
  1391. }
  1392. return
  1393. }
  1394. func (t *Torrent) numOutgoingConns() (ret int) {
  1395. for c := range t.conns {
  1396. if c.outgoing {
  1397. ret++
  1398. }
  1399. }
  1400. return
  1401. }
  1402. func (t *Torrent) maxHalfOpen() int {
  1403. // Note that if we somehow exceed the maximum established conns, we want
  1404. // the negative value to have an effect.
  1405. establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
  1406. extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
  1407. // We want to allow some experimentation with new peers, and to try to
  1408. // upset an oversupply of received connections.
  1409. return int(min(
  1410. max(5, extraIncoming)+establishedHeadroom,
  1411. int64(t.cl.config.HalfOpenConnsPerTorrent),
  1412. ))
  1413. }
  1414. func (t *Torrent) openNewConns() (initiated int) {
  1415. defer t.updateWantPeersEvent()
  1416. for t.peers.Len() != 0 {
  1417. if !t.wantOutgoingConns() {
  1418. return
  1419. }
  1420. if len(t.halfOpen) >= t.maxHalfOpen() {
  1421. return
  1422. }
  1423. if len(t.cl.dialers) == 0 {
  1424. return
  1425. }
  1426. if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
  1427. return
  1428. }
  1429. p := t.peers.PopMax()
  1430. opts := outgoingConnOpts{
  1431. peerInfo: p,
  1432. t: t,
  1433. requireRendezvous: false,
  1434. skipHolepunchRendezvous: false,
  1435. receivedHolepunchConnect: false,
  1436. HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
  1437. }
  1438. initiateConn(opts, false)
  1439. initiated++
  1440. }
  1441. return
  1442. }
  1443. func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
  1444. p := t.piece(piece)
  1445. uncached := t.pieceCompleteUncached(piece)
  1446. cached := p.completion()
  1447. changed := cached != uncached
  1448. complete := uncached.Complete
  1449. p.storageCompletionOk = uncached.Ok
  1450. x := uint32(piece)
  1451. if complete {
  1452. t._completedPieces.Add(x)
  1453. t.openNewConns()
  1454. } else {
  1455. t._completedPieces.Remove(x)
  1456. }
  1457. p.t.updatePieceRequestOrderPiece(piece)
  1458. t.updateComplete()
  1459. if complete && len(p.dirtiers) != 0 {
  1460. t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
  1461. }
  1462. if changed {
  1463. //slog.Debug(
  1464. // "piece completion changed",
  1465. // slog.Int("piece", piece),
  1466. // slog.Any("from", cached),
  1467. // slog.Any("to", uncached))
  1468. t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
  1469. }
  1470. return changed
  1471. }
  1472. // Non-blocking read. Client lock is not required.
  1473. func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
  1474. for len(b) != 0 {
  1475. p := &t.pieces[off/t.info.PieceLength]
  1476. p.waitNoPendingWrites()
  1477. var n1 int
  1478. n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
  1479. if n1 == 0 {
  1480. break
  1481. }
  1482. off += int64(n1)
  1483. n += n1
  1484. b = b[n1:]
  1485. }
  1486. return
  1487. }
  1488. // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
  1489. // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
  1490. // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
  1491. func (t *Torrent) maybeCompleteMetadata() error {
  1492. if t.haveInfo() {
  1493. // Nothing to do.
  1494. return nil
  1495. }
  1496. if !t.haveAllMetadataPieces() {
  1497. // Don't have enough metadata pieces.
  1498. return nil
  1499. }
  1500. err := t.setInfoBytesLocked(t.metadataBytes)
  1501. if err != nil {
  1502. t.invalidateMetadata()
  1503. return fmt.Errorf("error setting info bytes: %s", err)
  1504. }
  1505. if t.cl.config.Debug {
  1506. t.logger.Printf("%s: got metadata from peers", t)
  1507. }
  1508. return nil
  1509. }
  1510. func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
  1511. t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
  1512. if end > begin {
  1513. now.Add(bitmap.BitIndex(begin))
  1514. readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
  1515. }
  1516. return true
  1517. })
  1518. return
  1519. }
  1520. func (t *Torrent) needData() bool {
  1521. if t.closed.IsSet() {
  1522. return false
  1523. }
  1524. if !t.haveInfo() {
  1525. return true
  1526. }
  1527. return !t._pendingPieces.IsEmpty()
  1528. }
  1529. func appendMissingStrings(old, new []string) (ret []string) {
  1530. ret = old
  1531. new:
  1532. for _, n := range new {
  1533. for _, o := range old {
  1534. if o == n {
  1535. continue new
  1536. }
  1537. }
  1538. ret = append(ret, n)
  1539. }
  1540. return
  1541. }
  1542. func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
  1543. ret = existing
  1544. for minNumTiers > len(ret) {
  1545. ret = append(ret, nil)
  1546. }
  1547. return
  1548. }
  1549. func (t *Torrent) addTrackers(announceList [][]string) {
  1550. fullAnnounceList := &t.metainfo.AnnounceList
  1551. t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
  1552. for tierIndex, trackerURLs := range announceList {
  1553. (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
  1554. }
  1555. t.startMissingTrackerScrapers()
  1556. t.updateWantPeersEvent()
  1557. }
  1558. func (t *Torrent) modifyTrackers(announceList [][]string) {
  1559. var workers errgroup.Group
  1560. for _, v := range t.trackerAnnouncers {
  1561. workers.Go(func() error {
  1562. v.Stop()
  1563. return nil
  1564. })
  1565. }
  1566. workers.Wait()
  1567. clear(t.metainfo.AnnounceList)
  1568. t.addTrackers(announceList)
  1569. }
  1570. // Don't call this before the info is available.
  1571. func (t *Torrent) bytesCompleted() int64 {
  1572. if !t.haveInfo() {
  1573. return 0
  1574. }
  1575. return t.length() - t.bytesLeft()
  1576. }
  1577. func (t *Torrent) SetInfoBytes(b []byte) (err error) {
  1578. t.cl.lock()
  1579. defer t.cl.unlock()
  1580. return t.setInfoBytesLocked(b)
  1581. }
  1582. // Returns true if connection is removed from torrent.Conns.
  1583. func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
  1584. if !c.closed.IsSet() {
  1585. panic("connection is not closed")
  1586. // There are behaviours prevented by the closed state that will fail
  1587. // if the connection has been deleted.
  1588. }
  1589. _, ret = t.conns[c]
  1590. delete(t.conns, c)
  1591. // Avoid adding a drop event more than once. Probably we should track whether we've generated
  1592. // the drop event against the PexConnState instead.
  1593. if ret {
  1594. if !t.cl.config.DisablePEX {
  1595. t.pex.Drop(c)
  1596. }
  1597. }
  1598. torrent.Add("deleted connections", 1)
  1599. c.deleteAllRequests("Torrent.deletePeerConn")
  1600. t.assertPendingRequests()
  1601. if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
  1602. panic(t.connsWithAllPieces)
  1603. }
  1604. return
  1605. }
  1606. func (t *Torrent) decPeerPieceAvailability(p *Peer) {
  1607. if t.deleteConnWithAllPieces(p) {
  1608. return
  1609. }
  1610. if !t.haveInfo() {
  1611. return
  1612. }
  1613. p.peerPieces().Iterate(func(i uint32) bool {
  1614. p.t.decPieceAvailability(pieceIndex(i))
  1615. return true
  1616. })
  1617. }
  1618. func (t *Torrent) assertPendingRequests() {
  1619. if !check.Enabled {
  1620. return
  1621. }
  1622. // var actual pendingRequests
  1623. // if t.haveInfo() {
  1624. // actual.m = make([]int, t.numChunks())
  1625. // }
  1626. // t.iterPeers(func(p *Peer) {
  1627. // p.requestState.Requests.Iterate(func(x uint32) bool {
  1628. // actual.Inc(x)
  1629. // return true
  1630. // })
  1631. // })
  1632. // diff := cmp.Diff(actual.m, t.pendingRequests.m)
  1633. // if diff != "" {
  1634. // panic(diff)
  1635. // }
  1636. }
  1637. func (t *Torrent) dropConnection(c *PeerConn) {
  1638. t.cl.event.Broadcast()
  1639. c.close()
  1640. if t.deletePeerConn(c) {
  1641. t.openNewConns()
  1642. }
  1643. }
  1644. // Peers as in contact information for dialing out.
  1645. func (t *Torrent) wantPeers() bool {
  1646. if t.closed.IsSet() {
  1647. return false
  1648. }
  1649. if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
  1650. return false
  1651. }
  1652. return t.wantOutgoingConns()
  1653. }
  1654. func (t *Torrent) updateWantPeersEvent() {
  1655. if t.wantPeers() {
  1656. t.wantPeersEvent.Set()
  1657. } else {
  1658. t.wantPeersEvent.Clear()
  1659. }
  1660. }
  1661. // Returns whether the client should make effort to seed the torrent.
  1662. func (t *Torrent) seeding() bool {
  1663. cl := t.cl
  1664. if t.closed.IsSet() {
  1665. return false
  1666. }
  1667. if t.dataUploadDisallowed {
  1668. return false
  1669. }
  1670. if cl.config.NoUpload {
  1671. return false
  1672. }
  1673. if !cl.config.Seed {
  1674. return false
  1675. }
  1676. if cl.config.DisableAggressiveUpload && t.needData() {
  1677. return false
  1678. }
  1679. return true
  1680. }
  1681. func (t *Torrent) onWebRtcConn(
  1682. c datachannel.ReadWriteCloser,
  1683. dcc webtorrent.DataChannelContext,
  1684. ) {
  1685. defer c.Close()
  1686. netConn := webrtcNetConn{
  1687. ReadWriteCloser: c,
  1688. DataChannelContext: dcc,
  1689. }
  1690. peerRemoteAddr := netConn.RemoteAddr()
  1691. //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
  1692. if t.cl.badPeerAddr(peerRemoteAddr) {
  1693. return
  1694. }
  1695. localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
  1696. pc, err := t.cl.initiateProtocolHandshakes(
  1697. context.Background(),
  1698. netConn,
  1699. t,
  1700. false,
  1701. newConnectionOpts{
  1702. outgoing: dcc.LocalOffered,
  1703. remoteAddr: peerRemoteAddr,
  1704. localPublicAddr: localAddrIpPort,
  1705. network: webrtcNetwork,
  1706. connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
  1707. },
  1708. )
  1709. if err != nil {
  1710. t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
  1711. return
  1712. }
  1713. if dcc.LocalOffered {
  1714. pc.Discovery = PeerSourceTracker
  1715. } else {
  1716. pc.Discovery = PeerSourceIncoming
  1717. }
  1718. pc.conn.SetWriteDeadline(time.Time{})
  1719. t.cl.lock()
  1720. defer t.cl.unlock()
  1721. err = t.runHandshookConn(pc)
  1722. if err != nil {
  1723. t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
  1724. }
  1725. }
  1726. func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
  1727. err := t.runHandshookConn(pc)
  1728. if err != nil || logAll {
  1729. t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
  1730. }
  1731. }
  1732. func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
  1733. t.logRunHandshookConn(pc, false, log.Debug)
  1734. }
  1735. func (t *Torrent) startWebsocketAnnouncer(u url.URL, shortInfohash [20]byte) torrentTrackerAnnouncer {
  1736. wtc, release := t.cl.websocketTrackers.Get(u.String(), shortInfohash)
  1737. // This needs to run before the Torrent is dropped from the Client, to prevent a new
  1738. // webtorrent.TrackerClient for the same info hash before the old one is cleaned up.
  1739. t.onClose = append(t.onClose, release)
  1740. wst := websocketTrackerStatus{u, wtc}
  1741. go func() {
  1742. err := wtc.Announce(tracker.Started, shortInfohash)
  1743. if err != nil {
  1744. level := log.Warning
  1745. if t.closed.IsSet() {
  1746. level = log.Debug
  1747. }
  1748. t.logger.Levelf(level, "error doing initial announce to %q: %v", u.String(), err)
  1749. }
  1750. }()
  1751. return wst
  1752. }
  1753. func (t *Torrent) startScrapingTracker(_url string) {
  1754. if _url == "" {
  1755. return
  1756. }
  1757. u, err := url.Parse(_url)
  1758. if err != nil {
  1759. // URLs with a leading '*' appear to be a uTorrent convention to disable trackers.
  1760. if _url[0] != '*' {
  1761. t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err)
  1762. }
  1763. return
  1764. }
  1765. if u.Scheme == "udp" {
  1766. u.Scheme = "udp4"
  1767. t.startScrapingTracker(u.String())
  1768. u.Scheme = "udp6"
  1769. t.startScrapingTracker(u.String())
  1770. return
  1771. }
  1772. if t.infoHash.Ok {
  1773. t.startScrapingTrackerWithInfohash(u, _url, t.infoHash.Value)
  1774. }
  1775. if t.infoHashV2.Ok {
  1776. t.startScrapingTrackerWithInfohash(u, _url, *t.infoHashV2.Value.ToShort())
  1777. }
  1778. }
  1779. func (t *Torrent) startScrapingTrackerWithInfohash(u *url.URL, urlStr string, shortInfohash [20]byte) {
  1780. announcerKey := torrentTrackerAnnouncerKey{
  1781. shortInfohash: shortInfohash,
  1782. url: urlStr,
  1783. }
  1784. if _, ok := t.trackerAnnouncers[announcerKey]; ok {
  1785. return
  1786. }
  1787. sl := func() torrentTrackerAnnouncer {
  1788. switch u.Scheme {
  1789. case "ws", "wss":
  1790. if t.cl.config.DisableWebtorrent {
  1791. return nil
  1792. }
  1793. return t.startWebsocketAnnouncer(*u, shortInfohash)
  1794. case "udp4":
  1795. if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
  1796. return nil
  1797. }
  1798. case "udp6":
  1799. if t.cl.config.DisableIPv6 {
  1800. return nil
  1801. }
  1802. }
  1803. newAnnouncer := &trackerScraper{
  1804. shortInfohash: shortInfohash,
  1805. u: *u,
  1806. t: t,
  1807. lookupTrackerIp: t.cl.config.LookupTrackerIp,
  1808. stopCh: make(chan struct{}),
  1809. }
  1810. go newAnnouncer.Run()
  1811. return newAnnouncer
  1812. }()
  1813. if sl == nil {
  1814. return
  1815. }
  1816. g.MakeMapIfNil(&t.trackerAnnouncers)
  1817. if g.MapInsert(t.trackerAnnouncers, announcerKey, sl).Ok {
  1818. panic("tracker announcer already exists")
  1819. }
  1820. }
  1821. // Adds and starts tracker scrapers for tracker URLs that aren't already
  1822. // running.
  1823. func (t *Torrent) startMissingTrackerScrapers() {
  1824. if t.cl.config.DisableTrackers {
  1825. return
  1826. }
  1827. t.startScrapingTracker(t.metainfo.Announce)
  1828. for _, tier := range t.metainfo.AnnounceList {
  1829. for _, url := range tier {
  1830. t.startScrapingTracker(url)
  1831. }
  1832. }
  1833. }
  1834. // Returns an AnnounceRequest with fields filled out to defaults and current
  1835. // values.
  1836. func (t *Torrent) announceRequest(
  1837. event tracker.AnnounceEvent,
  1838. shortInfohash [20]byte,
  1839. ) tracker.AnnounceRequest {
  1840. // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
  1841. // dependent on the network in use.
  1842. return tracker.AnnounceRequest{
  1843. Event: event,
  1844. NumWant: func() int32 {
  1845. if t.wantPeers() && len(t.cl.dialers) > 0 {
  1846. // Windozer has UDP packet limit. See:
  1847. // https://github.com/anacrolix/torrent/issues/764
  1848. return 200
  1849. } else {
  1850. return 0
  1851. }
  1852. }(),
  1853. Port: uint16(t.cl.incomingPeerPort()),
  1854. PeerId: t.cl.peerID,
  1855. InfoHash: shortInfohash,
  1856. Key: t.cl.announceKey(),
  1857. // The following are vaguely described in BEP 3.
  1858. Left: t.bytesLeftAnnounce(),
  1859. Uploaded: t.stats.BytesWrittenData.Int64(),
  1860. // There's no mention of wasted or unwanted download in the BEP.
  1861. Downloaded: t.stats.BytesReadUsefulData.Int64(),
  1862. }
  1863. }
  1864. // Adds peers revealed in an announce until the announce ends, or we have
  1865. // enough peers.
  1866. func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
  1867. cl := t.cl
  1868. for v := range pvs {
  1869. cl.lock()
  1870. added := 0
  1871. for _, cp := range v.Peers {
  1872. if cp.Port == 0 {
  1873. // Can't do anything with this.
  1874. continue
  1875. }
  1876. if t.addPeer(PeerInfo{
  1877. Addr: ipPortAddr{cp.IP, cp.Port},
  1878. Source: PeerSourceDhtGetPeers,
  1879. }) {
  1880. added++
  1881. }
  1882. }
  1883. cl.unlock()
  1884. // if added != 0 {
  1885. // log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
  1886. // }
  1887. }
  1888. }
  1889. // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
  1890. // announce ends. stop will force the announce to end. This interface is really old-school, and
  1891. // calls a private one that is much more modern. Both v1 and v2 info hashes are announced if they
  1892. // exist.
  1893. func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
  1894. var ihs [][20]byte
  1895. t.cl.lock()
  1896. t.eachShortInfohash(func(short [20]byte) {
  1897. ihs = append(ihs, short)
  1898. })
  1899. t.cl.unlock()
  1900. ctx, stop := context.WithCancel(context.Background())
  1901. eg, ctx := errgroup.WithContext(ctx)
  1902. for _, ih := range ihs {
  1903. var ann DhtAnnounce
  1904. ann, err = s.Announce(ih, t.cl.incomingPeerPort(), true)
  1905. if err != nil {
  1906. stop()
  1907. return
  1908. }
  1909. eg.Go(func() error {
  1910. return t.dhtAnnounceConsumer(ctx, ann)
  1911. })
  1912. }
  1913. _done := make(chan struct{})
  1914. done = _done
  1915. go func() {
  1916. defer stop()
  1917. defer close(_done)
  1918. eg.Wait()
  1919. }()
  1920. return
  1921. }
  1922. // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
  1923. // announce ends. stop will force the announce to end.
  1924. func (t *Torrent) dhtAnnounceConsumer(
  1925. ctx context.Context,
  1926. ps DhtAnnounce,
  1927. ) (
  1928. err error,
  1929. ) {
  1930. defer ps.Close()
  1931. done := make(chan struct{})
  1932. go func() {
  1933. defer close(done)
  1934. t.consumeDhtAnnouncePeers(ps.Peers())
  1935. }()
  1936. select {
  1937. case <-ctx.Done():
  1938. return context.Cause(ctx)
  1939. case <-done:
  1940. return nil
  1941. }
  1942. }
  1943. func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
  1944. _, stop, err := t.AnnounceToDht(s)
  1945. if err != nil {
  1946. return err
  1947. }
  1948. select {
  1949. case <-t.closed.Done():
  1950. case <-time.After(5 * time.Minute):
  1951. }
  1952. stop()
  1953. return nil
  1954. }
  1955. func (t *Torrent) dhtAnnouncer(s DhtServer) {
  1956. cl := t.cl
  1957. cl.lock()
  1958. defer cl.unlock()
  1959. for {
  1960. for {
  1961. if t.closed.IsSet() {
  1962. return
  1963. }
  1964. // We're also announcing ourselves as a listener, so we don't just want peer addresses.
  1965. // TODO: We can include the announce_peer step depending on whether we can receive
  1966. // inbound connections. We should probably only announce once every 15 mins too.
  1967. if !t.wantAnyConns() {
  1968. goto wait
  1969. }
  1970. // TODO: Determine if there's a listener on the port we're announcing.
  1971. if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
  1972. goto wait
  1973. }
  1974. break
  1975. wait:
  1976. cl.event.Wait()
  1977. }
  1978. func() {
  1979. t.numDHTAnnounces++
  1980. cl.unlock()
  1981. defer cl.lock()
  1982. err := t.timeboxedAnnounceToDht(s)
  1983. if err != nil {
  1984. t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
  1985. }
  1986. }()
  1987. }
  1988. }
  1989. func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
  1990. for _, p := range peers {
  1991. if t.addPeer(p) {
  1992. added++
  1993. }
  1994. }
  1995. return
  1996. }
  1997. // The returned TorrentStats may require alignment in memory. See
  1998. // https://github.com/anacrolix/torrent/issues/383.
  1999. func (t *Torrent) Stats() TorrentStats {
  2000. t.cl.rLock()
  2001. defer t.cl.rUnlock()
  2002. return t.statsLocked()
  2003. }
  2004. func (t *Torrent) statsLocked() (ret TorrentStats) {
  2005. ret.ActivePeers = len(t.conns)
  2006. ret.HalfOpenPeers = len(t.halfOpen)
  2007. ret.PendingPeers = t.peers.Len()
  2008. ret.TotalPeers = t.numTotalPeers()
  2009. ret.ConnectedSeeders = 0
  2010. for c := range t.conns {
  2011. if all, ok := c.peerHasAllPieces(); all && ok {
  2012. ret.ConnectedSeeders++
  2013. }
  2014. }
  2015. ret.ConnStats = t.stats.Copy()
  2016. ret.PiecesComplete = t.numPiecesCompleted()
  2017. return
  2018. }
  2019. // The total number of peers in the torrent.
  2020. func (t *Torrent) numTotalPeers() int {
  2021. peers := make(map[string]struct{})
  2022. for conn := range t.conns {
  2023. ra := conn.RemoteAddr
  2024. if ra == nil {
  2025. // It's been closed and doesn't support RemoteAddr.
  2026. continue
  2027. }
  2028. peers[ra.String()] = struct{}{}
  2029. }
  2030. for addr := range t.halfOpen {
  2031. peers[addr] = struct{}{}
  2032. }
  2033. t.peers.Each(func(peer PeerInfo) {
  2034. peers[peer.Addr.String()] = struct{}{}
  2035. })
  2036. return len(peers)
  2037. }
  2038. // Reconcile bytes transferred before connection was associated with a
  2039. // torrent.
  2040. func (t *Torrent) reconcileHandshakeStats(c *Peer) {
  2041. if c._stats != (ConnStats{
  2042. // Handshakes should only increment these fields:
  2043. BytesWritten: c._stats.BytesWritten,
  2044. BytesRead: c._stats.BytesRead,
  2045. }) {
  2046. panic("bad stats")
  2047. }
  2048. c.postHandshakeStats(func(cs *ConnStats) {
  2049. cs.BytesRead.Add(c._stats.BytesRead.Int64())
  2050. cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
  2051. })
  2052. c.reconciledHandshakeStats = true
  2053. }
  2054. // Returns true if the connection is added.
  2055. func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
  2056. defer func() {
  2057. if err == nil {
  2058. torrent.Add("added connections", 1)
  2059. }
  2060. }()
  2061. if t.closed.IsSet() {
  2062. return errors.New("torrent closed")
  2063. }
  2064. for c0 := range t.conns {
  2065. if c.PeerID != c0.PeerID {
  2066. continue
  2067. }
  2068. if !t.cl.config.DropDuplicatePeerIds {
  2069. continue
  2070. }
  2071. if c.hasPreferredNetworkOver(c0) {
  2072. c0.close()
  2073. t.deletePeerConn(c0)
  2074. } else {
  2075. return errors.New("existing connection preferred")
  2076. }
  2077. }
  2078. if len(t.conns) >= t.maxEstablishedConns {
  2079. numOutgoing := t.numOutgoingConns()
  2080. numIncoming := len(t.conns) - numOutgoing
  2081. c := t.worstBadConn(worseConnLensOpts{
  2082. // We've already established that we have too many connections at this point, so we just
  2083. // need to match what kind we have too many of vs. what we're trying to add now.
  2084. incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
  2085. outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
  2086. })
  2087. if c == nil {
  2088. return errors.New("don't want conn")
  2089. }
  2090. c.close()
  2091. t.deletePeerConn(c)
  2092. }
  2093. if len(t.conns) >= t.maxEstablishedConns {
  2094. panic(len(t.conns))
  2095. }
  2096. t.conns[c] = struct{}{}
  2097. t.cl.event.Broadcast()
  2098. // We'll never receive the "p" extended handshake parameter.
  2099. if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
  2100. t.pex.Add(c)
  2101. }
  2102. return nil
  2103. }
  2104. func (t *Torrent) newConnsAllowed() bool {
  2105. if !t.networkingEnabled.Bool() {
  2106. return false
  2107. }
  2108. if t.closed.IsSet() {
  2109. return false
  2110. }
  2111. if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
  2112. return false
  2113. }
  2114. return true
  2115. }
  2116. func (t *Torrent) wantAnyConns() bool {
  2117. if !t.networkingEnabled.Bool() {
  2118. return false
  2119. }
  2120. if t.closed.IsSet() {
  2121. return false
  2122. }
  2123. if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
  2124. return false
  2125. }
  2126. return len(t.conns) < t.maxEstablishedConns
  2127. }
  2128. func (t *Torrent) wantOutgoingConns() bool {
  2129. if !t.newConnsAllowed() {
  2130. return false
  2131. }
  2132. if len(t.conns) < t.maxEstablishedConns {
  2133. return true
  2134. }
  2135. numIncomingConns := len(t.conns) - t.numOutgoingConns()
  2136. return t.worstBadConn(worseConnLensOpts{
  2137. incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
  2138. outgoingIsBad: false,
  2139. }) != nil
  2140. }
  2141. func (t *Torrent) wantIncomingConns() bool {
  2142. if !t.newConnsAllowed() {
  2143. return false
  2144. }
  2145. if len(t.conns) < t.maxEstablishedConns {
  2146. return true
  2147. }
  2148. numIncomingConns := len(t.conns) - t.numOutgoingConns()
  2149. return t.worstBadConn(worseConnLensOpts{
  2150. incomingIsBad: false,
  2151. outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
  2152. }) != nil
  2153. }
  2154. func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
  2155. t.cl.lock()
  2156. defer t.cl.unlock()
  2157. oldMax = t.maxEstablishedConns
  2158. t.maxEstablishedConns = max
  2159. wcs := worseConnSlice{
  2160. conns: t.appendConns(nil, func(*PeerConn) bool {
  2161. return true
  2162. }),
  2163. }
  2164. wcs.initKeys(worseConnLensOpts{})
  2165. heap.Init(&wcs)
  2166. for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
  2167. t.dropConnection(heap.Pop(&wcs).(*PeerConn))
  2168. }
  2169. t.openNewConns()
  2170. return oldMax
  2171. }
  2172. func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
  2173. t.logger.LazyLog(log.Debug, func() log.Msg {
  2174. return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
  2175. })
  2176. p := t.piece(piece)
  2177. p.numVerifies++
  2178. t.cl.event.Broadcast()
  2179. if t.closed.IsSet() {
  2180. return
  2181. }
  2182. // Don't score the first time a piece is hashed, it could be an initial check.
  2183. if p.storageCompletionOk {
  2184. if passed {
  2185. pieceHashedCorrect.Add(1)
  2186. } else {
  2187. log.Fmsg(
  2188. "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
  2189. ).AddValues(t, p).LogLevel(log.Info, t.logger)
  2190. pieceHashedNotCorrect.Add(1)
  2191. }
  2192. }
  2193. p.marking = true
  2194. t.publishPieceStateChange(piece)
  2195. defer func() {
  2196. p.marking = false
  2197. t.publishPieceStateChange(piece)
  2198. }()
  2199. if passed {
  2200. if len(p.dirtiers) != 0 {
  2201. // Don't increment stats above connection-level for every involved connection.
  2202. t.allStats((*ConnStats).incrementPiecesDirtiedGood)
  2203. }
  2204. for c := range p.dirtiers {
  2205. c._stats.incrementPiecesDirtiedGood()
  2206. }
  2207. t.clearPieceTouchers(piece)
  2208. hasDirty := p.hasDirtyChunks()
  2209. t.cl.unlock()
  2210. if hasDirty {
  2211. p.Flush() // You can be synchronous here!
  2212. }
  2213. err := p.Storage().MarkComplete()
  2214. if err != nil {
  2215. t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
  2216. }
  2217. t.cl.lock()
  2218. if t.closed.IsSet() {
  2219. return
  2220. }
  2221. t.pendAllChunkSpecs(piece)
  2222. } else {
  2223. if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
  2224. // Peers contributed to all the data for this piece hash failure, and the failure was
  2225. // not due to errors in the storage (such as data being dropped in a cache).
  2226. // Increment Torrent and above stats, and then specific connections.
  2227. t.allStats((*ConnStats).incrementPiecesDirtiedBad)
  2228. for c := range p.dirtiers {
  2229. // Y u do dis peer?!
  2230. c.stats().incrementPiecesDirtiedBad()
  2231. }
  2232. bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
  2233. for c := range p.dirtiers {
  2234. if !c.trusted {
  2235. bannableTouchers = append(bannableTouchers, c)
  2236. }
  2237. }
  2238. t.clearPieceTouchers(piece)
  2239. slices.Sort(bannableTouchers, connLessTrusted)
  2240. if t.cl.config.Debug {
  2241. t.logger.Printf(
  2242. "bannable conns by trust for piece %d: %v",
  2243. piece,
  2244. func() (ret []connectionTrust) {
  2245. for _, c := range bannableTouchers {
  2246. ret = append(ret, c.trust())
  2247. }
  2248. return
  2249. }(),
  2250. )
  2251. }
  2252. if len(bannableTouchers) >= 1 {
  2253. c := bannableTouchers[0]
  2254. if len(bannableTouchers) != 1 {
  2255. t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
  2256. } else {
  2257. // Turns out it's still useful to ban peers like this because if there's only a
  2258. // single peer for a piece, and we never progress that piece to completion, we
  2259. // will never smart-ban them. Discovered in
  2260. // https://github.com/anacrolix/torrent/issues/715.
  2261. t.logger.Levelf(
  2262. log.Warning,
  2263. "banning %v for being sole dirtier of piece %v after failed piece check",
  2264. c,
  2265. piece,
  2266. )
  2267. c.ban()
  2268. }
  2269. }
  2270. }
  2271. t.onIncompletePiece(piece)
  2272. p.Storage().MarkNotComplete()
  2273. }
  2274. t.updatePieceCompletion(piece)
  2275. }
  2276. func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
  2277. start := t.pieceRequestIndexOffset(piece)
  2278. end := start + t.pieceNumChunks(piece)
  2279. for ri := start; ri < end; ri++ {
  2280. t.cancelRequest(ri)
  2281. }
  2282. }
  2283. func (t *Torrent) onPieceCompleted(piece pieceIndex) {
  2284. t.pendAllChunkSpecs(piece)
  2285. t.cancelRequestsForPiece(piece)
  2286. t.piece(piece).readerCond.Broadcast()
  2287. for conn := range t.conns {
  2288. conn.have(piece)
  2289. t.maybeDropMutuallyCompletePeer(conn)
  2290. }
  2291. }
  2292. // Called when a piece is found to be not complete.
  2293. func (t *Torrent) onIncompletePiece(piece pieceIndex) {
  2294. if t.pieceAllDirty(piece) {
  2295. t.pendAllChunkSpecs(piece)
  2296. }
  2297. if !t.wantPieceIndex(piece) {
  2298. // t.logger.Printf("piece %d incomplete and unwanted", piece)
  2299. return
  2300. }
  2301. // We could drop any connections that we told we have a piece that we
  2302. // don't here. But there's a test failure, and it seems clients don't care
  2303. // if you request pieces that you already claim to have. Pruning bad
  2304. // connections might just remove any connections that aren't treating us
  2305. // favourably anyway.
  2306. // for c := range t.conns {
  2307. // if c.sentHave(piece) {
  2308. // c.drop()
  2309. // }
  2310. // }
  2311. t.iterPeers(func(conn *Peer) {
  2312. if conn.peerHasPiece(piece) {
  2313. conn.updateRequests("piece incomplete")
  2314. }
  2315. })
  2316. }
  2317. func (t *Torrent) tryCreateMorePieceHashers() {
  2318. for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
  2319. }
  2320. }
  2321. func (t *Torrent) tryCreatePieceHasher() bool {
  2322. if t.storage == nil {
  2323. return false
  2324. }
  2325. pi, ok := t.getPieceToHash()
  2326. if !ok {
  2327. return false
  2328. }
  2329. p := t.piece(pi)
  2330. t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
  2331. p.hashing = true
  2332. t.publishPieceStateChange(pi)
  2333. t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
  2334. t.storageLock.RLock()
  2335. t.activePieceHashes++
  2336. go t.pieceHasher(pi)
  2337. return true
  2338. }
  2339. func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
  2340. t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
  2341. if t.piece(i).hashing {
  2342. return true
  2343. }
  2344. ret = i
  2345. ok = true
  2346. return false
  2347. })
  2348. return
  2349. }
  2350. func (t *Torrent) dropBannedPeers() {
  2351. t.iterPeers(func(p *Peer) {
  2352. remoteIp := p.remoteIp()
  2353. if remoteIp == nil {
  2354. if p.bannableAddr.Ok {
  2355. t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
  2356. }
  2357. return
  2358. }
  2359. netipAddr := netip.MustParseAddr(remoteIp.String())
  2360. if Some(netipAddr) != p.bannableAddr {
  2361. t.logger.WithDefaultLevel(log.Debug).Printf(
  2362. "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
  2363. p, remoteIp, p.bannableAddr)
  2364. }
  2365. if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
  2366. // Should this be a close?
  2367. p.drop()
  2368. t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
  2369. }
  2370. })
  2371. }
  2372. func (t *Torrent) pieceHasher(index pieceIndex) {
  2373. p := t.piece(index)
  2374. // Do we really need to spell out that it's a copy error? If it's a failure to hash the hash
  2375. // will just be wrong.
  2376. correct, failedPeers, copyErr := t.hashPiece(index)
  2377. switch copyErr {
  2378. case nil, io.EOF:
  2379. default:
  2380. t.logger.WithNames("hashing").Levelf(
  2381. log.Warning,
  2382. "error hashing piece %v: %v", index, copyErr)
  2383. }
  2384. t.storageLock.RUnlock()
  2385. t.cl.lock()
  2386. defer t.cl.unlock()
  2387. if correct {
  2388. for peer := range failedPeers {
  2389. t.cl.banPeerIP(peer.AsSlice())
  2390. t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
  2391. }
  2392. t.dropBannedPeers()
  2393. for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
  2394. t.smartBanCache.ForgetBlock(ri)
  2395. }
  2396. }
  2397. p.hashing = false
  2398. t.pieceHashed(index, correct, copyErr)
  2399. t.updatePiecePriority(index, "Torrent.pieceHasher")
  2400. t.activePieceHashes--
  2401. t.tryCreateMorePieceHashers()
  2402. }
  2403. // Return the connections that touched a piece, and clear the entries while doing it.
  2404. func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
  2405. p := t.piece(pi)
  2406. for c := range p.dirtiers {
  2407. delete(c.peerTouchedPieces, pi)
  2408. delete(p.dirtiers, c)
  2409. }
  2410. }
  2411. func (t *Torrent) peersAsSlice() (ret []*Peer) {
  2412. t.iterPeers(func(p *Peer) {
  2413. ret = append(ret, p)
  2414. })
  2415. return
  2416. }
  2417. func (t *Torrent) queueInitialPieceCheck(i pieceIndex) {
  2418. if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk {
  2419. t.queuePieceCheck(i)
  2420. }
  2421. }
  2422. func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
  2423. piece := t.piece(pieceIndex)
  2424. if !piece.haveHash() {
  2425. return
  2426. }
  2427. if piece.queuedForHash() {
  2428. return
  2429. }
  2430. t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
  2431. t.publishPieceStateChange(pieceIndex)
  2432. t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
  2433. t.tryCreateMorePieceHashers()
  2434. }
  2435. // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
  2436. // before the Info is available.
  2437. func (t *Torrent) VerifyData() {
  2438. for i := pieceIndex(0); i < t.NumPieces(); i++ {
  2439. t.Piece(i).VerifyData()
  2440. }
  2441. }
  2442. func (t *Torrent) connectingToPeerAddr(addrStr string) bool {
  2443. return len(t.halfOpen[addrStr]) != 0
  2444. }
  2445. func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool {
  2446. addrStr := x.String()
  2447. for c := range t.conns {
  2448. ra := c.RemoteAddr
  2449. if ra.String() == addrStr {
  2450. return true
  2451. }
  2452. }
  2453. return false
  2454. }
  2455. func (t *Torrent) getHalfOpenPath(
  2456. addrStr string,
  2457. attemptKey outgoingConnAttemptKey,
  2458. ) nestedmaps.Path[*PeerInfo] {
  2459. return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey)
  2460. }
  2461. func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) {
  2462. path := t.getHalfOpenPath(addrStr, attemptKey)
  2463. if path.Exists() {
  2464. panic("should be unique")
  2465. }
  2466. path.Set(attemptKey)
  2467. t.cl.numHalfOpen++
  2468. }
  2469. // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not
  2470. // sure all the PeerInfo fields are being used.
  2471. func initiateConn(
  2472. opts outgoingConnOpts,
  2473. ignoreLimits bool,
  2474. ) {
  2475. t := opts.t
  2476. peer := opts.peerInfo
  2477. if peer.Id == t.cl.peerID {
  2478. return
  2479. }
  2480. if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
  2481. return
  2482. }
  2483. addr := peer.Addr
  2484. addrStr := addr.String()
  2485. if !ignoreLimits {
  2486. if t.connectingToPeerAddr(addrStr) {
  2487. return
  2488. }
  2489. }
  2490. if t.hasPeerConnForAddr(addr) {
  2491. return
  2492. }
  2493. attemptKey := &peer
  2494. t.addHalfOpen(addrStr, attemptKey)
  2495. go t.cl.outgoingConnection(
  2496. opts,
  2497. attemptKey,
  2498. )
  2499. }
  2500. // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
  2501. // quickly make one Client visible to the Torrent of another Client.
  2502. func (t *Torrent) AddClientPeer(cl *Client) int {
  2503. return t.AddPeers(func() (ps []PeerInfo) {
  2504. for _, la := range cl.ListenAddrs() {
  2505. ps = append(ps, PeerInfo{
  2506. Addr: la,
  2507. Trusted: true,
  2508. })
  2509. }
  2510. return
  2511. }())
  2512. }
  2513. // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
  2514. // connection.
  2515. func (t *Torrent) allStats(f func(*ConnStats)) {
  2516. f(&t.stats)
  2517. f(&t.cl.connStats)
  2518. }
  2519. func (t *Torrent) hashingPiece(i pieceIndex) bool {
  2520. return t.pieces[i].hashing
  2521. }
  2522. func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
  2523. return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
  2524. }
  2525. func (t *Torrent) dialTimeout() time.Duration {
  2526. return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
  2527. }
  2528. func (t *Torrent) piece(i int) *Piece {
  2529. return &t.pieces[i]
  2530. }
  2531. func (t *Torrent) onWriteChunkErr(err error) {
  2532. if t.userOnWriteChunkErr != nil {
  2533. go t.userOnWriteChunkErr(err)
  2534. return
  2535. }
  2536. t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
  2537. t.disallowDataDownloadLocked()
  2538. }
  2539. func (t *Torrent) DisallowDataDownload() {
  2540. t.cl.lock()
  2541. defer t.cl.unlock()
  2542. t.disallowDataDownloadLocked()
  2543. }
  2544. func (t *Torrent) disallowDataDownloadLocked() {
  2545. t.dataDownloadDisallowed.Set()
  2546. t.iterPeers(func(p *Peer) {
  2547. // Could check if peer request state is empty/not interested?
  2548. p.updateRequests("disallow data download")
  2549. p.cancelAllRequests()
  2550. })
  2551. }
  2552. func (t *Torrent) AllowDataDownload() {
  2553. t.cl.lock()
  2554. defer t.cl.unlock()
  2555. t.dataDownloadDisallowed.Clear()
  2556. t.iterPeers(func(p *Peer) {
  2557. p.updateRequests("allow data download")
  2558. })
  2559. }
  2560. // Enables uploading data, if it was disabled.
  2561. func (t *Torrent) AllowDataUpload() {
  2562. t.cl.lock()
  2563. defer t.cl.unlock()
  2564. t.dataUploadDisallowed = false
  2565. t.iterPeers(func(p *Peer) {
  2566. p.updateRequests("allow data upload")
  2567. })
  2568. }
  2569. // Disables uploading data, if it was enabled.
  2570. func (t *Torrent) DisallowDataUpload() {
  2571. t.cl.lock()
  2572. defer t.cl.unlock()
  2573. t.dataUploadDisallowed = true
  2574. for c := range t.conns {
  2575. // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
  2576. c.updateRequests("disallow data upload")
  2577. }
  2578. }
  2579. // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
  2580. // or if nil, a critical message is logged, and data download is disabled.
  2581. func (t *Torrent) SetOnWriteChunkError(f func(error)) {
  2582. t.cl.lock()
  2583. defer t.cl.unlock()
  2584. t.userOnWriteChunkErr = f
  2585. }
  2586. func (t *Torrent) iterPeers(f func(p *Peer)) {
  2587. for pc := range t.conns {
  2588. f(&pc.Peer)
  2589. }
  2590. for _, ws := range t.webSeeds {
  2591. f(ws)
  2592. }
  2593. }
  2594. func (t *Torrent) callbacks() *Callbacks {
  2595. return &t.cl.config.Callbacks
  2596. }
  2597. type AddWebSeedsOpt func(*webseed.Client)
  2598. // Sets the WebSeed trailing path escaper for a webseed.Client.
  2599. func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
  2600. return func(c *webseed.Client) {
  2601. c.PathEscaper = custom
  2602. }
  2603. }
  2604. func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
  2605. t.cl.lock()
  2606. defer t.cl.unlock()
  2607. for _, u := range urls {
  2608. t.addWebSeed(u, opts...)
  2609. }
  2610. }
  2611. func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
  2612. if t.cl.config.DisableWebseeds {
  2613. return
  2614. }
  2615. if _, ok := t.webSeeds[url]; ok {
  2616. return
  2617. }
  2618. // I don't think Go http supports pipelining requests. However, we can have more ready to go
  2619. // right away. This value should be some multiple of the number of connections to a host. I
  2620. // would expect that double maxRequests plus a bit would be appropriate. This value is based on
  2621. // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
  2622. // "https://webtorrent.io/torrents/".
  2623. const maxRequests = 16
  2624. ws := webseedPeer{
  2625. peer: Peer{
  2626. t: t,
  2627. outgoing: true,
  2628. Network: "http",
  2629. reconciledHandshakeStats: true,
  2630. // This should affect how often we have to recompute requests for this peer. Note that
  2631. // because we can request more than 1 thing at a time over HTTP, we will hit the low
  2632. // requests mark more often, so recomputation is probably sooner than with regular peer
  2633. // conns. ~4x maxRequests would be about right.
  2634. PeerMaxRequests: 128,
  2635. // TODO: Set ban prefix?
  2636. RemoteAddr: remoteAddrFromUrl(url),
  2637. callbacks: t.callbacks(),
  2638. },
  2639. client: webseed.Client{
  2640. HttpClient: t.cl.httpClient,
  2641. Url: url,
  2642. ResponseBodyWrapper: func(r io.Reader) io.Reader {
  2643. return &rateLimitedReader{
  2644. l: t.cl.config.DownloadRateLimiter,
  2645. r: r,
  2646. }
  2647. },
  2648. },
  2649. activeRequests: make(map[Request]webseed.Request, maxRequests),
  2650. }
  2651. ws.peer.initRequestState()
  2652. for _, opt := range opts {
  2653. opt(&ws.client)
  2654. }
  2655. ws.peer.initUpdateRequestsTimer()
  2656. ws.requesterCond.L = t.cl.locker()
  2657. for i := 0; i < maxRequests; i += 1 {
  2658. go ws.requester(i)
  2659. }
  2660. for _, f := range t.callbacks().NewPeer {
  2661. f(&ws.peer)
  2662. }
  2663. ws.peer.logger = t.logger.WithContextValue(&ws).WithNames("webseed")
  2664. ws.peer.peerImpl = &ws
  2665. if t.haveInfo() {
  2666. ws.onGotInfo(t.info)
  2667. }
  2668. t.webSeeds[url] = &ws.peer
  2669. ws.peer.updateRequests("Torrent.addWebSeed")
  2670. }
  2671. func (t *Torrent) peerIsActive(p *Peer) (active bool) {
  2672. t.iterPeers(func(p1 *Peer) {
  2673. if p1 == p {
  2674. active = true
  2675. }
  2676. })
  2677. return
  2678. }
  2679. func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
  2680. index := t.pieceIndexOfRequestIndex(ri)
  2681. return Request{
  2682. pp.Integer(index),
  2683. t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
  2684. }
  2685. }
  2686. func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
  2687. return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
  2688. }
  2689. func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
  2690. return RequestIndex(piece) * t.chunksPerRegularPiece()
  2691. }
  2692. func (t *Torrent) updateComplete() {
  2693. t.complete.SetBool(t.haveAllPieces())
  2694. }
  2695. func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
  2696. p := t.requestingPeer(r)
  2697. if p != nil {
  2698. p.cancel(r)
  2699. }
  2700. // TODO: This is a check that an old invariant holds. It can be removed after some testing.
  2701. //delete(t.pendingRequests, r)
  2702. if _, ok := t.requestState[r]; ok {
  2703. panic("expected request state to be gone")
  2704. }
  2705. return p
  2706. }
  2707. func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
  2708. return t.requestState[r].peer
  2709. }
  2710. func (t *Torrent) addConnWithAllPieces(p *Peer) {
  2711. if t.connsWithAllPieces == nil {
  2712. t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
  2713. }
  2714. t.connsWithAllPieces[p] = struct{}{}
  2715. }
  2716. func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
  2717. _, ok := t.connsWithAllPieces[p]
  2718. delete(t.connsWithAllPieces, p)
  2719. return ok
  2720. }
  2721. func (t *Torrent) numActivePeers() int {
  2722. return len(t.conns) + len(t.webSeeds)
  2723. }
  2724. func (t *Torrent) hasStorageCap() bool {
  2725. f := t.storage.Capacity
  2726. if f == nil {
  2727. return false
  2728. }
  2729. _, ok := (*f)()
  2730. return ok
  2731. }
  2732. func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
  2733. return pieceIndex(ri / t.chunksPerRegularPiece())
  2734. }
  2735. func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
  2736. reuseIter *typedRoaring.Iterator[RequestIndex],
  2737. piece pieceIndex,
  2738. f func(RequestIndex),
  2739. ) {
  2740. reuseIter.Initialize(&t.dirtyChunks)
  2741. pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
  2742. iterBitmapUnsetInRange(
  2743. reuseIter,
  2744. pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
  2745. f,
  2746. )
  2747. }
  2748. type requestState struct {
  2749. peer *Peer
  2750. when time.Time
  2751. }
  2752. // Returns an error if a received chunk is out of bounds in someway.
  2753. func (t *Torrent) checkValidReceiveChunk(r Request) error {
  2754. if !t.haveInfo() {
  2755. return errors.New("torrent missing info")
  2756. }
  2757. if int(r.Index) >= t.numPieces() {
  2758. return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
  2759. }
  2760. pieceLength := t.pieceLength(pieceIndex(r.Index))
  2761. if r.Begin >= pieceLength {
  2762. return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
  2763. }
  2764. // We could check chunk lengths here, but chunk request size is not changed often, and tricky
  2765. // for peers to manipulate as they need to send potentially large buffers to begin with. There
  2766. // should be considerable checks elsewhere for this case due to the network overhead. We should
  2767. // catch most of the overflow manipulation stuff by checking index and begin above.
  2768. return nil
  2769. }
  2770. func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
  2771. for pc := range t.conns {
  2772. dialAddr, err := pc.remoteDialAddrPort()
  2773. if err != nil {
  2774. continue
  2775. }
  2776. if dialAddr != target {
  2777. continue
  2778. }
  2779. ret = append(ret, pc)
  2780. }
  2781. return
  2782. }
  2783. func wrapUtHolepunchMsgForPeerConn(
  2784. recipient *PeerConn,
  2785. msg utHolepunch.Msg,
  2786. ) pp.Message {
  2787. extendedPayload, err := msg.MarshalBinary()
  2788. if err != nil {
  2789. panic(err)
  2790. }
  2791. return pp.Message{
  2792. Type: pp.Extended,
  2793. ExtendedID: MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName),
  2794. ExtendedPayload: extendedPayload,
  2795. }
  2796. }
  2797. func sendUtHolepunchMsg(
  2798. pc *PeerConn,
  2799. msgType utHolepunch.MsgType,
  2800. addrPort netip.AddrPort,
  2801. errCode utHolepunch.ErrCode,
  2802. ) {
  2803. holepunchMsg := utHolepunch.Msg{
  2804. MsgType: msgType,
  2805. AddrPort: addrPort,
  2806. ErrCode: errCode,
  2807. }
  2808. incHolepunchMessagesSent(holepunchMsg)
  2809. ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg)
  2810. pc.write(ppMsg)
  2811. }
  2812. func incHolepunchMessages(msg utHolepunch.Msg, verb string) {
  2813. torrent.Add(
  2814. fmt.Sprintf(
  2815. "holepunch %v %v messages %v",
  2816. msg.MsgType,
  2817. addrPortProtocolStr(msg.AddrPort),
  2818. verb,
  2819. ),
  2820. 1,
  2821. )
  2822. }
  2823. func incHolepunchMessagesReceived(msg utHolepunch.Msg) {
  2824. incHolepunchMessages(msg, "received")
  2825. }
  2826. func incHolepunchMessagesSent(msg utHolepunch.Msg) {
  2827. incHolepunchMessages(msg, "sent")
  2828. }
  2829. func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error {
  2830. incHolepunchMessagesReceived(msg)
  2831. switch msg.MsgType {
  2832. case utHolepunch.Rendezvous:
  2833. t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
  2834. sendMsg := sendUtHolepunchMsg
  2835. senderAddrPort, err := sender.remoteDialAddrPort()
  2836. if err != nil {
  2837. sender.logger.Levelf(
  2838. log.Warning,
  2839. "error getting ut_holepunch rendezvous sender's dial address: %v",
  2840. err,
  2841. )
  2842. // There's no better error code. The sender's address itself is invalid. I don't see
  2843. // this error message being appropriate anywhere else anyway.
  2844. sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
  2845. }
  2846. targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
  2847. if len(targets) == 0 {
  2848. sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
  2849. return nil
  2850. }
  2851. for _, pc := range targets {
  2852. if !pc.supportsExtension(utHolepunch.ExtensionName) {
  2853. sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport)
  2854. continue
  2855. }
  2856. sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
  2857. sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
  2858. }
  2859. return nil
  2860. case utHolepunch.Connect:
  2861. holepunchAddr := msg.AddrPort
  2862. t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender)
  2863. if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) {
  2864. setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr)
  2865. if g.MapContains(t.cl.accepted, holepunchAddr) {
  2866. setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
  2867. }
  2868. }
  2869. opts := outgoingConnOpts{
  2870. peerInfo: PeerInfo{
  2871. Addr: msg.AddrPort,
  2872. Source: PeerSourceUtHolepunch,
  2873. PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(),
  2874. },
  2875. t: t,
  2876. // Don't attempt to start our own rendezvous if we fail to connect.
  2877. skipHolepunchRendezvous: true,
  2878. receivedHolepunchConnect: true,
  2879. // Assume that the other end initiated the rendezvous, and will use our preferred
  2880. // encryption. So we will act normally.
  2881. HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
  2882. }
  2883. initiateConn(opts, true)
  2884. return nil
  2885. case utHolepunch.Error:
  2886. torrent.Add("holepunch error messages received", 1)
  2887. t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
  2888. return nil
  2889. default:
  2890. return fmt.Errorf("unhandled msg type %v", msg.MsgType)
  2891. }
  2892. }
  2893. func addrPortProtocolStr(addrPort netip.AddrPort) string {
  2894. addr := addrPort.Addr()
  2895. switch {
  2896. case addr.Is4():
  2897. return "ipv4"
  2898. case addr.Is6():
  2899. return "ipv6"
  2900. default:
  2901. panic(addrPort)
  2902. }
  2903. }
  2904. func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error {
  2905. rzsSent := 0
  2906. for pc := range t.conns {
  2907. if !pc.supportsExtension(utHolepunch.ExtensionName) {
  2908. continue
  2909. }
  2910. if pc.supportsExtension(pp.ExtensionNamePex) {
  2911. if !g.MapContains(pc.pex.remoteLiveConns, addrPort) {
  2912. continue
  2913. }
  2914. }
  2915. t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort)
  2916. sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0)
  2917. rzsSent++
  2918. }
  2919. if rzsSent == 0 {
  2920. return errors.New("no eligible relays")
  2921. }
  2922. return nil
  2923. }
  2924. func (t *Torrent) numHalfOpenAttempts() (num int) {
  2925. for _, attempts := range t.halfOpen {
  2926. num += len(attempts)
  2927. }
  2928. return
  2929. }
  2930. func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
  2931. cl := t.cl
  2932. cl.rLock()
  2933. defer cl.rUnlock()
  2934. return t.dialTimeout()
  2935. }
  2936. func (t *Torrent) canonicalShortInfohash() *infohash.T {
  2937. if t.infoHash.Ok {
  2938. return &t.infoHash.Value
  2939. }
  2940. return t.infoHashV2.UnwrapPtr().ToShort()
  2941. }
  2942. func (t *Torrent) eachShortInfohash(each func(short [20]byte)) {
  2943. if t.infoHash.Value == *t.infoHashV2.Value.ToShort() {
  2944. // This includes zero values, since they both should not be zero. Plus Option should not
  2945. // allow non-zero values for None.
  2946. panic("v1 and v2 info hashes should not be the same")
  2947. }
  2948. if t.infoHash.Ok {
  2949. each(t.infoHash.Value)
  2950. }
  2951. if t.infoHashV2.Ok {
  2952. v2Short := *t.infoHashV2.Value.ToShort()
  2953. each(v2Short)
  2954. }
  2955. }
  2956. func (t *Torrent) getFileByPiecesRoot(hash [32]byte) *File {
  2957. for _, f := range *t.files {
  2958. if f.piecesRoot.Unwrap() == hash {
  2959. return f
  2960. }
  2961. }
  2962. return nil
  2963. }
  2964. func (t *Torrent) pieceLayers() (pieceLayers map[string]string) {
  2965. if t.files == nil {
  2966. return
  2967. }
  2968. files := *t.files
  2969. g.MakeMapWithCap(&pieceLayers, len(files))
  2970. file:
  2971. for _, f := range files {
  2972. if !f.piecesRoot.Ok {
  2973. continue
  2974. }
  2975. key := f.piecesRoot.Value
  2976. var value strings.Builder
  2977. for i := f.BeginPieceIndex(); i < f.EndPieceIndex(); i++ {
  2978. hashOpt := t.piece(i).hashV2
  2979. if !hashOpt.Ok {
  2980. // All hashes must be present. This implementation should handle missing files, so
  2981. // move on to the next file.
  2982. continue file
  2983. }
  2984. value.Write(hashOpt.Value[:])
  2985. }
  2986. if value.Len() == 0 {
  2987. // Non-empty files are not recorded in piece layers.
  2988. continue
  2989. }
  2990. // If multiple files have the same root that shouldn't matter.
  2991. pieceLayers[string(key[:])] = value.String()
  2992. }
  2993. return
  2994. }
  2995. // Is On when all pieces are complete.
  2996. func (t *Torrent) Complete() chansync.ReadOnlyFlag {
  2997. return &t.complete
  2998. }