peer.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908
  1. package torrent
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/RoaringBitmap/roaring"
  11. "github.com/anacrolix/chansync"
  12. . "github.com/anacrolix/generics"
  13. "github.com/anacrolix/log"
  14. "github.com/anacrolix/missinggo/iter"
  15. "github.com/anacrolix/missinggo/v2/bitmap"
  16. "github.com/anacrolix/multiless"
  17. "github.com/anacrolix/torrent/internal/alloclim"
  18. "github.com/anacrolix/torrent/mse"
  19. pp "github.com/anacrolix/torrent/peer_protocol"
  20. request_strategy "github.com/anacrolix/torrent/request-strategy"
  21. typedRoaring "github.com/anacrolix/torrent/typed-roaring"
  22. )
type (
	// Peer is the state for a connection to a single peer, shared by all
	// transport implementations (see peerImpl). Owned by the Client lock
	// unless otherwise noted.
	Peer struct {
		// First to ensure 64-bit alignment for atomics. See #262.
		_stats ConnStats

		t *Torrent

		// Transport-specific behaviour (e.g. *PeerConn for TCP/uTP).
		peerImpl
		callbacks *Callbacks

		// True if we initiated the connection.
		outgoing bool
		Network  string
		RemoteAddr PeerRemoteAddr
		// The local address as observed by the remote peer. WebRTC seems to get this right without needing hints from the
		// config.
		localPublicAddr peerLocalPublicAddr
		bannableAddr    Option[bannableAddr]
		// True if the connection is operating over MSE obfuscation.
		headerEncrypted bool
		cryptoMethod    mse.CryptoMethod
		Discovery       PeerSource
		trusted         bool
		closed          chansync.SetOnce
		// Set true after we've added our ConnStats generated during handshake to
		// other ConnStat instances as determined when the *Torrent became known.
		reconciledHandshakeStats bool

		lastMessageReceived     time.Time
		completedHandshake      time.Time
		lastUsefulChunkReceived time.Time
		lastChunkSent           time.Time

		// Stuff controlled by the local peer.
		needRequestUpdate    updateRequestReason
		requestState         request_strategy.PeerRequestState
		updateRequestsTimer  *time.Timer
		lastRequestUpdate    time.Time
		peakRequests         maxRequests
		lastBecameInterested time.Time
		priorInterest        time.Duration

		lastStartedExpectingToReceiveChunks time.Time
		cumulativeExpectedToReceiveChunks   time.Duration
		_chunksReceivedWhileExpecting       int64

		choking                                bool
		piecesReceivedSinceLastRequestUpdate   maxRequests
		maxPiecesReceivedBetweenRequestUpdates maxRequests
		// Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
		// and implementation differences, we may receive chunks that are no longer in the set of
		// requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
		validReceiveChunks map[RequestIndex]int
		// Indexed by metadata piece, set to true if posted and pending a
		// response.
		metadataRequests []bool
		sentHaves        bitmap.Bitmap

		// Stuff controlled by the remote peer.
		peerInterested bool
		peerChoking    bool
		peerRequests   map[Request]*peerRequestState
		PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
		// The highest possible number of pieces the torrent could have based on
		// communication with the peer. Generally only useful until we have the
		// torrent info.
		peerMinPieces pieceIndex
		// Pieces we've accepted chunks for from the peer.
		peerTouchedPieces map[pieceIndex]struct{}
		peerAllowedFast   typedRoaring.Bitmap[pieceIndex]

		PeerMaxRequests maxRequests // Maximum pending requests the peer allows.

		logger log.Logger
	}

	// PeerSource is a short code describing how we learned of a peer.
	PeerSource string

	// peerRequestState tracks one inbound (remote-initiated) request.
	peerRequestState struct {
		data             []byte
		allocReservation *alloclim.Reservation
	}

	// PeerRemoteAddr is the minimal address interface a transport must provide.
	PeerRemoteAddr interface {
		String() string
	}

	peerRequests = orderedBitmap[RequestIndex]

	// updateRequestReason labels why a request update was scheduled.
	updateRequestReason string
)
// Peer source codes, as surfaced in status output.
const (
	PeerSourceUtHolepunch     = "C"
	PeerSourceTracker         = "Tr"
	PeerSourceIncoming        = "I"
	PeerSourceDhtGetPeers     = "Hg" // Peers we found by searching a DHT.
	PeerSourceDhtAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
	PeerSourcePex             = "X"
	// The peer was given directly, such as through a magnet link.
	PeerSourceDirect = "M"
)
// These are grouped because we might vary update request behaviour depending on the reason. I'm not
// sure about the fact that multiple reasons can be triggered before an update runs, and only the
// first will count. Possibly we should be signalling what behaviours are appropriate in the next
// update instead.
const (
	peerUpdateRequestsPeerCancelReason   updateRequestReason = "Peer.cancel"
	peerUpdateRequestsRemoteRejectReason updateRequestReason = "Peer.remoteRejectedRequest"
)
// Returns the Torrent a Peer belongs to. Shouldn't change for the lifetime of the Peer. May be nil
// if we are the receiving end of a connection and the handshake hasn't been received or accepted
// yet.
func (p *Peer) Torrent() *Torrent {
	return p.t
}
// initRequestState allocates the outstanding-requests bitmap. Must run before
// any requests are issued on this peer.
func (p *Peer) initRequestState() {
	p.requestState.Requests = &peerRequests{}
}
  125. func (cn *Peer) updateExpectingChunks() {
  126. if cn.expectingChunks() {
  127. if cn.lastStartedExpectingToReceiveChunks.IsZero() {
  128. cn.lastStartedExpectingToReceiveChunks = time.Now()
  129. }
  130. } else {
  131. if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
  132. cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
  133. cn.lastStartedExpectingToReceiveChunks = time.Time{}
  134. }
  135. }
  136. }
// expectingChunks reports whether we can reasonably expect chunk messages from
// this peer given current request, interest and choke state. While choked,
// only requests within allowed-fast pieces can still produce chunks.
func (cn *Peer) expectingChunks() bool {
	if cn.requestState.Requests.IsEmpty() {
		return false
	}
	if !cn.requestState.Interested {
		return false
	}
	if !cn.peerChoking {
		return true
	}
	haveAllowedFastRequests := false
	cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
		// NOTE(review): the flag is set when the allowed-fast piece has zero
		// outstanding requests (`== 0`); that looks inverted relative to the
		// variable name — confirm against roaringBitmapRangeCardinality's
		// intended semantics.
		haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
			cn.requestState.Requests,
			cn.t.pieceRequestIndexOffset(i),
			cn.t.pieceRequestIndexOffset(i+1),
		) == 0
		// Stop iterating as soon as the flag is set.
		return !haveAllowedFastRequests
	})
	return haveAllowedFastRequests
}
  158. func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
  159. return cn.peerChoking && !cn.peerAllowedFast.Contains(piece)
  160. }
  161. func (cn *Peer) cumInterest() time.Duration {
  162. ret := cn.priorInterest
  163. if cn.requestState.Interested {
  164. ret += time.Since(cn.lastBecameInterested)
  165. }
  166. return ret
  167. }
// locker returns the Client-wide lock that guards this peer's state.
func (cn *Peer) locker() *lockWithDeferreds {
	return cn.t.cl.locker()
}
// supportsExtension reports whether the remote advertised the named extension
// in its extended handshake.
func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}
  175. // The best guess at number of pieces in the torrent for this peer.
  176. func (cn *Peer) bestPeerNumPieces() pieceIndex {
  177. if cn.t.haveInfo() {
  178. return cn.t.numPieces()
  179. }
  180. return cn.peerMinPieces
  181. }
// completedString renders the peer's progress as "have/total" pieces.
func (cn *Peer) completedString() string {
	have := pieceIndex(cn.peerPieces().GetCardinality())
	if all, _ := cn.peerHasAllPieces(); all {
		// A seeder's bitfield may be elided; report the full count.
		have = cn.bestPeerNumPieces()
	}
	return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
}
  189. func eventAgeString(t time.Time) string {
  190. if t.IsZero() {
  191. return "never"
  192. }
  193. return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
  194. }
  195. // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
  196. func (cn *Peer) statusFlags() (ret string) {
  197. c := func(b byte) {
  198. ret += string([]byte{b})
  199. }
  200. if cn.requestState.Interested {
  201. c('i')
  202. }
  203. if cn.choking {
  204. c('c')
  205. }
  206. c(':')
  207. ret += cn.connectionFlags()
  208. c(':')
  209. if cn.peerInterested {
  210. c('i')
  211. }
  212. if cn.peerChoking {
  213. c('c')
  214. }
  215. return
  216. }
  217. func (cn *Peer) downloadRate() float64 {
  218. num := cn._stats.BytesReadUsefulData.Int64()
  219. if num == 0 {
  220. return 0
  221. }
  222. return float64(num) / cn.totalExpectingTime().Seconds()
  223. }
// DownloadRate is the exported, lock-taking form of downloadRate.
func (p *Peer) DownloadRate() float64 {
	p.locker().RLock()
	defer p.locker().RUnlock()
	return p.downloadRate()
}
// iterContiguousPieceRequests calls f once per run of outstanding requests
// that fall in the same piece, with the piece index and the run length.
// Run-length encoding over the iteration order of the request bitmap.
func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
	var last Option[pieceIndex]
	var count int
	// next folds one more piece observation into the current run, flushing the
	// previous run through f when the piece changes.
	next := func(item Option[pieceIndex]) {
		if item == last {
			count++
		} else {
			if count != 0 {
				f(last.Value, count)
			}
			last = item
			count = 1
		}
	}
	cn.requestState.Requests.Iterate(func(requestIndex request_strategy.RequestIndex) bool {
		next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
		return true
	})
	// Sentinel flushes the final run (None never equals a Some value).
	next(None[pieceIndex]())
}
// writeStatus dumps a human-readable, multi-line status summary for this peer
// to w. Intended for the client's status page.
func (cn *Peer) writeStatus(w io.Writer) {
	// \t isn't preserved in <pre> blocks?
	if cn.closed.IsSet() {
		fmt.Fprint(w, "CLOSED: ")
	}
	fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n"))
	prio, err := cn.peerPriority()
	prioStr := fmt.Sprintf("%08x", prio)
	if err != nil {
		prioStr += ": " + err.Error()
	}
	fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
	fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastHelpful()),
		cn.cumInterest(),
		cn.totalExpectingTime(),
	)
	fmt.Fprintf(w,
		"%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		&cn._stats.ChunksReadUseful,
		&cn._stats.ChunksRead,
		&cn._stats.ChunksWritten,
		cn.requestState.Requests.GetCardinality(),
		cn.requestState.Cancelled.GetCardinality(),
		cn.nominalMaxRequests(),
		cn.PeerMaxRequests,
		len(cn.peerRequests),
		localClientReqq,
		cn.statusFlags(),
		cn.downloadRate()/(1<<10),
	)
	fmt.Fprintf(w, "requested pieces:")
	cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
		fmt.Fprintf(w, " %v(%v)", piece, count)
	})
	fmt.Fprintf(w, "\n")
}
// close tears down the peer exactly once: stops timers, releases inbound
// request allocations, runs impl-specific teardown, then notifies callbacks.
// Requires the client lock.
func (p *Peer) close() {
	// SetOnce makes close idempotent; only the first call proceeds.
	if !p.closed.Set() {
		return
	}
	if p.updateRequestsTimer != nil {
		p.updateRequestsTimer.Stop()
	}
	// Release memory reservations held for remote-initiated requests.
	for _, prs := range p.peerRequests {
		prs.allocReservation.Drop()
	}
	p.peerImpl.onClose()
	// p.t may be nil for inbound connections that never completed handshake.
	if p.t != nil {
		p.t.decPeerPieceAvailability(p)
	}
	for _, f := range p.callbacks.PeerClosed {
		f(p)
	}
}
// Close is the exported, lock-taking form of close. Always returns nil.
func (p *Peer) Close() error {
	p.locker().Lock()
	defer p.locker().Unlock()
	p.close()
	return nil
}
  314. // Peer definitely has a piece, for purposes of requesting. So it's not sufficient that we think
  315. // they do (known=true).
  316. func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
  317. if all, known := cn.peerHasAllPieces(); all && known {
  318. return true
  319. }
  320. return cn.peerPieces().ContainsInt(piece)
  321. }
// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
// https://github.com/pion/datachannel/issues/59 is fixed.
const (
	writeBufferHighWaterLen = 1 << 15
	writeBufferLowWaterLen  = writeBufferHighWaterLen / 2
)
var (
	// Wire sizes of the relevant messages, computed once at init.
	interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary())
	requestMsgLen    = len(pp.Message{Type: pp.Request}.MustMarshalBinary())
	// This is the maximum request count that could fit in the write buffer if it's at or below the
	// low water mark when we run maybeUpdateActualRequestState.
	maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen
)
// The actual value to use as the maximum outbound requests. Clamped by what
// the peer allows, twice the recent peak, and what fits in the write buffer;
// never less than 1.
func (cn *Peer) nominalMaxRequests() maxRequests {
	return maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
}
  339. func (cn *Peer) totalExpectingTime() (ret time.Duration) {
  340. ret = cn.cumulativeExpectedToReceiveChunks
  341. if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
  342. ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
  343. }
  344. return
  345. }
// setInterested updates our interest state toward the peer, maintains the
// interest stopwatch, and writes the (not-)interested message. Returns true if
// more messages may be written (per messageWriter semantics).
func (cn *Peer) setInterested(interested bool) bool {
	// No-op if unchanged; report that writing may continue.
	if cn.requestState.Interested == interested {
		return true
	}
	cn.requestState.Interested = interested
	if interested {
		cn.lastBecameInterested = time.Now()
	} else if !cn.lastBecameInterested.IsZero() {
		// Fold the stretch of interest that just ended into the prior total.
		cn.priorInterest += time.Since(cn.lastBecameInterested)
	}
	cn.updateExpectingChunks()
	// log.Printf("%p: setting interest: %v", cn, interested)
	return cn.writeInterested(interested)
}
// The function takes a message to be sent, and returns true if more messages
// are okay.
type messageWriter func(pp.Message) bool
// This function seems to only be used by Peer.request. It's all logic checks, so maybe we can
// no-op it when we want to go fast. Returns an error for conditions a caller
// may legitimately hit, and panics on broken internal invariants.
func (cn *Peer) shouldRequest(r RequestIndex) error {
	err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
	if err != nil {
		return err
	}
	pi := cn.t.pieceIndexOfRequestIndex(r)
	if cn.requestState.Cancelled.Contains(r) {
		return errors.New("request is cancelled and waiting acknowledgement")
	}
	if !cn.peerHasPiece(pi) {
		return errors.New("requesting piece peer doesn't have")
	}
	// Everything below is an invariant violation, not a recoverable error.
	if !cn.t.peerIsActive(cn) {
		panic("requesting but not in active conns")
	}
	if cn.closed.IsSet() {
		panic("requesting when connection is closed")
	}
	if cn.t.hashingPiece(pi) {
		panic("piece is being hashed")
	}
	if cn.t.pieceQueuedForHash(pi) {
		panic("piece is queued for hash")
	}
	if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
		// This could occur if we made a request with the fast extension, and then got choked and
		// haven't had the request rejected yet.
		if !cn.requestState.Requests.Contains(r) {
			panic("peer choking and piece not allowed fast")
		}
	}
	return nil
}
  398. func (cn *Peer) mustRequest(r RequestIndex) bool {
  399. more, err := cn.request(r)
  400. if err != nil {
  401. panic(err)
  402. }
  403. return more
  404. }
// request records r as outstanding and sends the request to the peer. Returns
// whether more messages may be written, and an error if the request was not
// issued (already at the request cap). Panics if shouldRequest fails.
func (cn *Peer) request(r RequestIndex) (more bool, err error) {
	if err := cn.shouldRequest(r); err != nil {
		panic(err)
	}
	// Already outstanding: nothing to do.
	if cn.requestState.Requests.Contains(r) {
		return true, nil
	}
	if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
		return true, errors.New("too many outstanding requests")
	}
	cn.requestState.Requests.Add(r)
	if cn.validReceiveChunks == nil {
		cn.validReceiveChunks = make(map[RequestIndex]int)
	}
	// Counted, not boolean: the same index can be re-requested before a prior
	// response arrives.
	cn.validReceiveChunks[r]++
	cn.t.requestState[r] = requestState{
		peer: cn,
		when: time.Now(),
	}
	cn.updateExpectingChunks()
	ppReq := cn.t.requestIndexToRequest(r)
	for _, f := range cn.callbacks.SentRequest {
		f(PeerRequestEvent{cn, ppReq})
	}
	return cn.peerImpl._request(ppReq), nil
}
// cancel withdraws an outstanding request, sending a cancel to the peer where
// the transport requires one, and tracking that we expect a cancel ack.
// Panics if r was not outstanding.
func (me *Peer) cancel(r RequestIndex) {
	if !me.deleteRequest(r) {
		panic("request not existing should have been guarded")
	}
	// _cancel reports whether the transport expects an acknowledgement.
	if me._cancel(r) {
		// Record that we expect to get a cancel ack.
		if !me.requestState.Cancelled.CheckedAdd(r) {
			panic("request already cancelled")
		}
	}
	me.decPeakRequests()
	if me.isLowOnRequests() {
		me.updateRequests(peerUpdateRequestsPeerCancelReason)
	}
}
  446. // Sets a reason to update requests, and if there wasn't already one, handle it.
  447. func (cn *Peer) updateRequests(reason updateRequestReason) {
  448. if cn.needRequestUpdate != "" {
  449. return
  450. }
  451. cn.needRequestUpdate = reason
  452. cn.handleUpdateRequests()
  453. }
// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
	return func(cb iter.Callback) {
		for _, bm := range bms {
			// iter.All returns false when the callback asked to stop; in that
			// case we abandon the remaining bitmaps too.
			if !iter.All(
				func(_i interface{}) bool {
					i := _i.(int)
					if skip.Contains(bitmap.BitIndex(i)) {
						return true
					}
					// Mark emitted so later bitmaps don't repeat it.
					skip.Add(bitmap.BitIndex(i))
					return cb(i)
				},
				bm.Iter,
			) {
				return
			}
		}
	}
}
// After handshake, we know what Torrent and Client stats to include for a
// connection.
func (cn *Peer) postHandshakeStats(f func(*ConnStats)) {
	t := cn.t
	f(&t.stats)
	f(&t.cl.connStats)
}
// All ConnStats that include this connection. Some objects are not known
// until the handshake is complete, after which it's expected to reconcile the
// differences.
func (cn *Peer) allStats(f func(*ConnStats)) {
	f(&cn._stats)
	if cn.reconciledHandshakeStats {
		cn.postHandshakeStats(f)
	}
}
// readBytes adds n to BytesRead across all applicable ConnStats.
func (cn *Peer) readBytes(n int64) {
	cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
}
  495. func (c *Peer) lastHelpful() (ret time.Time) {
  496. ret = c.lastUsefulChunkReceived
  497. if c.t.seeding() && c.lastChunkSent.After(ret) {
  498. ret = c.lastChunkSent
  499. }
  500. return
  501. }
  502. // Returns whether any part of the chunk would lie outside a piece of the given length.
  503. func chunkOverflowsPiece(cs ChunkSpec, pieceLength pp.Integer) bool {
  504. switch {
  505. default:
  506. return false
  507. case cs.Begin+cs.Length > pieceLength:
  508. // Check for integer overflow
  509. case cs.Begin > pp.IntegerMax-cs.Length:
  510. }
  511. return true
  512. }
  513. func runSafeExtraneous(f func()) {
  514. if true {
  515. go f()
  516. } else {
  517. f()
  518. }
  519. }
// Returns true if it was valid to reject the request.
func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
	// Either it was still outstanding, or we had cancelled it and this reject
	// doubles as the cancel acknowledgement.
	if c.deleteRequest(r) {
		c.decPeakRequests()
	} else if !c.requestState.Cancelled.CheckedRemove(r) {
		return false
	}
	if c.isLowOnRequests() {
		c.updateRequests(peerUpdateRequestsRemoteRejectReason)
	}
	c.decExpectedChunkReceive(r)
	return true
}
  533. func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
  534. count := c.validReceiveChunks[r]
  535. if count == 1 {
  536. delete(c.validReceiveChunks, r)
  537. } else if count > 1 {
  538. c.validReceiveChunks[r] = count - 1
  539. } else {
  540. panic(r)
  541. }
  542. }
// doChunkReadStats records a received chunk of the given size in all
// applicable ConnStats.
func (c *Peer) doChunkReadStats(size int64) {
	c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) })
}
// Handle a received chunk from a peer. Validates the chunk, reconciles request
// bookkeeping, writes the data to storage (with the client lock released for
// the write), and queues the piece for hashing when complete. Statement order
// here is delicate: bookkeeping must complete before the lock is dropped.
func (c *Peer) receiveChunk(msg *pp.Message) error {
	ChunksReceived.Add("total", 1)
	ppReq := newRequestFromMessage(msg)
	t := c.t
	err := t.checkValidReceiveChunk(ppReq)
	if err != nil {
		err = log.WithLevel(log.Warning, err)
		return err
	}
	req := c.t.requestIndexFromRequest(ppReq)
	recordBlockForSmartBan := sync.OnceFunc(func() {
		c.recordBlockForSmartBan(req, msg.Piece)
	})
	// This needs to occur before we return, but we try to do it when the client is unlocked. It
	// can't be done before checking if chunks are valid because they won't be deallocated by piece
	// hashing if they're out of bounds.
	defer recordBlockForSmartBan()
	if c.peerChoking {
		ChunksReceived.Add("while choked", 1)
	}
	if c.validReceiveChunks[req] <= 0 {
		ChunksReceived.Add("unexpected", 1)
		return errors.New("received unexpected chunk")
	}
	c.decExpectedChunkReceive(req)
	if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
		ChunksReceived.Add("due to allowed fast", 1)
	}
	// The request needs to be deleted immediately to prevent cancels occurring asynchronously when
	// have actually already received the piece, while we have the Client unlocked to write the data
	// out.
	intended := false
	{
		if c.requestState.Requests.Contains(req) {
			for _, f := range c.callbacks.ReceivedRequested {
				f(PeerMessageEvent{c, msg})
			}
		}
		// Request has been satisfied.
		if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) {
			intended = true
			if !c.peerChoking {
				c._chunksReceivedWhileExpecting++
			}
			if c.isLowOnRequests() {
				c.updateRequests("Peer.receiveChunk deleted request")
			}
		} else {
			ChunksReceived.Add("unintended", 1)
		}
	}
	cl := t.cl
	// Do we actually want this chunk?
	if t.haveChunk(ppReq) {
		// panic(fmt.Sprintf("%+v", ppReq))
		ChunksReceived.Add("redundant", 1)
		c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
		return nil
	}
	piece := &t.pieces[ppReq.Index]
	c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
	c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
	if intended {
		c.piecesReceivedSinceLastRequestUpdate++
		c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
	}
	for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
		f(ReceivedUsefulDataEvent{c, msg})
	}
	c.lastUsefulChunkReceived = time.Now()
	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk, so we aren't trying to download it while
	// waiting for it to be written to storage.
	piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
	// Cancel pending requests for this chunk from *other* peers.
	if p := t.requestingPeer(req); p != nil {
		if p == c {
			panic("should not be pending request from conn that just received it")
		}
		p.cancel(req)
	}
	err = func() error {
		cl.unlock()
		defer cl.lock()
		// Opportunistically do this here while we aren't holding the client lock.
		recordBlockForSmartBan()
		concurrentChunkWrites.Add(1)
		defer concurrentChunkWrites.Add(-1)
		// Write the chunk out. Note that the upper bound on chunk writing concurrency will be the
		// number of connections. We write inline with receiving the chunk (with this lock dance),
		// because we want to handle errors synchronously and I haven't thought of a nice way to
		// defer any concurrency to the storage and have that notify the client of errors. TODO: Do
		// that instead.
		return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	}()
	piece.decrementPendingWrites()
	if err != nil {
		c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err)
		t.pendRequest(req)
		// Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
		// request update runs while we're writing the chunk that just failed. Then we never do a
		// fresh update after pending the failed request.
		c.updateRequests("Peer.receiveChunk error writing chunk")
		t.onWriteChunkErr(err)
		return nil
	}
	c.onDirtiedPiece(pieceIndex(ppReq.Index))
	// We need to ensure the piece is only queued once, so only the last chunk writer gets this job.
	if t.pieceAllDirty(pieceIndex(ppReq.Index)) && piece.pendingWrites == 0 {
		t.queuePieceCheck(pieceIndex(ppReq.Index))
		// We don't pend all chunks here anymore because we don't want code dependent on the dirty
		// chunk status (such as the haveChunk call above) to have to check all the various other
		// piece states like queued for hash, hashing etc. This does mean that we need to be sure
		// that chunk pieces are pended at an appropriate time later however.
	}
	cl.event.Broadcast()
	// We do this because we've written a chunk, and may change PieceState.Partial.
	t.publishPieceStateChange(pieceIndex(ppReq.Index))
	return nil
}
  669. func (c *Peer) onDirtiedPiece(piece pieceIndex) {
  670. if c.peerTouchedPieces == nil {
  671. c.peerTouchedPieces = make(map[pieceIndex]struct{})
  672. }
  673. c.peerTouchedPieces[piece] = struct{}{}
  674. ds := &c.t.pieces[piece].dirtiers
  675. if *ds == nil {
  676. *ds = make(map[*Peer]struct{})
  677. }
  678. (*ds)[c] = struct{}{}
  679. }
// netGoodPiecesDirtied is good pieces minus bad pieces this peer contributed
// to; can go negative for misbehaving peers.
func (cn *Peer) netGoodPiecesDirtied() int64 {
	return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
}
// peerHasWantedPieces reports whether the peer claims any piece we still want.
func (c *Peer) peerHasWantedPieces() bool {
	if all, _ := c.peerHasAllPieces(); all {
		return !c.t.haveAllPieces() && !c.t._pendingPieces.IsEmpty()
	}
	if !c.t.haveInfo() {
		// Without the info we want anything the peer has.
		return !c.peerPieces().IsEmpty()
	}
	return c.peerPieces().Intersects(&c.t._pendingPieces)
}
// Returns true if an outstanding request is removed. Cancelled requests should be handled
// separately.
func (c *Peer) deleteRequest(r RequestIndex) bool {
	if !c.requestState.Requests.CheckedRemove(r) {
		return false
	}
	for _, f := range c.callbacks.DeletedRequest {
		f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
	}
	c.updateExpectingChunks()
	// Invariant: the torrent-level request table must agree on ownership.
	if c.t.requestingPeer(r) != c {
		panic("only one peer should have a given request at a time")
	}
	delete(c.t.requestState, r)
	// c.t.iterPeers(func(p *Peer) {
	// 	if p.isLowOnRequests() {
	// 		p.updateRequests("Peer.deleteRequest")
	// 	}
	// })
	return true
}
  713. func (c *Peer) deleteAllRequests(reason updateRequestReason) {
  714. if c.requestState.Requests.IsEmpty() {
  715. return
  716. }
  717. c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
  718. if !c.deleteRequest(x) {
  719. panic("request should exist")
  720. }
  721. return true
  722. })
  723. c.assertNoRequests()
  724. c.t.iterPeers(func(p *Peer) {
  725. if p.isLowOnRequests() {
  726. p.updateRequests(reason)
  727. }
  728. })
  729. return
  730. }
// assertNoRequests panics (with the outstanding count) if any requests remain.
func (c *Peer) assertNoRequests() {
	if !c.requestState.Requests.IsEmpty() {
		panic(c.requestState.Requests.GetCardinality())
	}
}
  736. func (c *Peer) cancelAllRequests() {
  737. c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
  738. c.cancel(x)
  739. return true
  740. })
  741. c.assertNoRequests()
  742. return
  743. }
// peerPriority computes the BEP 40 canonical priority for this peer pairing.
func (c *Peer) peerPriority() (peerPriority, error) {
	return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
}
// remoteIp extracts the remote's IP from its address string. Returns nil when
// the address doesn't parse (SplitHostPort's error is deliberately ignored;
// ParseIP("") yields nil).
func (c *Peer) remoteIp() net.IP {
	host, _, _ := net.SplitHostPort(c.RemoteAddr.String())
	return net.ParseIP(host)
}
// remoteIpPort returns the remote address as an IpPort, zero-valued when the
// address cannot be converted (error from tryIpPortFromNetAddr is ignored).
func (c *Peer) remoteIpPort() IpPort {
	ipa, _ := tryIpPortFromNetAddr(c.RemoteAddr)
	return IpPort{ipa.IP, uint16(ipa.Port)}
}
// trust summarizes how much we trust this peer for ranking purposes.
func (c *Peer) trust() connectionTrust {
	return connectionTrust{c.trusted, c.netGoodPiecesDirtied()}
}
// connectionTrust ranks peers by explicit trust, then by net good pieces.
type connectionTrust struct {
	Implicit bool
	// NOTE(review): "Dirted" looks like a typo for "Dirtied"; left as-is since
	// the field may be referenced elsewhere in the package.
	NetGoodPiecesDirted int64
}
// Less orders connectionTrust ascending: implicit trust first, then net good
// pieces dirtied.
func (l connectionTrust) Less(r connectionTrust) bool {
	return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
}
// Returns a new Bitmap that includes bits for all pieces the peer could have based on their claims.
func (cn *Peer) newPeerPieces() *roaring.Bitmap {
	// TODO: Can we use copy on write?
	ret := cn.peerPieces().Clone()
	if all, _ := cn.peerHasAllPieces(); all {
		if cn.t.haveInfo() {
			ret.AddRange(0, bitmap.BitRange(cn.t.numPieces()))
		} else {
			// Piece count unknown: mark everything.
			ret.AddRange(0, bitmap.ToEnd)
		}
	}
	return ret
}
// stats exposes the per-connection stats for internal use.
func (cn *Peer) stats() *ConnStats {
	return &cn._stats
}
// TryAsPeerConn downcasts the peer to a *PeerConn, reporting whether the
// underlying implementation is one.
func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
	pc, ok := p.peerImpl.(*PeerConn)
	return pc, ok
}
// uncancelledRequests is the number of outstanding (not cancelled) requests.
func (p *Peer) uncancelledRequests() uint64 {
	return p.requestState.Requests.GetCardinality()
}
// peerLocalPublicAddr is our address as seen by the remote peer.
type peerLocalPublicAddr = IpPort
// isLowOnRequests reports whether the peer has no requests in flight at all —
// neither outstanding nor awaiting cancel acknowledgement.
func (p *Peer) isLowOnRequests() bool {
	return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
}
// decPeakRequests lowers the recent request peak; deliberately allowed to go
// negative (see retained commentary below).
func (p *Peer) decPeakRequests() {
	// // This can occur when peak requests are altered by the update request timer to be lower than
	// // the actual number of outstanding requests. Let's let it go negative and see what happens. I
	// // wonder what happens if maxRequests is not signed.
	// if p.peakRequests < 1 {
	// 	panic(p.peakRequests)
	// }
	p.peakRequests--
}
// recordBlockForSmartBan hashes the block into the smart-ban cache so the
// sender can be banned later if the piece fails verification. No-op when the
// peer has no bannable address.
func (p *Peer) recordBlockForSmartBan(req RequestIndex, blockData []byte) {
	if p.bannableAddr.Ok {
		p.t.smartBanCache.RecordBlock(p.bannableAddr.Value, req, blockData)
	}
}