peerconn.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448
  1. package torrent
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "math/rand"
  10. "net"
  11. "net/netip"
  12. "strconv"
  13. "strings"
  14. "sync/atomic"
  15. "time"
  16. "github.com/RoaringBitmap/roaring"
  17. "github.com/anacrolix/generics"
  18. . "github.com/anacrolix/generics"
  19. "github.com/anacrolix/log"
  20. "github.com/anacrolix/missinggo/v2/bitmap"
  21. "github.com/anacrolix/multiless"
  22. "golang.org/x/exp/maps"
  23. "golang.org/x/time/rate"
  24. "github.com/anacrolix/torrent/bencode"
  25. "github.com/anacrolix/torrent/internal/alloclim"
  26. "github.com/anacrolix/torrent/merkle"
  27. "github.com/anacrolix/torrent/metainfo"
  28. "github.com/anacrolix/torrent/mse"
  29. pp "github.com/anacrolix/torrent/peer_protocol"
  30. utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
  31. )
  32. // Maintains the state of a BitTorrent-protocol based connection with a peer.
// Maintains the state of a BitTorrent-protocol based connection with a peer.
type PeerConn struct {
    Peer
    // Protocol-level logger, distinct from the general Peer logger. Move to PeerConn?
    protocolLogger log.Logger
    // BEP 52: whether this connection negotiated the v2 torrent protocol.
    v2 bool
    // A string that should identify the PeerConn's net.Conn endpoints. The net.Conn could
    // be wrapping WebRTC, uTP, or TCP etc. Used in writing the conn status for peers.
    connString string
    // See BEP 3 etc.
    PeerID             PeerID
    PeerExtensionBytes pp.PeerExtensionBits
    PeerListenPort     int
    // The local extended protocols to advertise in the extended handshake, and to support receiving
    // from the peer. This will point to the Client default when the PeerConnAdded callback is
    // invoked. Do not modify this, point it to your own instance. Do not modify the destination
    // after returning from the callback.
    LocalLtepProtocolMap *LocalLtepProtocolMap
    // The actual Conn, used for closing, and setting socket options. Do not use methods on this
    // while holding any mutexes.
    conn net.Conn
    // The Reader and Writer for this Conn, with hooks installed for stats,
    // limiting, deadlines etc.
    w io.Writer
    r io.Reader
    // Buffers and flushes outgoing messages asynchronously from the connection writer.
    messageWriter peerConnMsgWriter
    // The peer's extension map, as sent in their extended handshake.
    PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
    // Stored atomically because it's read without the client lock (e.g. for status output).
    PeerClientName atomic.Value
    uploadTimer    *time.Timer
    pex            pexConnState
    // The pieces the peer has claimed to have.
    _peerPieces roaring.Bitmap
    // The peer has everything. This can occur due to a special message, when
    // we may not even know the number of pieces in the torrent yet.
    peerSentHaveAll bool
    // Limits memory reserved for servicing this peer's data requests.
    peerRequestDataAllocLimiter alloclim.Limiter
    // BEP 55 rendezvous we have initiated and not yet resolved.
    outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}
    // Hash requests sent to the peer. If there's an issue we probably don't want to reissue these,
    // because I haven't implemented it smart enough yet.
    sentHashRequests map[hashRequest]struct{}
    // Hash pieces received from the peer, mapped from pieces root to piece layer hashes. This way
    // we can verify all the pieces for a file when they're all arrived before submitting them to
    // the torrent.
    receivedHashPieces map[[32]byte][][32]byte
}
// pexStatus returns a human-readable summary of the PEX state for this connection, for status
// output. It reports why PEX is unavailable, or how many live conns/pending events it tracks.
func (cn *PeerConn) pexStatus() string {
    if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) {
        return "extended protocol disabled"
    }
    if cn.PeerExtensionIDs == nil {
        // Extended handshake hasn't arrived yet, so we don't know what the peer supports.
        return "pending extended handshake"
    }
    if !cn.supportsExtension(pp.ExtensionNamePex) {
        return "unsupported"
    }
    // The `if true` keeps the verbose alternative below compiled but disabled; flip it when
    // debugging to dump the remote live conn addresses.
    if true {
        return fmt.Sprintf(
            "%v conns, %v unsent events",
            len(cn.pex.remoteLiveConns),
            cn.pex.numPending(),
        )
    } else {
        // This alternative branch prints out the remote live conn addresses.
        return fmt.Sprintf(
            "%v conns, %v unsent events",
            strings.Join(generics.SliceMap(
                maps.Keys(cn.pex.remoteLiveConns),
                func(from netip.AddrPort) string {
                    return from.String()
                }), ","),
            cn.pex.numPending(),
        )
    }
}
  108. func (cn *PeerConn) peerImplStatusLines() []string {
  109. return []string{
  110. cn.connString,
  111. fmt.Sprintf("peer id: %+q", cn.PeerID),
  112. fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes),
  113. fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs),
  114. fmt.Sprintf("pex: %s", cn.pexStatus()),
  115. }
  116. }
  117. // Returns true if the connection is over IPv6.
  118. func (cn *PeerConn) ipv6() bool {
  119. ip := cn.remoteIp()
  120. if ip.To4() != nil {
  121. return false
  122. }
  123. return len(ip) == net.IPv6len
  124. }
// Returns true if the dialer/initiator has the higher client peer ID. See
// https://github.com/arvidn/libtorrent/blame/272828e1cc37b042dfbbafa539222d8533e99755/src/bt_peer_connection.cpp#L3536-L3557.
// As far as I can tell, Transmission just keeps the oldest connection.
func (cn *PeerConn) isPreferredDirection() bool {
    // True if our client peer ID is higher than the remote's peer ID.
    return bytes.Compare(cn.PeerID[:], cn.t.cl.peerID[:]) < 0 == cn.outgoing
}
  132. // Returns whether the left connection should be preferred over the right one,
  133. // considering only their networking properties. If ok is false, we can't
  134. // decide.
  135. func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) bool {
  136. var ml multiless.Computation
  137. ml = ml.Bool(r.isPreferredDirection(), l.isPreferredDirection())
  138. ml = ml.Bool(l.utp(), r.utp())
  139. ml = ml.Bool(r.ipv6(), l.ipv6())
  140. return ml.Less()
  141. }
  142. func (cn *PeerConn) peerHasAllPieces() (all, known bool) {
  143. if cn.peerSentHaveAll {
  144. return true, true
  145. }
  146. if !cn.t.haveInfo() {
  147. return false, false
  148. }
  149. return cn._peerPieces.GetCardinality() == uint64(cn.t.numPieces()), true
  150. }
// onGotInfo clamps the peer piece bitmap once the torrent info (and so the piece count) is known.
func (cn *PeerConn) onGotInfo(info *metainfo.Info) {
    cn.setNumPieces(info.NumPieces())
}
// Correct the peer piece bitmap length by dropping any bits at or beyond the torrent's piece
// count, such as those set by a badly sized BITFIELD or invalid HAVE messages.
func (cn *PeerConn) setNumPieces(num pieceIndex) {
    cn._peerPieces.RemoveRange(bitmap.BitRange(num), bitmap.ToEnd)
    cn.peerPiecesChanged()
}
// peerPieces exposes the peer's claimed-piece bitmap. Callers must not retain it across lock
// releases; it aliases internal state.
func (cn *PeerConn) peerPieces() *roaring.Bitmap {
    return &cn._peerPieces
}
  163. func (cn *PeerConn) connectionFlags() string {
  164. var sb strings.Builder
  165. add := func(s string) {
  166. if sb.Len() > 0 {
  167. sb.WriteByte(',')
  168. }
  169. sb.WriteString(s)
  170. }
  171. // From first relevant to last.
  172. add(string(cn.Discovery))
  173. if cn.utp() {
  174. add("U")
  175. }
  176. if cn.cryptoMethod == mse.CryptoMethodRC4 {
  177. add("E")
  178. } else if cn.headerEncrypted {
  179. add("e")
  180. }
  181. if cn.v2 {
  182. add("v2")
  183. } else {
  184. add("v1")
  185. }
  186. return sb.String()
  187. }
// utp reports whether the connection's network string indicates a UDP-based (uTP) transport.
func (cn *PeerConn) utp() bool {
    return parseNetworkString(cn.Network).Udp
}
// onClose tears down PeerConn-specific state when the connection closes: stops PEX, wakes the
// writer so it can exit, closes the underlying conn, and fires the close callback.
func (cn *PeerConn) onClose() {
    if cn.pex.IsEnabled() {
        cn.pex.Close()
    }
    cn.tickleWriter()
    if cn.conn != nil {
        // Close on a goroutine: Conn methods must not be called while holding mutexes.
        go cn.conn.Close()
    }
    if cb := cn.callbacks.PeerConnClosed; cb != nil {
        cb(cn)
    }
}
  203. // Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is
  204. // done asynchronously, so it may be that we're not able to honour backpressure from this method.
  205. func (cn *PeerConn) write(msg pp.Message) bool {
  206. torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
  207. // We don't need to track bytes here because the connection's Writer has that behaviour injected
  208. // (although there's some delay between us buffering the message, and the connection writer
  209. // flushing it out.).
  210. notFull := cn.messageWriter.write(msg)
  211. // Last I checked only Piece messages affect stats, and we don't write those.
  212. cn.wroteMsg(&msg)
  213. cn.tickleWriter()
  214. return notFull
  215. }
// requestMetadataPiece sends a BEP 9 metadata request for the given piece, unless the peer
// doesn't support the extension or the piece was already requested. Marks the piece requested.
func (cn *PeerConn) requestMetadataPiece(index int) {
    eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
    if eID == pp.ExtensionDeleteNumber {
        // Peer hasn't advertised (or has deleted) the metadata extension.
        return
    }
    if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
        // Already requested.
        return
    }
    cn.protocolLogger.WithDefaultLevel(log.Debug).Printf("requesting metadata piece %d", index)
    cn.write(pp.MetadataExtensionRequestMsg(eID, index))
    // Grow the tracking slice as needed before marking.
    for index >= len(cn.metadataRequests) {
        cn.metadataRequests = append(cn.metadataRequests, false)
    }
    cn.metadataRequests[index] = true
}
// requestedMetadataPiece reports whether we've already requested the given metadata piece.
func (cn *PeerConn) requestedMetadataPiece(index int) bool {
    return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}
// onPeerSentCancel handles a CANCEL from the peer. With the fast extension we must answer with a
// REJECT (BEP 6); otherwise we just drop the pending request.
func (cn *PeerConn) onPeerSentCancel(r Request) {
    if _, ok := cn.peerRequests[r]; !ok {
        torrent.Add("unexpected cancels received", 1)
        return
    }
    if cn.fastEnabled() {
        cn.reject(r)
    } else {
        delete(cn.peerRequests, r)
    }
}
// choke sends a CHOKE to the peer (if not already choking) via the given message writer.
// Without the fast extension, choking implicitly discards all of the peer's pending requests.
// Returns whether it's okay to keep writing (per the writer).
func (cn *PeerConn) choke(msg messageWriter) (more bool) {
    if cn.choking {
        return true
    }
    cn.choking = true
    more = msg(pp.Message{
        Type: pp.Choke,
    })
    if !cn.fastEnabled() {
        // Non-fast peers expect a choke to wipe their outstanding requests.
        cn.deleteAllPeerRequests()
    }
    return
}
  258. func (cn *PeerConn) deleteAllPeerRequests() {
  259. for _, state := range cn.peerRequests {
  260. state.allocReservation.Drop()
  261. }
  262. cn.peerRequests = nil
  263. }
  264. func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
  265. if !cn.choking {
  266. return true
  267. }
  268. cn.choking = false
  269. return msg(pp.Message{
  270. Type: pp.Unchoke,
  271. })
  272. }
  273. func (pc *PeerConn) writeInterested(interested bool) bool {
  274. return pc.write(pp.Message{
  275. Type: func() pp.MessageType {
  276. if interested {
  277. return pp.Interested
  278. } else {
  279. return pp.NotInterested
  280. }
  281. }(),
  282. })
  283. }
  284. func (me *PeerConn) _request(r Request) bool {
  285. return me.write(pp.Message{
  286. Type: pp.Request,
  287. Index: r.Index,
  288. Begin: r.Begin,
  289. Length: r.Length,
  290. })
  291. }
// _cancel queues a CANCEL for the given request index. Returns whether we should expect the peer
// to explicitly reject the cancelled request (fast extension semantics).
func (me *PeerConn) _cancel(r RequestIndex) bool {
    me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
    return me.remoteRejectsCancels()
}
// Whether we should expect a reject message after sending a cancel.
func (me *PeerConn) remoteRejectsCancels() bool {
    if !me.fastEnabled() {
        // Without the fast extension there's no reject message at all.
        return false
    }
    if me.remoteIsTransmission() {
        // Transmission did not send rejects for received cancels. See
        // https://github.com/transmission/transmission/pull/2275. Fixed in 4.0.0-beta.1 onward in
        // https://github.com/transmission/transmission/commit/76719bf34c255da4fca991c2ad3fa4b65d2154b1.
        // Peer ID prefix scheme described
        // https://github.com/transmission/transmission/blob/7ec7607bbcf0fa99bd4b157b9b0f0c411d59f45d/CMakeLists.txt#L128-L149.
        // Byte 3 of the Azureus-style peer ID prefix holds the major version digit.
        return me.PeerID[3] >= '4'
    }
    return true
}
// fillWriteBuffer tops up the outgoing message buffer, in priority order: hash requests, piece
// requests, PEX, then uploads. Bails early if the buffer is already past the low-water mark.
func (cn *PeerConn) fillWriteBuffer() {
    if cn.messageWriter.writeBuffer.Len() > writeBufferLowWaterLen {
        // Fully committing to our max requests requires sufficient space (see
        // maxLocalToRemoteRequests). Flush what we have instead. We also prefer always to make
        // requests than to do PEX or upload, so we short-circuit before handling those. Any update
        // request reason will not be cleared, so we'll come right back here when there's space. We
        // can't do this in maybeUpdateActualRequestState because it's a method on Peer and has no
        // knowledge of write buffers.
        return
    }
    cn.requestMissingHashes()
    cn.maybeUpdateActualRequestState()
    if cn.pex.IsEnabled() {
        if flow := cn.pex.Share(cn.write); !flow {
            // PEX said to stop writing for now.
            return
        }
    }
    cn.upload(cn.write)
}
  330. func (cn *PeerConn) have(piece pieceIndex) {
  331. if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
  332. return
  333. }
  334. cn.write(pp.Message{
  335. Type: pp.Have,
  336. Index: pp.Integer(piece),
  337. })
  338. cn.sentHaves.Add(bitmap.BitIndex(piece))
  339. }
// postBitfield sends our initial BITFIELD. It must be the first have-related message on the
// connection, and is skipped entirely if we have no pieces.
func (cn *PeerConn) postBitfield() {
    if cn.sentHaves.Len() != 0 {
        panic("bitfield must be first have-related message sent")
    }
    if !cn.t.haveAnyPieces() {
        return
    }
    cn.write(pp.Message{
        Type:     pp.Bitfield,
        Bitfield: cn.t.bitfield(),
    })
    // Record exactly what we claimed, so later have() calls don't repeat it.
    cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
}
// handleUpdateRequests wakes the connection writer, which recomputes request state when it can
// actually write.
func (cn *PeerConn) handleUpdateRequests() {
    // The writer determines the request state as needed when it can write.
    cn.tickleWriter()
}
  357. func (cn *PeerConn) raisePeerMinPieces(newMin pieceIndex) {
  358. if newMin > cn.peerMinPieces {
  359. cn.peerMinPieces = newMin
  360. }
  361. }
// peerSentHave handles a HAVE message: validates the piece index, records the piece in the
// peer's bitmap, updates availability accounting, and triggers request updates if we want it.
func (cn *PeerConn) peerSentHave(piece pieceIndex) error {
    if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
        return errors.New("invalid piece")
    }
    if cn.peerHasPiece(piece) {
        // Duplicate HAVE; nothing changes.
        return nil
    }
    cn.raisePeerMinPieces(piece + 1)
    // NOTE(review): this check looks redundant — we returned above when peerHasPiece was true,
    // and raisePeerMinPieces presumably doesn't affect it. Verify before simplifying.
    if !cn.peerHasPiece(piece) {
        cn.t.incPieceAvailability(piece)
    }
    cn._peerPieces.Add(uint32(piece))
    if cn.t.wantPieceIndex(piece) {
        cn.updateRequests("have")
    }
    cn.peerPiecesChanged()
    return nil
}
// peerSentBitfield handles a BITFIELD message. It reconciles the peer's new claimed pieces with
// our existing view (including a prior have-all), keeping per-piece availability counts correct,
// then applies the delta and updates requests if anything we want appeared.
func (cn *PeerConn) peerSentBitfield(bf []bool) error {
    if len(bf)%8 != 0 {
        // The decoder always produces whole bytes; anything else is a programming error.
        panic("expected bitfield length divisible by 8")
    }
    // We know that the last byte means that at most the last 7 bits are wasted.
    cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
    if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
        // Ignore known excess pieces.
        bf = bf[:cn.t.numPieces()]
    }
    bm := boolSliceToBitmap(bf)
    if cn.t.haveInfo() && pieceIndex(bm.GetCardinality()) == cn.t.numPieces() {
        // A full bitfield is equivalent to have-all; take that path for its bookkeeping.
        cn.onPeerHasAllPieces()
        return nil
    }
    if !bm.IsEmpty() {
        cn.raisePeerMinPieces(pieceIndex(bm.Maximum()) + 1)
    }
    shouldUpdateRequests := false
    if cn.peerSentHaveAll {
        // Transitioning away from have-all: remove the conn from the all-pieces set.
        if !cn.t.deleteConnWithAllPieces(&cn.Peer) {
            panic(cn)
        }
        cn.peerSentHaveAll = false
        if !cn._peerPieces.IsEmpty() {
            panic("if peer has all, we expect no individual peer pieces to be set")
        }
    } else {
        // Turn bm into the delta between old and new claims.
        bm.Xor(&cn._peerPieces)
    }
    cn.peerSentHaveAll = false
    // bm is now 'on' for pieces that are changing
    bm.Iterate(func(x uint32) bool {
        pi := pieceIndex(x)
        if cn._peerPieces.Contains(x) {
            // Then we must be losing this piece
            cn.t.decPieceAvailability(pi)
        } else {
            if !shouldUpdateRequests && cn.t.wantPieceIndex(pieceIndex(x)) {
                shouldUpdateRequests = true
            }
            // We must be gaining this piece
            cn.t.incPieceAvailability(pieceIndex(x))
        }
        return true
    })
    // Apply the changes. If we had everything previously, this should be empty, so xor is the same
    // as or.
    cn._peerPieces.Xor(&bm)
    if shouldUpdateRequests {
        cn.updateRequests("bitfield")
    }
    // We didn't guard this before, I see no reason to do it now.
    cn.peerPiecesChanged()
    return nil
}
// onPeerHasAllPiecesNoTriggers switches the peer's state to have-all without firing request or
// piece-change triggers: it rolls back per-piece availability, registers the conn in the torrent's
// all-pieces set, and clears the individual bitmap (have-all supersedes it).
func (cn *PeerConn) onPeerHasAllPiecesNoTriggers() {
    t := cn.t
    if t.haveInfo() {
        // Per-piece availability is replaced by all-pieces conn accounting below.
        cn._peerPieces.Iterate(func(x uint32) bool {
            t.decPieceAvailability(pieceIndex(x))
            return true
        })
    }
    t.addConnWithAllPieces(&cn.Peer)
    cn.peerSentHaveAll = true
    cn._peerPieces.Clear()
}
// onPeerHasAllPieces records that the peer has everything, then fires the usual triggers.
func (cn *PeerConn) onPeerHasAllPieces() {
    cn.onPeerHasAllPiecesNoTriggers()
    cn.peerHasAllPiecesTriggers()
}
// peerHasAllPiecesTriggers runs the follow-up actions after the peer gains all pieces: request
// updates (if we still want pieces) and the piece-change notification.
func (cn *PeerConn) peerHasAllPiecesTriggers() {
    if !cn.t._pendingPieces.IsEmpty() {
        cn.updateRequests("Peer.onPeerHasAllPieces")
    }
    cn.peerPiecesChanged()
}
// onPeerSentHaveAll handles the fast-extension HAVE_ALL message.
func (cn *PeerConn) onPeerSentHaveAll() error {
    cn.onPeerHasAllPieces()
    return nil
}
// peerSentHaveNone handles the fast-extension HAVE_NONE message: drops all availability credited
// to this peer and clears its piece state.
func (cn *PeerConn) peerSentHaveNone() error {
    if !cn.peerSentHaveAll {
        // Availability for a have-all peer is tracked via the all-pieces conn set instead.
        cn.t.decPeerPieceAvailability(&cn.Peer)
    }
    cn._peerPieces.Clear()
    cn.peerSentHaveAll = false
    cn.peerPiecesChanged()
    return nil
}
  471. func (c *PeerConn) requestPendingMetadata() {
  472. if c.t.haveInfo() {
  473. return
  474. }
  475. if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 {
  476. // Peer doesn't support this.
  477. return
  478. }
  479. // Request metadata pieces that we don't have in a random order.
  480. var pending []int
  481. for index := 0; index < c.t.metadataPieceCount(); index++ {
  482. if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
  483. pending = append(pending, index)
  484. }
  485. }
  486. rand.Shuffle(len(pending), func(i, j int) { pending[i], pending[j] = pending[j], pending[i] })
  487. for _, i := range pending {
  488. c.requestMetadataPiece(i)
  489. }
  490. }
// wroteMsg records stats for a message we queued: a global counter per message type, a per-LTEP
// protocol counter for extended messages, and the connection stats chain.
func (cn *PeerConn) wroteMsg(msg *pp.Message) {
    torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
    if msg.Type == pp.Extended {
        // Map the extended ID back to the protocol name for the stat key.
        for name, id := range cn.PeerExtensionIDs {
            if id != msg.ExtendedID {
                continue
            }
            torrent.Add(fmt.Sprintf("Extended messages written for protocol %q", name), 1)
        }
    }
    cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
}
// wroteBytes adds n to BytesWritten across the connection stats chain.
func (cn *PeerConn) wroteBytes(n int64) {
    cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
}
// fastEnabled reports whether the fast extension (BEP 6) is active: both sides must support it.
func (c *PeerConn) fastEnabled() bool {
    return c.PeerExtensionBytes.SupportsFast() && c.t.cl.config.Extensions.SupportsFast()
}
// reject sends a REJECT for the given request (fast extension only) and drops any matching
// pending peer request, releasing its allocation reservation.
func (c *PeerConn) reject(r Request) {
    if !c.fastEnabled() {
        // REJECT only exists in the fast extension; sending it otherwise is a bug.
        panic("fast not enabled")
    }
    c.write(r.ToMsg(pp.Reject))
    // It is possible to reject a request before it is added to peer requests due to being invalid.
    if state, ok := c.peerRequests[r]; ok {
        state.allocReservation.Drop()
        delete(c.peerRequests, r)
    }
}
  520. func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
  521. uploadRateLimiter := c.t.cl.config.UploadRateLimiter
  522. if uploadRateLimiter.Limit() == rate.Inf {
  523. return
  524. }
  525. return Some(uploadRateLimiter.Burst())
  526. }
  527. // startFetch is for testing purposes currently.
  528. func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
  529. requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
  530. if _, ok := c.peerRequests[r]; ok {
  531. torrent.Add("duplicate requests received", 1)
  532. if c.fastEnabled() {
  533. return errors.New("received duplicate request with fast enabled")
  534. }
  535. return nil
  536. }
  537. if c.choking {
  538. torrent.Add("requests received while choking", 1)
  539. if c.fastEnabled() {
  540. torrent.Add("requests rejected while choking", 1)
  541. c.reject(r)
  542. }
  543. return nil
  544. }
  545. // TODO: What if they've already requested this?
  546. if len(c.peerRequests) >= localClientReqq {
  547. torrent.Add("requests received while queue full", 1)
  548. if c.fastEnabled() {
  549. c.reject(r)
  550. }
  551. // BEP 6 says we may close here if we choose.
  552. return nil
  553. }
  554. if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value {
  555. err := fmt.Errorf("peer requested chunk too long (%v)", r.Length)
  556. c.protocolLogger.Levelf(log.Warning, err.Error())
  557. if c.fastEnabled() {
  558. c.reject(r)
  559. return nil
  560. } else {
  561. return err
  562. }
  563. }
  564. if !c.t.havePiece(pieceIndex(r.Index)) {
  565. // TODO: Tell the peer we don't have the piece, and reject this request.
  566. requestsReceivedForMissingPieces.Add(1)
  567. return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
  568. }
  569. pieceLength := c.t.pieceLength(pieceIndex(r.Index))
  570. // Check this after we know we have the piece, so that the piece length will be known.
  571. if chunkOverflowsPiece(r.ChunkSpec, pieceLength) {
  572. torrent.Add("bad requests received", 1)
  573. return errors.New("chunk overflows piece")
  574. }
  575. if c.peerRequests == nil {
  576. c.peerRequests = make(map[Request]*peerRequestState, localClientReqq)
  577. }
  578. value := &peerRequestState{
  579. allocReservation: c.peerRequestDataAllocLimiter.Reserve(int64(r.Length)),
  580. }
  581. c.peerRequests[r] = value
  582. if startFetch {
  583. // TODO: Limit peer request data read concurrency.
  584. go c.peerRequestDataReader(r, value)
  585. }
  586. return nil
  587. }
// peerRequestDataReader runs on its own goroutine: it waits for the allocation reservation,
// reads the requested chunk from storage, and under the client lock either stores the data for
// the writer or records the failure.
func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
    // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
    // or fail to read and then cleanup. Also, we used to hang here if the reservation was never
    // dropped, that was fixed.
    ctx := context.Background()
    err := prs.allocReservation.Wait(ctx)
    if err != nil {
        c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err)
        return
    }
    // Read outside the lock; storage I/O can be slow.
    b, err := c.readPeerRequestData(r)
    c.locker().Lock()
    defer c.locker().Unlock()
    if err != nil {
        c.peerRequestDataReadFailed(err, r)
    } else {
        if b == nil {
            panic("data must be non-nil to trigger send")
        }
        torrent.Add("peer request data read successes", 1)
        prs.data = b
        // This might be required for the error case too (#752 and #753).
        c.tickleWriter()
    }
}
// If this is maintained correctly, we might be able to support optional synchronous reading for
// chunk sending, the way it used to work.
//
// peerRequestDataReadFailed handles a storage read failure for a peer's request: logs it,
// refreshes piece completion state, and rejects the request (fast) or chokes (non-fast) so the
// peer learns the data isn't coming.
func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
    torrent.Add("peer request data read failures", 1)
    logLevel := log.Warning
    if c.t.hasStorageCap() {
        // It's expected that pieces might drop. See
        // https://github.com/anacrolix/torrent/issues/702#issuecomment-1000953313.
        logLevel = log.Debug
    }
    c.logger.Levelf(logLevel, "error reading chunk for peer Request %v: %v", r, err)
    if c.t.closed.IsSet() {
        return
    }
    i := pieceIndex(r.Index)
    if c.t.pieceComplete(i) {
        // There used to be more code here that just duplicated the following break. Piece
        // completions are currently cached, so I'm not sure how helpful this update is, except to
        // pull any completion changes pushed to the storage backend in failed reads that got us
        // here.
        c.t.updatePieceCompletion(i)
    }
    // We've probably dropped a piece from storage, but there's no way to communicate this to the
    // peer. If they ask for it again, we kick them allowing us to send them updated piece states if
    // we reconnect. TODO: Instead, we could just try to update them with Bitfield or HaveNone and
    // if they kick us for breaking protocol, on reconnect we will be compliant again (at least
    // initially).
    if c.fastEnabled() {
        c.reject(r)
    } else {
        if c.choking {
            // If fast isn't enabled, I think we would have wiped all peer requests when we last
            // choked, and requests while we're choking would be ignored. It could be possible that
            // a peer request data read completed concurrently to it being deleted elsewhere.
            c.protocolLogger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
        }
        // Choking a non-fast peer should cause them to flush all their requests.
        c.choke(c.write)
    }
}
  653. func (c *PeerConn) readPeerRequestData(r Request) ([]byte, error) {
  654. b := make([]byte, r.Length)
  655. p := c.t.info.Piece(int(r.Index))
  656. n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
  657. if n == len(b) {
  658. if err == io.EOF {
  659. err = nil
  660. }
  661. } else {
  662. if err == nil {
  663. panic("expected error")
  664. }
  665. }
  666. return b, err
  667. }
// logProtocolBehaviour logs protocol-level peer behaviour, tagging the entry with the peer's ID
// and client name, and attributing the call site to our caller (SkipCallers(1)).
func (c *PeerConn) logProtocolBehaviour(level log.Level, format string, arg ...interface{}) {
    c.protocolLogger.WithContextText(fmt.Sprintf(
        "peer id %q, ext v %q", c.PeerID, c.PeerClientName.Load(),
    )).SkipCallers(1).Levelf(level, format, arg...)
}
  673. // Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
  674. // exit. Returning will end the connection.
// mainReadLoop decodes and dispatches peer-wire messages from the connection
// until the torrent/connection is closed or a protocol error occurs. It is
// entered with the client lock held; the lock is released only while blocked
// in decoder.Decode.
func (c *PeerConn) mainReadLoop() (err error) {
	defer func() {
		if err != nil {
			torrent.Add("connection.mainReadLoop returned with error", 1)
		} else {
			torrent.Add("connection.mainReadLoop returned with no error", 1)
		}
	}()
	t := c.t
	cl := t.cl
	decoder := pp.Decoder{
		R: bufio.NewReaderSize(c.r, 1<<17),
		// Cap message length relative to the torrent's chunk size.
		MaxLength: 4 * pp.Integer(max(int64(t.chunkSize), defaultChunkSize)),
		Pool:      &t.chunkPool,
	}
	for {
		var msg pp.Message
		func() {
			// Drop the client lock while blocked on the network so other work
			// can proceed; reacquire before handling the message.
			cl.unlock()
			defer cl.lock()
			err = decoder.Decode(&msg)
			if err != nil {
				err = fmt.Errorf("decoding message: %w", err)
			}
		}()
		// Do this before checking closed.
		if cb := c.callbacks.ReadMessage; cb != nil && err == nil {
			cb(c, &msg)
		}
		if t.closed.IsSet() || c.closed.IsSet() {
			return nil
		}
		if err != nil {
			err = log.WithLevel(log.Info, err)
			return err
		}
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(msg.Type.String(), 1)
		// Fast-extension messages are a protocol violation if the extension
		// wasn't negotiated.
		if msg.Type.FastExtension() && !c.fastEnabled() {
			runSafeExtraneous(func() { torrent.Add("fast messages received when extension is disabled", 1) })
			return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
		}
		switch msg.Type {
		case pp.Choke:
			if c.peerChoking {
				break
			}
			if !c.fastEnabled() {
				c.deleteAllRequests("choked by non-fast PeerConn")
			} else {
				// We don't decrement pending requests here, let's wait for the peer to either
				// reject or satisfy the outstanding requests. Additionally, some peers may unchoke
				// us and resume where they left off, we don't want to have piled on to those chunks
				// in the meanwhile. I think a peer's ability to abuse this should be limited: they
				// could let us request a lot of stuff, then choke us and never reject, but they're
				// only a single peer, our chunk balancing should smooth over this abuse.
			}
			c.peerChoking = true
			c.updateExpectingChunks()
		case pp.Unchoke:
			if !c.peerChoking {
				// Some clients do this for some reason. Transmission doesn't error on this, so we
				// won't for consistency.
				c.logProtocolBehaviour(log.Debug, "received unchoke when already unchoked")
				break
			}
			c.peerChoking = false
			// Count requests that survived the choke (those not covered by
			// allowed-fast pieces).
			preservedCount := 0
			c.requestState.Requests.Iterate(func(x RequestIndex) bool {
				if !c.peerAllowedFast.Contains(c.t.pieceIndexOfRequestIndex(x)) {
					preservedCount++
				}
				return true
			})
			if preservedCount != 0 {
				// TODO: Yes this is a debug log but I'm not happy with the state of the logging lib
				// right now.
				c.protocolLogger.Levelf(log.Debug,
					"%v requests were preserved while being choked (fast=%v)",
					preservedCount,
					c.fastEnabled())
				torrent.Add("requestsPreservedThroughChoking", int64(preservedCount))
			}
			if !c.t._pendingPieces.IsEmpty() {
				c.updateRequests("unchoked")
			}
			c.updateExpectingChunks()
		case pp.Interested:
			c.peerInterested = true
			c.tickleWriter()
		case pp.NotInterested:
			c.peerInterested = false
			// We don't clear their requests since it isn't clear in the spec.
			// We'll probably choke them for this, which will clear them if
			// appropriate, and is clearly specified.
		case pp.Have:
			err = c.peerSentHave(pieceIndex(msg.Index))
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.Request:
			r := newRequestFromMessage(&msg)
			err = c.onReadRequest(r, true)
			if err != nil {
				err = fmt.Errorf("on reading request %v: %w", r, err)
			}
		case pp.Piece:
			c.doChunkReadStats(int64(len(msg.Piece)))
			err = c.receiveChunk(&msg)
			// Return full-sized chunk buffers to the pool for reuse.
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(&msg.Piece)
			}
			if err != nil {
				err = fmt.Errorf("receiving chunk: %w", err)
			}
		case pp.Cancel:
			req := newRequestFromMessage(&msg)
			c.onPeerSentCancel(req)
		case pp.Port:
			ipa, ok := tryIpPortFromNetAddr(c.RemoteAddr)
			if !ok {
				break
			}
			pingAddr := net.UDPAddr{
				IP:   ipa.IP,
				Port: ipa.Port,
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			cl.eachDhtServer(func(s DhtServer) {
				go s.Ping(&pingAddr)
			})
		case pp.Suggest:
			torrent.Add("suggests received", 1)
			log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).LogLevel(log.Debug, c.t.logger)
			c.updateRequests("suggested")
		case pp.HaveAll:
			err = c.onPeerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Reject:
			req := newRequestFromMessage(&msg)
			if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
				err = fmt.Errorf("received invalid reject for request %v", req)
				c.protocolLogger.Levelf(log.Debug, "%v", err)
			}
		case pp.AllowedFast:
			torrent.Add("allowed fasts received", 1)
			log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger)
			c.updateRequests("PeerConn.mainReadLoop allowed fast")
		case pp.Extended:
			err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
		case pp.Hashes:
			err = c.onReadHashes(&msg)
		case pp.HashRequest:
			err = c.onHashRequest(&msg)
		case pp.HashReject:
			c.protocolLogger.Levelf(log.Info, "received unimplemented BitTorrent v2 message: %v", msg.Type)
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}
// onReadExtendedMsg handles an LTEP (BEP 10) extended message: the extended
// handshake itself, or a payload dispatched by the extension number the local
// side assigned.
func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
	defer func() {
		// TODO: Should we still do this?
		if err != nil {
			// These clients use their own extension IDs for outgoing message
			// types, which is incorrect.
			if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
				err = nil
			}
		}
	}()
	t := c.t
	cl := t.cl
	{
		// Give user callbacks first crack at every extension message.
		event := PeerConnReadExtensionMessageEvent{
			PeerConn:        c,
			ExtensionNumber: id,
			Payload:         payload,
		}
		for _, cb := range c.callbacks.PeerConnReadExtensionMessage {
			cb(event)
		}
	}
	if id == pp.HandshakeExtendedID {
		var d pp.ExtendedHandshakeMessage
		if err := bencode.Unmarshal(payload, &d); err != nil {
			c.protocolLogger.Printf("error parsing extended handshake message %q: %s", payload, err)
			return fmt.Errorf("unmarshalling extended handshake payload: %w", err)
		}
		// Trigger this callback after it's been processed. If you want to handle it yourself, you
		// should hook PeerConnReadExtensionMessage.
		if cb := c.callbacks.ReadExtendedHandshake; cb != nil {
			cb(c, &d)
		}
		if d.Reqq != 0 {
			c.PeerMaxRequests = d.Reqq
		}
		c.PeerClientName.Store(d.V)
		if c.PeerExtensionIDs == nil {
			c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
		}
		c.PeerListenPort = d.Port
		c.PeerPrefersEncryption = d.Encryption
		// Record the peer's extension table, counting newly seen extensions.
		for name, id := range d.M {
			if _, ok := c.PeerExtensionIDs[name]; !ok {
				peersSupportingExtension.Add(
					// expvar.Var.String must produce valid JSON. "ut_payme\xeet_address" was being
					// entered here which caused problems later when unmarshalling.
					strconv.Quote(string(name)),
					1)
			}
			c.PeerExtensionIDs[name] = id
		}
		if d.MetadataSize != 0 {
			if err = t.setMetadataSize(d.MetadataSize); err != nil {
				return fmt.Errorf("setting metadata size to %d: %w", d.MetadataSize, err)
			}
		}
		c.requestPendingMetadata()
		if !t.cl.config.DisablePEX {
			t.pex.Add(c) // we learnt enough now
			// This checks the extension is supported internally.
			c.pex.Init(c)
		}
		return nil
	}
	extensionName, builtin, err := c.LocalLtepProtocolMap.LookupId(id)
	if err != nil {
		return
	}
	if !builtin {
		// User should have taken care of this in PeerConnReadExtensionMessage callback.
		return nil
	}
	switch extensionName {
	case pp.ExtensionNameMetadata:
		err := cl.gotMetadataExtensionMsg(payload, t, c)
		if err != nil {
			return fmt.Errorf("handling metadata extension message: %w", err)
		}
		return nil
	case pp.ExtensionNamePex:
		if !c.pex.IsEnabled() {
			return nil // or hang-up maybe?
		}
		err = c.pex.Recv(payload)
		if err != nil {
			err = fmt.Errorf("receiving pex message: %w", err)
		}
		return
	case utHolepunch.ExtensionName:
		var msg utHolepunch.Msg
		err = msg.UnmarshalBinary(payload)
		if err != nil {
			err = fmt.Errorf("unmarshalling ut_holepunch message: %w", err)
			return
		}
		err = c.t.handleReceivedUtHolepunchMsg(msg, c)
		return
	default:
		// Builtin names are registered by us, so an unhandled one is a
		// programmer error.
		panic(fmt.Sprintf("unhandled builtin extension protocol %q", extensionName))
	}
}
  948. // Set both the Reader and Writer for the connection from a single ReadWriter.
  949. func (cn *PeerConn) setRW(rw io.ReadWriter) {
  950. cn.r = rw
  951. cn.w = rw
  952. }
  953. // Returns the Reader and Writer as a combined ReadWriter.
  954. func (cn *PeerConn) rw() io.ReadWriter {
  955. return struct {
  956. io.Reader
  957. io.Writer
  958. }{cn.r, cn.w}
  959. }
  960. func (c *PeerConn) uploadAllowed() bool {
  961. if c.t.cl.config.NoUpload {
  962. return false
  963. }
  964. if c.t.dataUploadDisallowed {
  965. return false
  966. }
  967. if c.t.seeding() {
  968. return true
  969. }
  970. if !c.peerHasWantedPieces() {
  971. return false
  972. }
  973. // Don't upload more than 100 KiB more than we download.
  974. if c._stats.BytesWrittenData.Int64() >= c._stats.BytesReadData.Int64()+100<<10 {
  975. return false
  976. }
  977. return true
  978. }
  979. func (c *PeerConn) setRetryUploadTimer(delay time.Duration) {
  980. if c.uploadTimer == nil {
  981. c.uploadTimer = time.AfterFunc(delay, c.tickleWriter)
  982. } else {
  983. c.uploadTimer.Reset(delay)
  984. }
  985. }
// Also handles choking and unchoking of the remote peer.
func (c *PeerConn) upload(msg func(pp.Message) bool) bool {
	// Breaking or completing this loop means we don't want to upload to the peer anymore, and we
	// choke them.
another:
	for c.uploadAllowed() {
		// We want to upload to the peer.
		if !c.unchoke(msg) {
			return false
		}
		for r, state := range c.peerRequests {
			// Requests whose data hasn't been read from storage yet are
			// skipped; they'll be picked up on a later pass.
			if state.data == nil {
				continue
			}
			// Reserve bandwidth for the whole chunk up front.
			res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
			if !res.OK() {
				panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
			}
			delay := res.Delay()
			if delay > 0 {
				// Rate-limited: give the reservation back and retry after the
				// delay instead of blocking the writer.
				res.Cancel()
				c.setRetryUploadTimer(delay)
				// Hard to say what to return here.
				return true
			}
			more := c.sendChunk(r, msg, state)
			delete(c.peerRequests, r)
			if !more {
				return false
			}
			// Re-evaluate the upload allowance from the top after every chunk
			// sent, since sending changes our stats and request set.
			goto another
		}
		return true
	}
	return c.choke(msg)
}
  1022. func (cn *PeerConn) drop() {
  1023. cn.t.dropConnection(cn)
  1024. }
  1025. func (cn *PeerConn) ban() {
  1026. cn.t.cl.banPeerIP(cn.remoteIp())
  1027. }
  1028. // This is called when something has changed that should wake the writer, such as putting stuff into
  1029. // the writeBuffer, or changing some state that the writer can act on.
  1030. func (c *PeerConn) tickleWriter() {
  1031. c.messageWriter.writeCond.Broadcast()
  1032. }
  1033. func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
  1034. c.lastChunkSent = time.Now()
  1035. state.allocReservation.Release()
  1036. return msg(pp.Message{
  1037. Type: pp.Piece,
  1038. Index: r.Index,
  1039. Begin: r.Begin,
  1040. Piece: state.data,
  1041. })
  1042. }
  1043. func (c *Peer) setTorrent(t *Torrent) {
  1044. if c.t != nil {
  1045. panic("connection already associated with a torrent")
  1046. }
  1047. c.t = t
  1048. c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t)
  1049. t.reconcileHandshakeStats(c)
  1050. }
  1051. func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
  1052. f := pp.PexPeerFlags(0)
  1053. if c.PeerPrefersEncryption {
  1054. f |= pp.PexPrefersEncryption
  1055. }
  1056. if c.outgoing {
  1057. f |= pp.PexOutgoingConn
  1058. }
  1059. if c.utp() {
  1060. f |= pp.PexSupportsUtp
  1061. }
  1062. return f
  1063. }
  1064. // This returns the address to use if we want to dial the peer again. It incorporates the peer's
  1065. // advertised listen port.
  1066. func (c *PeerConn) dialAddr() PeerRemoteAddr {
  1067. if c.outgoing || c.PeerListenPort == 0 {
  1068. return c.RemoteAddr
  1069. }
  1070. addrPort, err := addrPortFromPeerRemoteAddr(c.RemoteAddr)
  1071. if err != nil {
  1072. c.logger.Levelf(
  1073. log.Warning,
  1074. "error parsing %q for alternate dial port: %v",
  1075. c.RemoteAddr,
  1076. err,
  1077. )
  1078. return c.RemoteAddr
  1079. }
  1080. return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort))
  1081. }
  1082. func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) {
  1083. f := c.pexPeerFlags()
  1084. dialAddr := c.dialAddr()
  1085. addr, err := addrPortFromPeerRemoteAddr(dialAddr)
  1086. if err != nil || !addr.IsValid() {
  1087. err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err)
  1088. return
  1089. }
  1090. return pexEvent{t, addr, f, nil}, nil
  1091. }
  1092. func (pc *PeerConn) String() string {
  1093. return fmt.Sprintf(
  1094. "%T %p [flags=%v id=%+q, exts=%v, v=%q]",
  1095. pc,
  1096. pc,
  1097. pc.connectionFlags(),
  1098. pc.PeerID,
  1099. pc.PeerExtensionBytes,
  1100. pc.PeerClientName.Load(),
  1101. )
  1102. }
  1103. // Returns the pieces the peer could have based on their claims. If we don't know how many pieces
  1104. // are in the torrent, it could be a very large range if the peer has sent HaveAll.
  1105. func (pc *PeerConn) PeerPieces() *roaring.Bitmap {
  1106. pc.locker().RLock()
  1107. defer pc.locker().RUnlock()
  1108. return pc.newPeerPieces()
  1109. }
  1110. func (pc *PeerConn) remoteIsTransmission() bool {
  1111. return bytes.HasPrefix(pc.PeerID[:], []byte("-TR")) && pc.PeerID[7] == '-'
  1112. }
  1113. func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) {
  1114. dialAddr := pc.dialAddr()
  1115. return addrPortFromPeerRemoteAddr(dialAddr)
  1116. }
  1117. func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool {
  1118. return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit)
  1119. }
  1120. func (cn *PeerConn) peerPiecesChanged() {
  1121. cn.t.maybeDropMutuallyCompletePeer(cn)
  1122. }
  1123. // Returns whether the connection could be useful to us. We're seeding and
  1124. // they want data, we don't have metainfo and they can provide it, etc.
  1125. func (c *PeerConn) useful() bool {
  1126. t := c.t
  1127. if c.closed.IsSet() {
  1128. return false
  1129. }
  1130. if !t.haveInfo() {
  1131. return c.supportsExtension("ut_metadata")
  1132. }
  1133. if t.seeding() && c.peerInterested {
  1134. return true
  1135. }
  1136. if c.peerHasWantedPieces() {
  1137. return true
  1138. }
  1139. return false
  1140. }
  1141. func makeBuiltinLtepProtocols(pex bool) LocalLtepProtocolMap {
  1142. ps := []pp.ExtensionName{pp.ExtensionNameMetadata, utHolepunch.ExtensionName}
  1143. if pex {
  1144. ps = append(ps, pp.ExtensionNamePex)
  1145. }
  1146. return LocalLtepProtocolMap{
  1147. Index: ps,
  1148. NumBuiltin: len(ps),
  1149. }
  1150. }
  1151. func (c *PeerConn) addBuiltinLtepProtocols(pex bool) {
  1152. c.LocalLtepProtocolMap = &c.t.cl.defaultLocalLtepProtocolMap
  1153. }
  1154. func (pc *PeerConn) WriteExtendedMessage(extName pp.ExtensionName, payload []byte) error {
  1155. pc.locker().Lock()
  1156. defer pc.locker().Unlock()
  1157. id := pc.PeerExtensionIDs[extName]
  1158. if id == 0 {
  1159. return fmt.Errorf("peer does not support or has disabled extension %q", extName)
  1160. }
  1161. pc.write(pp.Message{
  1162. Type: pp.Extended,
  1163. ExtendedID: id,
  1164. ExtendedPayload: payload,
  1165. })
  1166. return nil
  1167. }
  1168. func (pc *PeerConn) shouldRequestHashes() bool {
  1169. return pc.t.haveInfo() && pc.v2 && pc.t.info.HasV2()
  1170. }
// requestMissingHashes sends BEP 52 hash requests for any v2 piece hashes we
// don't yet have, for files whose pieces this peer claims to have. Requests
// already sent on this connection are not repeated.
func (pc *PeerConn) requestMissingHashes() {
	if !pc.shouldRequestHashes() {
		return
	}
	info := pc.t.info
	// The layer of the merkle tree whose nodes are piece hashes, counted from
	// the leaf (block) layer.
	baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo(
		uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)),
	))
	nextFileBeginPiece := 0
file:
	for _, file := range info.UpvertedFiles() {
		fileNumPieces := int((file.Length + info.PieceLength - 1) / info.PieceLength)
		// We would be requesting the leaves, the file must be short enough that we can just do with
		// the pieces root as the piece hash.
		if fileNumPieces <= 1 {
			continue
		}
		curFileBeginPiece := nextFileBeginPiece
		nextFileBeginPiece += fileNumPieces
		haveAllHashes := true
		for i := range fileNumPieces {
			torrentPieceIndex := curFileBeginPiece + i
			// Only ask a peer for hashes of pieces it claims to have.
			if !pc.peerHasPiece(torrentPieceIndex) {
				continue file
			}
			if !pc.t.piece(torrentPieceIndex).hashV2.Ok {
				haveAllHashes = false
			}
		}
		if haveAllHashes {
			continue
		}
		piecesRoot := file.PiecesRoot.Unwrap()
		proofLayers := pp.Integer(0)
		// Request hashes in runs of up to 512 per message.
		for index := 0; index < fileNumPieces; index += 512 {
			// Minimizing to the number of pieces in a file conflicts with the BEP.
			length := merkle.RoundUpToPowerOfTwo(uint(min(512, fileNumPieces-index)))
			if length < 2 {
				// This should have been filtered out by baseLayer and pieces root as piece hash
				// checks.
				panic(length)
			}
			if length%2 != 0 {
				pc.protocolLogger.Levelf(log.Warning, "requesting odd hashes length %d", length)
			}
			msg := pp.Message{
				Type:        pp.HashRequest,
				PiecesRoot:  piecesRoot,
				BaseLayer:   baseLayer,
				Index:       pp.Integer(index),
				Length:      pp.Integer(length),
				ProofLayers: proofLayers,
			}
			hr := hashRequestFromMessage(msg)
			// Don't re-send a request we've already made on this connection.
			if generics.MapContains(pc.sentHashRequests, hr) {
				continue
			}
			pc.write(msg)
			generics.MakeMapIfNil(&pc.sentHashRequests)
			pc.sentHashRequests[hr] = struct{}{}
		}
	}
}
// onReadHashes handles a BEP 52 Hashes message: it accumulates the received
// piece hashes for the file identified by the pieces root, and once the full
// set verifies against the expected root, installs the v2 hashes on the
// torrent's pieces.
func (pc *PeerConn) onReadHashes(msg *pp.Message) (err error) {
	file := pc.t.getFileByPiecesRoot(msg.PiecesRoot)
	filePieceHashes := pc.receivedHashPieces[msg.PiecesRoot]
	if filePieceHashes == nil {
		// First hashes for this file on this connection: allocate the
		// accumulation buffer lazily.
		filePieceHashes = make([][32]byte, file.numPieces())
		generics.MakeMapIfNil(&pc.receivedHashPieces)
		pc.receivedHashPieces[msg.PiecesRoot] = filePieceHashes
	}
	if msg.ProofLayers != 0 {
		// This isn't handled yet.
		panic(msg.ProofLayers)
	}
	copy(filePieceHashes[msg.Index:], msg.Hashes)
	// Recompute the root from what we've received so far; only a full, correct
	// set will match.
	root := merkle.RootWithPadHash(
		filePieceHashes,
		metainfo.HashForPiecePad(int64(pc.t.usualPieceSize())))
	expectedPiecesRoot := file.piecesRoot.Unwrap()
	if root == expectedPiecesRoot {
		pc.protocolLogger.WithNames(v2HashesLogName).Levelf(
			log.Info,
			"got piece hashes for file %v (num pieces %v)",
			file, file.numPieces())
		for filePieceIndex, peerHash := range filePieceHashes {
			torrentPieceIndex := file.BeginPieceIndex() + filePieceIndex
			pc.t.piece(torrentPieceIndex).setV2Hash(peerHash)
		}
	} else {
		// Mismatch is expected while the set is still incomplete.
		pc.protocolLogger.WithNames(v2HashesLogName).Levelf(
			log.Debug,
			"peer file piece hashes root mismatch: %x != %x",
			root, expectedPiecesRoot)
	}
	return nil
}
// getHashes collects the v2 piece hashes answering a BEP 52 hash request.
// Positions past the end of the file's pieces (up to the power-of-two layer
// width) are filled with the pad hash. Returns an error for unsupported or
// out-of-range requests.
func (pc *PeerConn) getHashes(msg *pp.Message) ([][32]byte, error) {
	if msg.ProofLayers != 0 {
		return nil, errors.New("proof layers not supported")
	}
	// Sanity cap on response size.
	if msg.Length > 8192 {
		return nil, fmt.Errorf("requested too many hashes: %d", msg.Length)
	}
	file := pc.t.getFileByPiecesRoot(msg.PiecesRoot)
	if file == nil {
		return nil, fmt.Errorf("no file for pieces root %x", msg.PiecesRoot)
	}
	beginPieceIndex := file.BeginPieceIndex()
	endPieceIndex := file.EndPieceIndex()
	// Width of the piece-hash layer in the merkle tree for this file.
	length := merkle.RoundUpToPowerOfTwo(uint(endPieceIndex - beginPieceIndex))
	if uint(msg.Index+msg.Length) > length {
		return nil, errors.New("invalid hash range")
	}
	hashes := make([][32]byte, msg.Length)
	padHash := metainfo.HashForPiecePad(int64(pc.t.usualPieceSize()))
	for i := range hashes {
		torrentPieceIndex := beginPieceIndex + int(msg.Index) + i
		if torrentPieceIndex >= endPieceIndex {
			// Beyond the file's last piece: pad out the layer.
			hashes[i] = padHash
			continue
		}
		piece := pc.t.piece(torrentPieceIndex)
		hash, err := piece.obtainHashV2()
		if err != nil {
			return nil, fmt.Errorf("can't get hash for piece %d: %w", torrentPieceIndex, err)
		}
		hashes[i] = hash
	}
	return hashes, nil
}
  1302. func (pc *PeerConn) onHashRequest(msg *pp.Message) error {
  1303. if !pc.t.info.HasV2() {
  1304. return errors.New("torrent has no v2 metadata")
  1305. }
  1306. resp := pp.Message{
  1307. PiecesRoot: msg.PiecesRoot,
  1308. BaseLayer: msg.BaseLayer,
  1309. Index: msg.Index,
  1310. Length: msg.Length,
  1311. ProofLayers: msg.ProofLayers,
  1312. }
  1313. hashes, err := pc.getHashes(msg)
  1314. if err != nil {
  1315. pc.protocolLogger.WithNames(v2HashesLogName).Levelf(log.Debug, "error getting hashes: %v", err)
  1316. resp.Type = pp.HashReject
  1317. pc.write(resp)
  1318. return nil
  1319. }
  1320. resp.Type = pp.Hashes
  1321. resp.Hashes = hashes
  1322. pc.write(resp)
  1323. return nil
  1324. }
// hashRequest is a comparable key identifying a BEP 52 hash request, used to
// deduplicate requests sent on a connection.
type hashRequest struct {
	piecesRoot                            [32]byte
	baseLayer, index, length, proofLayers pp.Integer
}
  1329. func (hr hashRequest) toMessage() pp.Message {
  1330. return pp.Message{
  1331. Type: pp.HashRequest,
  1332. PiecesRoot: hr.piecesRoot,
  1333. BaseLayer: hr.baseLayer,
  1334. Index: hr.index,
  1335. Length: hr.length,
  1336. ProofLayers: hr.proofLayers,
  1337. }
  1338. }
  1339. func hashRequestFromMessage(m pp.Message) hashRequest {
  1340. return hashRequest{
  1341. piecesRoot: m.PiecesRoot,
  1342. baseLayer: m.BaseLayer,
  1343. index: m.Index,
  1344. length: m.Length,
  1345. proofLayers: m.ProofLayers,
  1346. }
  1347. }