client.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872
  1. package torrent
  2. import (
  3. "bufio"
  4. "context"
  5. "crypto/rand"
  6. "encoding/binary"
  7. "encoding/hex"
  8. "errors"
  9. "expvar"
  10. "fmt"
  11. "io"
  12. "math"
  13. "net"
  14. "net/http"
  15. "net/netip"
  16. "sort"
  17. "strconv"
  18. "time"
  19. "github.com/anacrolix/chansync"
  20. "github.com/anacrolix/chansync/events"
  21. "github.com/anacrolix/dht/v2"
  22. "github.com/anacrolix/dht/v2/krpc"
  23. . "github.com/anacrolix/generics"
  24. g "github.com/anacrolix/generics"
  25. "github.com/anacrolix/log"
  26. "github.com/anacrolix/missinggo/perf"
  27. "github.com/anacrolix/missinggo/v2"
  28. "github.com/anacrolix/missinggo/v2/bitmap"
  29. "github.com/anacrolix/missinggo/v2/pproffd"
  30. "github.com/anacrolix/sync"
  31. "github.com/cespare/xxhash"
  32. "github.com/davecgh/go-spew/spew"
  33. "github.com/dustin/go-humanize"
  34. gbtree "github.com/google/btree"
  35. "github.com/pion/datachannel"
  36. "github.com/pion/webrtc/v3"
  37. "github.com/anacrolix/torrent/bencode"
  38. "github.com/anacrolix/torrent/internal/check"
  39. "github.com/anacrolix/torrent/internal/limiter"
  40. "github.com/anacrolix/torrent/iplist"
  41. "github.com/anacrolix/torrent/metainfo"
  42. "github.com/anacrolix/torrent/mse"
  43. pp "github.com/anacrolix/torrent/peer_protocol"
  44. request_strategy "github.com/anacrolix/torrent/request-strategy"
  45. "github.com/anacrolix/torrent/storage"
  46. "github.com/anacrolix/torrent/tracker"
  47. "github.com/anacrolix/torrent/types/infohash"
  48. infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
  49. "github.com/anacrolix/torrent/webtorrent"
  50. )
  // Clients contain zero or more Torrents. A Client manages a blocklist, the
  // TCP/UDP protocol ports, and DHT as desired.
  type Client struct {
  	// An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
  	// fields. See #262.
  	connStats ConnStats
  	// Client-wide lock.
  	_mu lockWithDeferreds
  	// Condition variable whose locker is set to the client lock in init; broadcast by Close.
  	event sync.Cond
  	// Set once by Close; exposed to callers through Closed.
  	closed chansync.SetOnce
  	config *ClientConfig
  	logger log.Logger
  	peerID PeerID
  	defaultStorage *storage.Client
  	// Cleanup callbacks. Close invokes them in reverse order of registration.
  	onClose []func()
  	// Dialers used for outgoing connections. See AddDialer.
  	dialers []Dialer
  	// Listeners consulted by LocalPort and returned by Listeners.
  	listeners []Listener
  	dhtServers []DhtServer
  	// Optional; ipBlockRange treats nil as "nothing is blocked".
  	ipBlockList iplist.Ranger
  	// Set of addresses that have our client ID. This intentionally will
  	// include ourselves if we end up trying to connect to our own address
  	// through legitimate channels.
  	dopplegangerAddrs map[string]struct{}
  	// Set of peer IPs reported by BadPeerIPs.
  	badPeerIPs map[netip.Addr]struct{}
  	// All Torrents once.
  	torrents map[*Torrent]struct{}
  	// All Torrents by their short infohashes (v1 if valid, and truncated v2 if valid). Unless the
  	// info has been obtained, there's no knowing if an infohash belongs to v1 or v2.
  	torrentsByShortHash map[InfoHash]*Torrent
  	pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
  	acceptLimiter map[ipStr]int
  	// Count of outstanding half-open attempts; decremented in noLongerHalfOpen and must never go
  	// negative.
  	numHalfOpen int
  	websocketTrackers websocketTrackers
  	activeAnnounceLimiter limiter.Instance
  	// Built in init from the config's WebTransport, or a default http.Transport when that is nil.
  	httpClient *http.Client
  	clientHolepunchAddrSets
  	defaultLocalLtepProtocolMap LocalLtepProtocolMap
  	upnpMappings []*upnpMapping
  }
  // ipStr is the key type of Client.acceptLimiter.
  // NOTE(review): presumably the string form of a remote IP address; confirm against the code
  // that populates acceptLimiter.
  type ipStr string
  90. func (cl *Client) BadPeerIPs() (ips []string) {
  91. cl.rLock()
  92. ips = cl.badPeerIPsLocked()
  93. cl.rUnlock()
  94. return
  95. }
  96. func (cl *Client) badPeerIPsLocked() (ips []string) {
  97. ips = make([]string, len(cl.badPeerIPs))
  98. i := 0
  99. for k := range cl.badPeerIPs {
  100. ips[i] = k.String()
  101. i += 1
  102. }
  103. return
  104. }
  // PeerID returns the client's peer ID.
  func (cl *Client) PeerID() PeerID {
  	return cl.peerID
  }
  108. // Returns the port number for the first listener that has one. No longer assumes that all port
  109. // numbers are the same, due to support for custom listeners. Returns zero if no port number is
  110. // found.
  111. func (cl *Client) LocalPort() (port int) {
  112. for i := 0; i < len(cl.listeners); i += 1 {
  113. if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
  114. return
  115. }
  116. }
  117. return
  118. }
  119. func writeDhtServerStatus(w io.Writer, s DhtServer) {
  120. dhtStats := s.Stats()
  121. fmt.Fprintf(w, " ID: %x\n", s.ID())
  122. spew.Fdump(w, dhtStats)
  123. }
  124. // Writes out a human readable status of the client, such as for writing to a
  125. // HTTP status page.
  126. func (cl *Client) WriteStatus(_w io.Writer) {
  127. cl.rLock()
  128. defer cl.rUnlock()
  129. w := bufio.NewWriter(_w)
  130. defer w.Flush()
  131. fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
  132. fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
  133. fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
  134. fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
  135. fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
  136. cl.eachDhtServer(func(s DhtServer) {
  137. fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
  138. writeDhtServerStatus(w, s)
  139. })
  140. dumpStats(w, cl.statsLocked())
  141. torrentsSlice := cl.torrentsAsSlice()
  142. fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
  143. fmt.Fprintln(w)
  144. sort.Slice(torrentsSlice, func(l, r int) bool {
  145. return torrentsSlice[l].canonicalShortInfohash().AsString() < torrentsSlice[r].canonicalShortInfohash().AsString()
  146. })
  147. for _, t := range torrentsSlice {
  148. if t.name() == "" {
  149. fmt.Fprint(w, "<unknown name>")
  150. } else {
  151. fmt.Fprint(w, t.name())
  152. }
  153. fmt.Fprint(w, "\n")
  154. if t.info != nil {
  155. fmt.Fprintf(
  156. w,
  157. "%f%% of %d bytes (%s)",
  158. 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
  159. t.length(),
  160. humanize.Bytes(uint64(t.length())))
  161. } else {
  162. w.WriteString("<missing metainfo>")
  163. }
  164. fmt.Fprint(w, "\n")
  165. t.writeStatus(w)
  166. fmt.Fprintln(w)
  167. }
  168. }
  169. func (cl *Client) initLogger() {
  170. logger := cl.config.Logger
  171. if logger.IsZero() {
  172. logger = log.Default
  173. }
  174. if cl.config.Debug {
  175. logger = logger.WithFilterLevel(log.Debug)
  176. }
  177. cl.logger = logger.WithValues(cl)
  178. }
  179. func (cl *Client) announceKey() int32 {
  180. return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
  181. }
  182. // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
  183. func (cl *Client) init(cfg *ClientConfig) {
  184. cl.config = cfg
  185. g.MakeMap(&cl.dopplegangerAddrs)
  186. g.MakeMap(&cl.torrentsByShortHash)
  187. g.MakeMap(&cl.torrents)
  188. cl.torrentsByShortHash = make(map[metainfo.Hash]*Torrent)
  189. cl.activeAnnounceLimiter.SlotsPerKey = 2
  190. cl.event.L = cl.locker()
  191. cl.ipBlockList = cfg.IPBlocklist
  192. cl.httpClient = &http.Client{
  193. Transport: cfg.WebTransport,
  194. }
  195. if cl.httpClient.Transport == nil {
  196. cl.httpClient.Transport = &http.Transport{
  197. Proxy: cfg.HTTPProxy,
  198. DialContext: cfg.HTTPDialContext,
  199. // I think this value was observed from some webseeds. It seems reasonable to extend it
  200. // to other uses of HTTP from the client.
  201. MaxConnsPerHost: 10,
  202. }
  203. }
  204. cl.defaultLocalLtepProtocolMap = makeBuiltinLtepProtocols(!cfg.DisablePEX)
  205. }
  206. func NewClient(cfg *ClientConfig) (cl *Client, err error) {
  207. if cfg == nil {
  208. cfg = NewDefaultClientConfig()
  209. cfg.ListenPort = 0
  210. }
  211. cl = &Client{}
  212. cl.init(cfg)
  213. go cl.acceptLimitClearer()
  214. cl.initLogger()
  215. //cl.logger.Levelf(log.Critical, "test after init")
  216. defer func() {
  217. if err != nil {
  218. cl.Close()
  219. cl = nil
  220. }
  221. }()
  222. storageImpl := cfg.DefaultStorage
  223. if storageImpl == nil {
  224. // We'd use mmap by default but HFS+ doesn't support sparse files.
  225. storageImplCloser := storage.NewFile(cfg.DataDir)
  226. cl.onClose = append(cl.onClose, func() {
  227. if err := storageImplCloser.Close(); err != nil {
  228. cl.logger.Printf("error closing default storage: %s", err)
  229. }
  230. })
  231. storageImpl = storageImplCloser
  232. }
  233. cl.defaultStorage = storage.NewClient(storageImpl)
  234. if cfg.PeerID != "" {
  235. missinggo.CopyExact(&cl.peerID, cfg.PeerID)
  236. } else {
  237. o := copy(cl.peerID[:], cfg.Bep20)
  238. _, err = rand.Read(cl.peerID[o:])
  239. if err != nil {
  240. panic("error generating peer id")
  241. }
  242. }
  243. builtinListenNetworks := cl.listenNetworks()
  244. sockets, err := listenAll(
  245. builtinListenNetworks,
  246. cl.config.ListenHost,
  247. cl.config.ListenPort,
  248. cl.firewallCallback,
  249. cl.logger,
  250. )
  251. if err != nil {
  252. return
  253. }
  254. if len(sockets) == 0 && len(builtinListenNetworks) != 0 {
  255. err = fmt.Errorf("no sockets created for networks %v", builtinListenNetworks)
  256. return
  257. }
  258. // Check for panics.
  259. cl.LocalPort()
  260. for _, _s := range sockets {
  261. s := _s // Go is fucking retarded.
  262. cl.onClose = append(cl.onClose, func() { go s.Close() })
  263. if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
  264. cl.dialers = append(cl.dialers, s)
  265. cl.listeners = append(cl.listeners, s)
  266. if cl.config.AcceptPeerConnections {
  267. go cl.acceptConnections(s)
  268. }
  269. }
  270. }
  271. go cl.forwardPort()
  272. if !cfg.NoDHT {
  273. for _, s := range sockets {
  274. if pc, ok := s.(net.PacketConn); ok {
  275. ds, err := cl.NewAnacrolixDhtServer(pc)
  276. if err != nil {
  277. panic(err)
  278. }
  279. cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
  280. cl.onClose = append(cl.onClose, func() { ds.Close() })
  281. }
  282. }
  283. }
  284. cl.websocketTrackers = websocketTrackers{
  285. PeerId: cl.peerID,
  286. Logger: cl.logger.WithNames("websocketTrackers"),
  287. GetAnnounceRequest: func(
  288. event tracker.AnnounceEvent, infoHash [20]byte,
  289. ) (
  290. tracker.AnnounceRequest, error,
  291. ) {
  292. cl.lock()
  293. defer cl.unlock()
  294. t, ok := cl.torrentsByShortHash[infoHash]
  295. if !ok {
  296. return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
  297. }
  298. return t.announceRequest(event, infoHash), nil
  299. },
  300. Proxy: cl.config.HTTPProxy,
  301. WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
  302. ICEServers: cl.ICEServers(),
  303. DialContext: cl.config.TrackerDialContext,
  304. OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
  305. cl.lock()
  306. defer cl.unlock()
  307. t, ok := cl.torrentsByShortHash[dcc.InfoHash]
  308. if !ok {
  309. cl.logger.WithDefaultLevel(log.Warning).Printf(
  310. "got webrtc conn for unloaded torrent with infohash %x",
  311. dcc.InfoHash,
  312. )
  313. dc.Close()
  314. return
  315. }
  316. go t.onWebRtcConn(dc, dcc)
  317. },
  318. }
  319. return
  320. }
  // AddDhtServer appends d to the client's list of DHT servers.
  // NOTE(review): unlike AddDialer, this does not take the client lock; confirm callers
  // serialise access themselves.
  func (cl *Client) AddDhtServer(d DhtServer) {
  	cl.dhtServers = append(cl.dhtServers, d)
  }
  // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
  // given address for any Torrent.
  //
  // The dialer is appended under the client lock, after which every torrent is asked to open new
  // connections.
  func (cl *Client) AddDialer(d Dialer) {
  	cl.lock()
  	defer cl.unlock()
  	cl.dialers = append(cl.dialers, d)
  	for t := range cl.torrents {
  		t.openNewConns()
  	}
  }
  // Listeners returns the client's listener slice. The slice is returned directly, not copied.
  func (cl *Client) Listeners() []Listener {
  	return cl.listeners
  }
  // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
  // yourself.
  //
  // The accept loop is only started when the AcceptPeerConnections config flag is set.
  func (cl *Client) AddListener(l Listener) {
  	cl.listeners = append(cl.listeners, l)
  	if cl.config.AcceptPeerConnections {
  		go cl.acceptConnections(l)
  	}
  }
  345. func (cl *Client) firewallCallback(net.Addr) bool {
  346. cl.rLock()
  347. block := !cl.wantConns() || !cl.config.AcceptPeerConnections
  348. cl.rUnlock()
  349. if block {
  350. torrent.Add("connections firewalled", 1)
  351. } else {
  352. torrent.Add("connections not firewalled", 1)
  353. }
  354. return block
  355. }
  356. func (cl *Client) listenOnNetwork(n network) bool {
  357. if n.Ipv4 && cl.config.DisableIPv4 {
  358. return false
  359. }
  360. if n.Ipv6 && cl.config.DisableIPv6 {
  361. return false
  362. }
  363. if n.Tcp && cl.config.DisableTCP {
  364. return false
  365. }
  366. if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
  367. return false
  368. }
  369. return true
  370. }
  371. func (cl *Client) listenNetworks() (ns []network) {
  372. for _, n := range allPeerNetworks {
  373. if cl.listenOnNetwork(n) {
  374. ns = append(ns, n)
  375. }
  376. }
  377. return
  378. }
  379. // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
  380. func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
  381. logger := cl.logger.WithNames("dht", conn.LocalAddr().String())
  382. cfg := dht.ServerConfig{
  383. IPBlocklist: cl.ipBlockList,
  384. Conn: conn,
  385. OnAnnouncePeer: cl.onDHTAnnouncePeer,
  386. PublicIP: func() net.IP {
  387. if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
  388. return cl.config.PublicIp6
  389. }
  390. return cl.config.PublicIp4
  391. }(),
  392. StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
  393. OnQuery: cl.config.DHTOnQuery,
  394. Logger: logger,
  395. }
  396. if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
  397. f(&cfg)
  398. }
  399. s, err = dht.NewServer(&cfg)
  400. if err == nil {
  401. go s.TableMaintainer()
  402. }
  403. return
  404. }
  // Closed returns the done-event of the client's closed flag, which Close sets.
  func (cl *Client) Closed() events.Done {
  	return cl.closed.Done()
  }
  408. func (cl *Client) eachDhtServer(f func(DhtServer)) {
  409. for _, ds := range cl.dhtServers {
  410. f(ds)
  411. }
  412. }
  413. // Stops the client. All connections to peers are closed and all activity will come to a halt.
  414. func (cl *Client) Close() (errs []error) {
  415. var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
  416. cl.lock()
  417. for t := range cl.torrents {
  418. err := t.close(&closeGroup)
  419. if err != nil {
  420. errs = append(errs, err)
  421. }
  422. }
  423. cl.clearPortMappings()
  424. for i := range cl.onClose {
  425. cl.onClose[len(cl.onClose)-1-i]()
  426. }
  427. cl.closed.Set()
  428. cl.unlock()
  429. cl.event.Broadcast()
  430. closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
  431. return
  432. }
  433. func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
  434. if cl.ipBlockList == nil {
  435. return
  436. }
  437. return cl.ipBlockList.Lookup(ip)
  438. }
  // ipIsBlocked reports whether ip falls within a range of the client's blocklist, discarding
  // the matching range itself.
  func (cl *Client) ipIsBlocked(ip net.IP) bool {
  	_, blocked := cl.ipBlockRange(ip)
  	return blocked
  }
  443. func (cl *Client) wantConns() bool {
  444. if cl.config.AlwaysWantConns {
  445. return true
  446. }
  447. for t := range cl.torrents {
  448. if t.wantIncomingConns() {
  449. return true
  450. }
  451. }
  452. return false
  453. }
  454. // TODO: Apply filters for non-standard networks, particularly rate-limiting.
  455. func (cl *Client) rejectAccepted(conn net.Conn) error {
  456. if !cl.wantConns() {
  457. return errors.New("don't want conns right now")
  458. }
  459. ra := conn.RemoteAddr()
  460. if rip := addrIpOrNil(ra); rip != nil {
  461. if cl.config.DisableIPv4Peers && rip.To4() != nil {
  462. return errors.New("ipv4 peers disabled")
  463. }
  464. if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
  465. return errors.New("ipv4 disabled")
  466. }
  467. if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
  468. return errors.New("ipv6 disabled")
  469. }
  470. if cl.rateLimitAccept(rip) {
  471. return errors.New("source IP accepted rate limited")
  472. }
  473. if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
  474. return errors.New("bad source addr")
  475. }
  476. }
  477. return nil
  478. }
  // acceptConnections runs the accept loop for listener l. It returns once the client is
  // observed to be closed. Each iteration accepts a connection, updates the holepunch tracking
  // sets, decides under the read lock whether to reject the connection, and then either rejects
  // it or passes it to incomingConnection from a new goroutine.
  func (cl *Client) acceptConnections(l Listener) {
  	for {
  		conn, err := l.Accept()
  		// Counted whether or not the accept succeeded.
  		torrent.Add("client listener accepts", 1)
  		if err == nil {
  			holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
  			if holepunchErr == nil {
  				cl.lock()
  				// Record accepts from addresses previously noted as undialable without a
  				// holepunch.
  				if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
  					setAdd(&cl.accepted, holepunchAddr)
  				}
  				if g.MapContains(
  					cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
  					holepunchAddr,
  				) {
  					setAdd(&cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
  				}
  				cl.unlock()
  			}
  		}
  		// NOTE(review): this also runs when Accept failed; confirm how WrapNetConn treats a nil
  		// conn, since the nil checks below rely on it.
  		conn = pproffd.WrapNetConn(conn)
  		cl.rLock()
  		closed := cl.closed.IsSet()
  		var reject error
  		if !closed && conn != nil {
  			reject = cl.rejectAccepted(conn)
  		}
  		cl.rUnlock()
  		if closed {
  			// The client has been closed: drop the connection, if any, and stop accepting.
  			if conn != nil {
  				conn.Close()
  			}
  			return
  		}
  		if err != nil {
  			log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
  			continue
  		}
  		// Finish handling off the accept loop so the next Accept is not delayed.
  		go func() {
  			if reject != nil {
  				torrent.Add("rejected accepted connections", 1)
  				cl.logger.LazyLog(log.Debug, func() log.Msg {
  					return log.Fmsg("rejecting accepted conn: %v", reject)
  				})
  				conn.Close()
  			} else {
  				go cl.incomingConnection(conn)
  			}
  			// Logged and counted for rejected and kept connections alike.
  			cl.logger.LazyLog(log.Debug, func() log.Msg {
  				return log.Fmsg("accepted %q connection at %q from %q",
  					l.Addr().Network(),
  					conn.LocalAddr(),
  					conn.RemoteAddr(),
  				)
  			})
  			torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
  			torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
  			torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
  		}()
  	}
  }
  // regularNetConnPeerConnConnString produces the PeerConn.connString for a PeerConn backed by
  // an ordinary net.Conn: the local address, a hyphen, then the remote address.
  func regularNetConnPeerConnConnString(nc net.Conn) string {
  	local := nc.LocalAddr()
  	remote := nc.RemoteAddr()
  	return fmt.Sprintf("%s-%s", local, remote)
  }
  544. func (cl *Client) incomingConnection(nc net.Conn) {
  545. defer nc.Close()
  546. if tc, ok := nc.(*net.TCPConn); ok {
  547. tc.SetLinger(0)
  548. }
  549. remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
  550. c := cl.newConnection(
  551. nc,
  552. newConnectionOpts{
  553. outgoing: false,
  554. remoteAddr: nc.RemoteAddr(),
  555. localPublicAddr: cl.publicAddr(remoteAddr.IP),
  556. network: nc.RemoteAddr().Network(),
  557. connString: regularNetConnPeerConnConnString(nc),
  558. })
  559. c.Discovery = PeerSourceIncoming
  560. cl.runReceivedConn(c)
  561. cl.lock()
  562. c.close()
  563. cl.unlock()
  564. }
  // Returns a handle to the given torrent, if it's present in the client.
  //
  // The lookup is by short infohash and is performed under the client read lock. ok is false
  // when no torrent is registered under ih.
  func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
  	cl.rLock()
  	defer cl.rUnlock()
  	t, ok = cl.torrentsByShortHash[ih]
  	return
  }
  // DialResult pairs the outcome of a dial attempt with the Dialer that made it.
  type DialResult struct {
  	// Conn is the established connection. Callers in this file treat a nil Conn as a failed dial.
  	Conn net.Conn
  	// Dialer is the dialer that produced this result.
  	Dialer Dialer
  }
  576. func countDialResult(err error) {
  577. if err == nil {
  578. torrent.Add("successful dials", 1)
  579. } else {
  580. torrent.Add("unsuccessful dials", 1)
  581. }
  582. }
  // reducedDialTimeout scales the maximum dial timeout down as the number of pending peers
  // grows relative to the half-open limit. The result is
  // max / ((pendingPeers+halfOpenLimit)/halfOpenLimit) using integer division, and is never
  // less than minDialTimeout.
  //
  // When halfOpenLimit is zero, or the integer divisor works out to zero, no scaling is applied
  // and max (clamped to minDialTimeout) is returned. This avoids a divide-by-zero panic.
  func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
  	ret = max
  	if halfOpenLimit != 0 {
  		if divisor := (pendingPeers + halfOpenLimit) / halfOpenLimit; divisor != 0 {
  			ret = max / time.Duration(divisor)
  		}
  	}
  	if ret < minDialTimeout {
  		ret = minDialTimeout
  	}
  	return ret
  }
  // Returns whether an address is known to connect to a client with our own ID.
  //
  // This is a plain membership test on cl.dopplegangerAddrs; no locking is done here.
  func (cl *Client) dopplegangerAddr(addr string) bool {
  	_, ok := cl.dopplegangerAddrs[addr]
  	return ok
  }
  // Returns a connection over UTP or TCP, whichever is first to connect.
  //
  // Delegates to DialFirst with the client's own dialers.
  func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
  	return DialFirst(ctx, addr, cl.dialers)
  }
  599. // Returns a connection over UTP or TCP, whichever is first to connect.
  600. func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
  601. pool := dialPool{
  602. addr: addr,
  603. }
  604. defer pool.startDrainer()
  605. for _, _s := range dialers {
  606. pool.add(ctx, _s)
  607. }
  608. return pool.getFirst()
  609. }
  610. func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
  611. c, err := s.Dial(ctx, addr)
  612. if err != nil {
  613. log.ContextLogger(ctx).Levelf(log.Debug, "error dialing %q: %v", addr, err)
  614. }
  615. // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
  616. // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
  617. // code to increase the chance it's done.
  618. if tc, ok := c.(*net.TCPConn); ok {
  619. tc.SetLinger(0)
  620. }
  621. countDialResult(err)
  622. return c
  623. }
  624. func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) {
  625. path := t.getHalfOpenPath(addr, attemptKey)
  626. if !path.Exists() {
  627. panic("should exist")
  628. }
  629. path.Delete()
  630. cl.numHalfOpen--
  631. if cl.numHalfOpen < 0 {
  632. panic("should not be possible")
  633. }
  634. for t := range cl.torrents {
  635. t.openNewConns()
  636. }
  637. }
  638. func (cl *Client) countHalfOpenFromTorrents() (count int) {
  639. for t := range cl.torrents {
  640. count += t.numHalfOpenAttempts()
  641. }
  642. return
  643. }
  644. // Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
  645. // for valid reasons.
  646. func (cl *Client) initiateProtocolHandshakes(
  647. ctx context.Context,
  648. nc net.Conn,
  649. t *Torrent,
  650. encryptHeader bool,
  651. newConnOpts newConnectionOpts,
  652. ) (
  653. c *PeerConn, err error,
  654. ) {
  655. c = cl.newConnection(nc, newConnOpts)
  656. c.headerEncrypted = encryptHeader
  657. ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
  658. defer cancel()
  659. dl, ok := ctx.Deadline()
  660. if !ok {
  661. panic(ctx)
  662. }
  663. err = nc.SetDeadline(dl)
  664. if err != nil {
  665. panic(err)
  666. }
  667. err = cl.initiateHandshakes(ctx, c, t)
  668. return
  669. }
  670. func doProtocolHandshakeOnDialResult(
  671. t *Torrent,
  672. obfuscatedHeader bool,
  673. addr PeerRemoteAddr,
  674. dr DialResult,
  675. ) (
  676. c *PeerConn, err error,
  677. ) {
  678. cl := t.cl
  679. nc := dr.Conn
  680. addrIpPort, _ := tryIpPortFromNetAddr(addr)
  681. c, err = cl.initiateProtocolHandshakes(
  682. context.Background(), nc, t, obfuscatedHeader,
  683. newConnectionOpts{
  684. outgoing: true,
  685. remoteAddr: addr,
  686. // It would be possible to retrieve a public IP from the dialer used here?
  687. localPublicAddr: cl.publicAddr(addrIpPort.IP),
  688. network: dr.Dialer.DialerNetwork(),
  689. connString: regularNetConnPeerConnConnString(nc),
  690. })
  691. if err != nil {
  692. nc.Close()
  693. }
  694. return c, err
  695. }
  // Returns nil connection and nil error if no connection could be established for valid reasons.
  //
  // The address in opts.peerInfo is dialed with every client dialer at once. Handshakes are
  // attempted on the first connection that comes back, using the preferred header obfuscation
  // setting. If that fails and the preference is not mandatory, the same dialer is used to
  // redial and the opposite setting is tried. Holepunch bookkeeping sets on the client are
  // updated along the way.
  func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) {
  	// It would be better if dial rate limiting could be tested when considering to open connections
  	// instead. Doing it here means if the limit is low, and the half-open limit is high, we could
  	// end up with lots of outgoing connection attempts pending that were initiated on stale data.
  	{
  		// The reservation is always taken, but it is only checked and waited on when this dial
  		// was not triggered by a received holepunch connect.
  		dialReservation := cl.config.DialRateLimiter.Reserve()
  		if !opts.receivedHolepunchConnect {
  			if !dialReservation.OK() {
  				err = errors.New("can't make dial limit reservation")
  				return
  			}
  			time.Sleep(dialReservation.Delay())
  		}
  	}
  	torrent.Add("establish outgoing connection", 1)
  	addr := opts.peerInfo.Addr
  	dialPool := dialPool{
  		resCh: make(chan DialResult),
  		addr:  addr.String(),
  	}
  	defer dialPool.startDrainer()
  	dialTimeout := opts.t.getDialTimeoutUnlocked()
  	{
  		// All dialers share one timeout context, cancelled when this function returns.
  		ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
  		defer cancel()
  		for _, d := range cl.dialers {
  			dialPool.add(ctx, d)
  		}
  	}
  	holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
  	headerObfuscationPolicy := opts.HeaderObfuscationPolicy
  	obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
  	firstDialResult := dialPool.getFirst()
  	if firstDialResult.Conn == nil {
  		// No dialers worked. Try to initiate a holepunching rendezvous.
  		if holepunchAddrErr == nil {
  			cl.lock()
  			if !opts.receivedHolepunchConnect {
  				g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
  			}
  			if !opts.skipHolepunchRendezvous {
  				opts.t.trySendHolepunchRendezvous(holepunchAddr)
  			}
  			cl.unlock()
  		}
  		err = fmt.Errorf("all initial dials failed")
  		return
  	}
  	// A dial succeeded after a holepunch connect was received: record that for this address.
  	if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
  		cl.lock()
  		if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
  			g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
  		}
  		g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
  		g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
  		cl.unlock()
  	}
  	c, err = doProtocolHandshakeOnDialResult(
  		opts.t,
  		obfuscatedHeaderFirst,
  		addr,
  		firstDialResult,
  	)
  	if err == nil {
  		torrent.Add("initiated conn with preferred header obfuscation", 1)
  		return
  	}
  	// c is used for logging even though the handshake failed.
  	c.logger.Levelf(
  		log.Debug,
  		"error doing protocol handshake with header obfuscation %v",
  		obfuscatedHeaderFirst,
  	)
  	// doProtocolHandshakeOnDialResult has already closed this connection on error; it is closed
  	// again here.
  	firstDialResult.Conn.Close()
  	// We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try.
  	if headerObfuscationPolicy.RequirePreferred {
  		return
  	}
  	// Reuse the dialer that returned already but failed to handshake.
  	{
  		ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
  		defer cancel()
  		dialPool.add(ctx, firstDialResult.Dialer)
  	}
  	secondDialResult := dialPool.getFirst()
  	if secondDialResult.Conn == nil {
  		// The redial failed; c and err from the first handshake attempt are returned.
  		return
  	}
  	c, err = doProtocolHandshakeOnDialResult(
  		opts.t,
  		!obfuscatedHeaderFirst,
  		addr,
  		secondDialResult,
  	)
  	if err == nil {
  		torrent.Add("initiated conn with fallback header obfuscation", 1)
  		return
  	}
  	c.logger.Levelf(
  		log.Debug,
  		"error doing protocol handshake with header obfuscation %v",
  		!obfuscatedHeaderFirst,
  	)
  	secondDialResult.Conn.Close()
  	return
  }
// outgoingConnOpts describes a single outgoing connection attempt.
type outgoingConnOpts struct {
	// The peer to connect to; its Addr is the dial target.
	peerInfo PeerInfo
	// The torrent the connection is made on behalf of.
	t *Torrent
	// Don't attempt to connect unless a connect message is received after initiating a rendezvous.
	requireRendezvous bool
	// Don't send rendezvous requests to eligible relays.
	skipHolepunchRendezvous bool
	// Outgoing connection attempt is in response to holepunch connect message.
	receivedHolepunchConnect bool
	// Which header obfuscation setting is tried first, and whether it is mandatory.
	HeaderObfuscationPolicy HeaderObfuscationPolicy
}
  813. // Called to dial out and run a connection. The addr we're given is already
  814. // considered half-open.
  815. func (cl *Client) outgoingConnection(
  816. opts outgoingConnOpts,
  817. attemptKey outgoingConnAttemptKey,
  818. ) {
  819. c, err := cl.dialAndCompleteHandshake(opts)
  820. if err == nil {
  821. c.conn.SetWriteDeadline(time.Time{})
  822. }
  823. cl.lock()
  824. defer cl.unlock()
  825. // Don't release lock between here and addPeerConn, unless it's for failure.
  826. cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey)
  827. if err != nil {
  828. if cl.config.Debug {
  829. cl.logger.Levelf(
  830. log.Debug,
  831. "error establishing outgoing connection to %v: %v",
  832. opts.peerInfo.Addr,
  833. err,
  834. )
  835. }
  836. return
  837. }
  838. defer c.close()
  839. c.Discovery = opts.peerInfo.Source
  840. c.trusted = opts.peerInfo.Trusted
  841. opts.t.runHandshookConnLoggingErr(c)
  842. }
// The port number for incoming peer connections. 0 if the client isn't listening. This is a thin
// wrapper that returns cl.LocalPort() unchanged.
func (cl *Client) incomingPeerPort() int {
	return cl.LocalPort()
}
  847. func (cl *Client) initiateHandshakes(ctx context.Context, c *PeerConn, t *Torrent) (err error) {
  848. if c.headerEncrypted {
  849. var rw io.ReadWriter
  850. rw, c.cryptoMethod, err = mse.InitiateHandshakeContext(
  851. ctx,
  852. struct {
  853. io.Reader
  854. io.Writer
  855. }{c.r, c.w},
  856. t.canonicalShortInfohash().Bytes(),
  857. nil,
  858. cl.config.CryptoProvides,
  859. )
  860. c.setRW(rw)
  861. if err != nil {
  862. return fmt.Errorf("header obfuscation handshake: %w", err)
  863. }
  864. }
  865. localReservedBits := cl.config.Extensions
  866. handshakeIh := *t.canonicalShortInfohash()
  867. // If we're sending the v1 infohash, and we know the v2 infohash, set the v2 upgrade bit. This
  868. // means the peer can send the v2 infohash in the handshake to upgrade the connection.
  869. localReservedBits.SetBit(pp.ExtensionBitV2Upgrade, g.Some(handshakeIh) == t.infoHash && t.infoHashV2.Ok)
  870. ih, err := cl.connBtHandshake(context.TODO(), c, &handshakeIh, localReservedBits)
  871. if err != nil {
  872. return fmt.Errorf("bittorrent protocol handshake: %w", err)
  873. }
  874. if g.Some(ih) == t.infoHash {
  875. return nil
  876. }
  877. if t.infoHashV2.Ok && *t.infoHashV2.Value.ToShort() == ih {
  878. torrent.Add("initiated handshakes upgraded to v2", 1)
  879. c.v2 = true
  880. return nil
  881. }
  882. err = errors.New("bittorrent protocol handshake: peer infohash didn't match")
  883. return
  884. }
  885. // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
  886. // that won't also try to take the lock. This saves us copying all the infohashes everytime.
  887. func (cl *Client) forSkeys(f func([]byte) bool) {
  888. cl.rLock()
  889. defer cl.rUnlock()
  890. if false { // Emulate the bug from #114
  891. var firstIh InfoHash
  892. for ih := range cl.torrentsByShortHash {
  893. firstIh = ih
  894. break
  895. }
  896. for range cl.torrentsByShortHash {
  897. if !f(firstIh[:]) {
  898. break
  899. }
  900. }
  901. return
  902. }
  903. for ih := range cl.torrentsByShortHash {
  904. if !f(ih[:]) {
  905. break
  906. }
  907. }
  908. }
  909. func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
  910. if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
  911. return ret
  912. }
  913. return cl.forSkeys
  914. }
  915. // Do encryption and bittorrent handshakes as receiver.
  916. func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
  917. defer perf.ScopeTimerErr(&err)()
  918. var rw io.ReadWriter
  919. rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(
  920. c.rw(),
  921. cl.handshakeReceiverSecretKeys(),
  922. cl.config.HeaderObfuscationPolicy,
  923. cl.config.CryptoSelector,
  924. )
  925. c.setRW(rw)
  926. if err == nil || err == mse.ErrNoSecretKeyMatch {
  927. if c.headerEncrypted {
  928. torrent.Add("handshakes received encrypted", 1)
  929. } else {
  930. torrent.Add("handshakes received unencrypted", 1)
  931. }
  932. } else {
  933. torrent.Add("handshakes received with error while handling encryption", 1)
  934. }
  935. if err != nil {
  936. if err == mse.ErrNoSecretKeyMatch {
  937. err = nil
  938. }
  939. return
  940. }
  941. if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
  942. err = errors.New("connection does not have required header obfuscation")
  943. return
  944. }
  945. ih, err := cl.connBtHandshake(context.TODO(), c, nil, cl.config.Extensions)
  946. if err != nil {
  947. return nil, fmt.Errorf("during bt handshake: %w", err)
  948. }
  949. cl.lock()
  950. t = cl.torrentsByShortHash[ih]
  951. if t != nil && t.infoHashV2.Ok && *t.infoHashV2.Value.ToShort() == ih {
  952. torrent.Add("v2 handshakes received", 1)
  953. c.v2 = true
  954. }
  955. cl.unlock()
  956. return
  957. }
// Counts successful peer wire protocol handshakes, keyed by the hex encoding of the reserved
// (extension) bytes the peer sent.
var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map

// Registers the counter map on the package's torrent variable under a fixed key.
func init() {
	torrent.Set(
		"successful_peer_wire_protocol_handshake_peer_reserved_bytes",
		&successfulPeerWireProtocolHandshakePeerReservedBytes)
}
  964. func (cl *Client) connBtHandshake(ctx context.Context, c *PeerConn, ih *metainfo.Hash, reservedBits PeerExtensionBits) (ret metainfo.Hash, err error) {
  965. res, err := pp.Handshake(ctx, c.rw(), ih, cl.peerID, reservedBits)
  966. if err != nil {
  967. return
  968. }
  969. successfulPeerWireProtocolHandshakePeerReservedBytes.Add(
  970. hex.EncodeToString(res.PeerExtensionBits[:]), 1)
  971. ret = res.Hash
  972. c.PeerExtensionBytes = res.PeerExtensionBits
  973. c.PeerID = res.PeerID
  974. c.completedHandshake = time.Now()
  975. if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
  976. cb(c, res.Hash)
  977. }
  978. return
  979. }
  980. func (cl *Client) runReceivedConn(c *PeerConn) {
  981. err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
  982. if err != nil {
  983. panic(err)
  984. }
  985. t, err := cl.receiveHandshakes(c)
  986. if err != nil {
  987. cl.logger.LazyLog(log.Debug, func() log.Msg {
  988. return log.Fmsg(
  989. "error receiving handshakes on %v: %s", c, err,
  990. ).Add(
  991. "network", c.Network,
  992. )
  993. })
  994. torrent.Add("error receiving handshake", 1)
  995. cl.lock()
  996. cl.onBadAccept(c.RemoteAddr)
  997. cl.unlock()
  998. return
  999. }
  1000. if t == nil {
  1001. torrent.Add("received handshake for unloaded torrent", 1)
  1002. cl.logger.LazyLog(log.Debug, func() log.Msg {
  1003. return log.Fmsg("received handshake for unloaded torrent")
  1004. })
  1005. cl.lock()
  1006. cl.onBadAccept(c.RemoteAddr)
  1007. cl.unlock()
  1008. return
  1009. }
  1010. torrent.Add("received handshake for loaded torrent", 1)
  1011. c.conn.SetWriteDeadline(time.Time{})
  1012. cl.lock()
  1013. defer cl.unlock()
  1014. t.runHandshookConnLoggingErr(c)
  1015. }
  1016. // Client lock must be held before entering this.
  1017. func (t *Torrent) runHandshookConn(pc *PeerConn) error {
  1018. pc.setTorrent(t)
  1019. cl := t.cl
  1020. for i, b := range cl.config.MinPeerExtensions {
  1021. if pc.PeerExtensionBytes[i]&b != b {
  1022. return fmt.Errorf("peer did not meet minimum peer extensions: %x", pc.PeerExtensionBytes[:])
  1023. }
  1024. }
  1025. if pc.PeerID == cl.peerID {
  1026. if pc.outgoing {
  1027. connsToSelf.Add(1)
  1028. addr := pc.RemoteAddr.String()
  1029. cl.dopplegangerAddrs[addr] = struct{}{}
  1030. } /* else {
  1031. // Because the remote address is not necessarily the same as its client's torrent listen
  1032. // address, we won't record the remote address as a doppleganger. Instead, the initiator
  1033. // can record *us* as the doppleganger.
  1034. } */
  1035. t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
  1036. return nil
  1037. }
  1038. pc.r = deadlineReader{pc.conn, pc.r}
  1039. completedHandshakeConnectionFlags.Add(pc.connectionFlags(), 1)
  1040. if connIsIpv6(pc.conn) {
  1041. torrent.Add("completed handshake over ipv6", 1)
  1042. }
  1043. if err := t.addPeerConn(pc); err != nil {
  1044. return fmt.Errorf("adding connection: %w", err)
  1045. }
  1046. defer t.dropConnection(pc)
  1047. pc.addBuiltinLtepProtocols(!cl.config.DisablePEX)
  1048. for _, cb := range pc.callbacks.PeerConnAdded {
  1049. cb(pc)
  1050. }
  1051. pc.startMessageWriter()
  1052. pc.sendInitialMessages()
  1053. pc.initUpdateRequestsTimer()
  1054. err := pc.mainReadLoop()
  1055. if err != nil {
  1056. return fmt.Errorf("main read loop: %w", err)
  1057. }
  1058. return nil
  1059. }
  1060. func (p *Peer) initUpdateRequestsTimer() {
  1061. if check.Enabled {
  1062. if p.updateRequestsTimer != nil {
  1063. panic(p.updateRequestsTimer)
  1064. }
  1065. }
  1066. if enableUpdateRequestsTimer {
  1067. p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
  1068. }
  1069. }
// The reason passed to updateRequests when the update is triggered by the update-requests timer.
const peerUpdateRequestsTimerReason = "updateRequestsTimer"
  1071. func (c *Peer) updateRequestsTimerFunc() {
  1072. c.locker().Lock()
  1073. defer c.locker().Unlock()
  1074. if c.closed.IsSet() {
  1075. return
  1076. }
  1077. if c.isLowOnRequests() {
  1078. // If there are no outstanding requests, then a request update should have already run.
  1079. return
  1080. }
  1081. if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
  1082. // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
  1083. // already been fired.
  1084. torrent.Add("spurious timer requests updates", 1)
  1085. return
  1086. }
  1087. c.updateRequests(peerUpdateRequestsTimerReason)
  1088. }
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
// instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
// (1<<19) cached for sending, for 16KiB (1<<14) chunks.
//
// NOTE(review): 512KiB of 16KiB chunks is 32 requests, not 1024; the sizing rationale above looks
// out of date relative to the value — confirm.
const localClientReqq = 1024
  1093. // See the order given in Transmission's tr_peerMsgsNew.
  1094. func (pc *PeerConn) sendInitialMessages() {
  1095. t := pc.t
  1096. cl := t.cl
  1097. if pc.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
  1098. pc.write(pp.Message{
  1099. Type: pp.Extended,
  1100. ExtendedID: pp.HandshakeExtendedID,
  1101. ExtendedPayload: func() []byte {
  1102. msg := pp.ExtendedHandshakeMessage{
  1103. V: cl.config.ExtendedHandshakeClientVersion,
  1104. Reqq: localClientReqq,
  1105. YourIp: pp.CompactIp(pc.remoteIp()),
  1106. Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
  1107. Port: cl.incomingPeerPort(),
  1108. MetadataSize: t.metadataSize(),
  1109. // TODO: We can figure these out specific to the socket used.
  1110. Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
  1111. Ipv6: cl.config.PublicIp6.To16(),
  1112. }
  1113. msg.M = pc.LocalLtepProtocolMap.toSupportedExtensionDict()
  1114. return bencode.MustMarshal(msg)
  1115. }(),
  1116. })
  1117. }
  1118. func() {
  1119. if pc.fastEnabled() {
  1120. if t.haveAllPieces() {
  1121. pc.write(pp.Message{Type: pp.HaveAll})
  1122. pc.sentHaves.AddRange(0, bitmap.BitRange(pc.t.NumPieces()))
  1123. return
  1124. } else if !t.haveAnyPieces() {
  1125. pc.write(pp.Message{Type: pp.HaveNone})
  1126. pc.sentHaves.Clear()
  1127. return
  1128. }
  1129. }
  1130. pc.postBitfield()
  1131. }()
  1132. if pc.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
  1133. pc.write(pp.Message{
  1134. Type: pp.Port,
  1135. Port: cl.dhtPort(),
  1136. })
  1137. }
  1138. }
  1139. func (cl *Client) dhtPort() (ret uint16) {
  1140. if len(cl.dhtServers) == 0 {
  1141. return
  1142. }
  1143. return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
  1144. }
  1145. func (cl *Client) haveDhtServer() bool {
  1146. return len(cl.dhtServers) > 0
  1147. }
  1148. // Process incoming ut_metadata message.
  1149. func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
  1150. var d pp.ExtendedMetadataRequestMsg
  1151. err := bencode.Unmarshal(payload, &d)
  1152. if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
  1153. } else if err != nil {
  1154. return fmt.Errorf("error unmarshalling bencode: %s", err)
  1155. }
  1156. piece := d.Piece
  1157. switch d.Type {
  1158. case pp.DataMetadataExtensionMsgType:
  1159. c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
  1160. if !c.requestedMetadataPiece(piece) {
  1161. return fmt.Errorf("got unexpected piece %d", piece)
  1162. }
  1163. c.metadataRequests[piece] = false
  1164. begin := len(payload) - d.PieceSize()
  1165. if begin < 0 || begin >= len(payload) {
  1166. return fmt.Errorf("data has bad offset in payload: %d", begin)
  1167. }
  1168. t.saveMetadataPiece(piece, payload[begin:])
  1169. c.lastUsefulChunkReceived = time.Now()
  1170. err = t.maybeCompleteMetadata()
  1171. if err != nil {
  1172. // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
  1173. // don't know who to blame. TODO: Also errors can be returned here that aren't related
  1174. // to verifying metadata, which should be fixed. This should be tagged with metadata, so
  1175. // log consumers can filter for this message.
  1176. t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
  1177. }
  1178. return err
  1179. case pp.RequestMetadataExtensionMsgType:
  1180. if !t.haveMetadataPiece(piece) {
  1181. c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
  1182. return nil
  1183. }
  1184. start := (1 << 14) * piece
  1185. c.protocolLogger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
  1186. c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
  1187. return nil
  1188. case pp.RejectMetadataExtensionMsgType:
  1189. return nil
  1190. default:
  1191. return errors.New("unknown msg_type value")
  1192. }
  1193. }
  1194. func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
  1195. if ipa, ok := tryIpPortFromNetAddr(addr); ok {
  1196. return cl.badPeerIPPort(ipa.IP, ipa.Port)
  1197. }
  1198. return false
  1199. }
  1200. // Returns whether the IP address and port are considered "bad".
  1201. func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
  1202. if port == 0 || ip == nil {
  1203. return true
  1204. }
  1205. if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
  1206. return true
  1207. }
  1208. if _, ok := cl.ipBlockRange(ip); ok {
  1209. return true
  1210. }
  1211. ipAddr, ok := netip.AddrFromSlice(ip)
  1212. if !ok {
  1213. panic(ip)
  1214. }
  1215. if _, ok := cl.badPeerIPs[ipAddr]; ok {
  1216. return true
  1217. }
  1218. return false
  1219. }
// Return a Torrent ready for insertion into a Client. Convenience wrapper around newTorrentOpt that
// sets only the v1 infohash and the storage.
func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
	return cl.newTorrentOpt(AddTorrentOpts{
		InfoHash: ih,
		Storage:  specStorage,
	})
}
// Return a Torrent ready for insertion into a Client. The Torrent is built from opts but is not
// added to any of the Client's maps here. Panics unless opts carries a nonzero v1 infohash or a
// v2 infohash.
func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
	// A zero InfoHash means no v1 infohash was given; leave the option unset in that case.
	var v1InfoHash g.Option[infohash.T]
	if !opts.InfoHash.IsZero() {
		v1InfoHash.Set(opts.InfoHash)
	}
	if !v1InfoHash.Ok && !opts.InfoHashV2.Ok {
		panic("v1 infohash must be nonzero or v2 infohash must be set")
	}
	// use provided storage, if provided
	storageClient := cl.defaultStorage
	if opts.Storage != nil {
		storageClient = storage.NewClient(opts.Storage)
	}

	t = &Torrent{
		cl:         cl,
		infoHash:   v1InfoHash,
		infoHashV2: opts.InfoHashV2,
		peers: prioritizedPeers{
			om: gbtree.New(32),
			// Peer priority is computed from our public address, as seen for the peer's IP, and
			// the peer's own address.
			getPrio: func(p PeerInfo) peerPriority {
				ipPort := p.addr()
				return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
			},
		},
		conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),

		storageOpener:       storageClient,
		maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,

		// Waiters on metadataChanged synchronise on the Client lock.
		metadataChanged: sync.Cond{
			L: cl.locker(),
		},
		webSeeds:     make(map[string]*Peer),
		gotMetainfoC: make(chan struct{}),
	}
	// Every Torrent prefixes its smart-ban hash input with its own salt read from rand.
	var salt [8]byte
	rand.Read(salt[:])
	t.smartBanCache.Hash = func(b []byte) uint64 {
		h := xxhash.New()
		h.Write(salt[:])
		h.Write(b)
		return h.Sum64()
	}
	t.smartBanCache.Init()
	t.networkingEnabled.Set()
	t.logger = cl.logger.WithDefaultLevel(log.Debug)
	t.sourcesLogger = t.logger.WithNames("sources")
	// A zero ChunkSize selects the default.
	if opts.ChunkSize == 0 {
		opts.ChunkSize = defaultChunkSize
	}
	t.setChunkSize(opts.ChunkSize)
	return
}
// Handle is a file-like handle to some torrent data resource. It supports sequential reads,
// seeking, reads at an explicit offset, and closing.
type Handle interface {
	io.Reader
	io.Seeker
	io.Closer
	io.ReaderAt
}
// AddTorrentInfoHash adds a torrent by infohash without custom storage (nil is passed as the
// storage). If the torrent already exists it is returned with `new` set to `false`.
func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
	return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
}
  1289. // Deprecated. Adds a torrent by InfoHash with a custom Storage implementation.
  1290. // If the torrent already exists then this Storage is ignored and the
  1291. // existing torrent returned with `new` set to `false`
  1292. func (cl *Client) AddTorrentInfoHashWithStorage(
  1293. infoHash metainfo.Hash,
  1294. specStorage storage.ClientImpl,
  1295. ) (t *Torrent, new bool) {
  1296. cl.lock()
  1297. defer cl.unlock()
  1298. t, ok := cl.torrentsByShortHash[infoHash]
  1299. if ok {
  1300. return
  1301. }
  1302. new = true
  1303. t = cl.newTorrent(infoHash, specStorage)
  1304. cl.eachDhtServer(func(s DhtServer) {
  1305. if cl.config.PeriodicallyAnnounceTorrentsToDht {
  1306. go t.dhtAnnouncer(s)
  1307. }
  1308. })
  1309. cl.torrentsByShortHash[infoHash] = t
  1310. cl.torrents[t] = struct{}{}
  1311. cl.clearAcceptLimits()
  1312. t.updateWantPeersEvent()
  1313. // Tickle Client.waitAccept, new torrent may want conns.
  1314. cl.event.Broadcast()
  1315. return
  1316. }
  1317. // Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists
  1318. // then this Storage is ignored and the existing torrent returned with `new` set to `false`.
  1319. func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
  1320. infoHash := opts.InfoHash
  1321. cl.lock()
  1322. defer cl.unlock()
  1323. t, ok := cl.torrentsByShortHash[infoHash]
  1324. if ok {
  1325. return
  1326. }
  1327. if opts.InfoHashV2.Ok {
  1328. t, ok = cl.torrentsByShortHash[*opts.InfoHashV2.Value.ToShort()]
  1329. if ok {
  1330. return
  1331. }
  1332. }
  1333. new = true
  1334. t = cl.newTorrentOpt(opts)
  1335. cl.eachDhtServer(func(s DhtServer) {
  1336. if cl.config.PeriodicallyAnnounceTorrentsToDht {
  1337. go t.dhtAnnouncer(s)
  1338. }
  1339. })
  1340. cl.torrentsByShortHash[infoHash] = t
  1341. cl.torrents[t] = struct{}{}
  1342. t.setInfoBytesLocked(opts.InfoBytes)
  1343. cl.clearAcceptLimits()
  1344. t.updateWantPeersEvent()
  1345. // Tickle Client.waitAccept, new torrent may want conns.
  1346. cl.event.Broadcast()
  1347. return
  1348. }
// AddTorrentOpts holds the options for adding a torrent with Client.AddTorrentOpt.
type AddTorrentOpts struct {
	// The v1 infohash. The zero value means the torrent has no v1 infohash.
	InfoHash infohash.T
	// The v2 infohash, if known. At least one of InfoHash and InfoHashV2 must be provided.
	InfoHashV2 g.Option[infohash_v2.T]
	// Custom storage for the torrent. If nil, the Client's default storage is used.
	Storage storage.ClientImpl
	// The chunk size for the torrent. Zero selects the default chunk size.
	ChunkSize pp.Integer
	// Passed to the new torrent as its info bytes when the torrent is added.
	InfoBytes []byte
}
  1356. // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
  1357. // Torrent.MergeSpec.
  1358. func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
  1359. t, new = cl.AddTorrentOpt(AddTorrentOpts{
  1360. InfoHash: spec.InfoHash,
  1361. InfoHashV2: spec.InfoHashV2,
  1362. Storage: spec.Storage,
  1363. ChunkSize: spec.ChunkSize,
  1364. })
  1365. modSpec := *spec
  1366. if new {
  1367. // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
  1368. // it.
  1369. modSpec.ChunkSize = 0
  1370. }
  1371. err = t.MergeSpec(&modSpec)
  1372. if err != nil && new {
  1373. t.Drop()
  1374. }
  1375. return
  1376. }
// MergeSpec merges spec into the torrent. The trackers will be merged with the existing ones. If
// the Info isn't yet known, it will be set. spec.DisallowDataDownload/Upload will be read and
// applied. The display name is replaced if the new spec provides one. Note that any `Storage` is
// ignored.
//
// Panics if spec.ChunkSize is nonzero, as the chunk size of an existing Torrent cannot be changed.
// Returns an error if setting the info bytes fails, or the joined errors from adding piece layers.
func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
	if spec.DisplayName != "" {
		t.SetDisplayName(spec.DisplayName)
	}
	if spec.InfoBytes != nil {
		err := t.SetInfoBytes(spec.InfoBytes)
		if err != nil {
			return err
		}
	}
	cl := t.cl
	// These calls are made before the Client lock is taken below.
	cl.AddDhtNodes(spec.DhtNodes)
	t.UseSources(spec.Sources)
	cl.lock()
	defer cl.unlock()
	t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
	for _, url := range spec.Webseeds {
		t.addWebSeed(url)
	}
	// Peers given directly in the spec are added as trusted.
	for _, peerAddr := range spec.PeerAddrs {
		t.addPeer(PeerInfo{
			Addr:    StringAddr(peerAddr),
			Source:  PeerSourceDirect,
			Trusted: true,
		})
	}
	if spec.ChunkSize != 0 {
		panic("chunk size cannot be changed for existing Torrent")
	}
	t.addTrackers(spec.Trackers)
	t.maybeNewConns()
	t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
	t.dataUploadDisallowed = spec.DisallowDataUpload
	return errors.Join(t.addPieceLayersLocked(spec.PieceLayers)...)
}
  1415. func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) {
  1416. t.eachShortInfohash(func(short [20]byte) {
  1417. delete(cl.torrentsByShortHash, short)
  1418. })
  1419. err = t.close(wg)
  1420. delete(cl.torrents, t)
  1421. return
  1422. }
  1423. func (cl *Client) allTorrentsCompleted() bool {
  1424. for t := range cl.torrents {
  1425. if !t.haveInfo() {
  1426. return false
  1427. }
  1428. if !t.haveAllPieces() {
  1429. return false
  1430. }
  1431. }
  1432. return true
  1433. }
  1434. // Returns true when all torrents are completely downloaded and false if the
  1435. // client is stopped before that.
  1436. func (cl *Client) WaitAll() bool {
  1437. cl.lock()
  1438. defer cl.unlock()
  1439. for !cl.allTorrentsCompleted() {
  1440. if cl.closed.IsSet() {
  1441. return false
  1442. }
  1443. cl.event.Wait()
  1444. }
  1445. return true
  1446. }
// Torrents returns handles to all the torrents loaded in the Client. The slice is built while
// holding the Client read lock.
func (cl *Client) Torrents() []*Torrent {
	cl.rLock()
	defer cl.rUnlock()
	return cl.torrentsAsSlice()
}
  1453. func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
  1454. for t := range cl.torrents {
  1455. ret = append(ret, t)
  1456. }
  1457. return
  1458. }
  1459. func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
  1460. spec, err := TorrentSpecFromMagnetUri(uri)
  1461. if err != nil {
  1462. return
  1463. }
  1464. T, _, err = cl.AddTorrentSpec(spec)
  1465. return
  1466. }
  1467. func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
  1468. ts, err := TorrentSpecFromMetaInfoErr(mi)
  1469. if err != nil {
  1470. return
  1471. }
  1472. T, _, err = cl.AddTorrentSpec(ts)
  1473. return
  1474. }
  1475. func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
  1476. mi, err := metainfo.LoadFromFile(filename)
  1477. if err != nil {
  1478. return
  1479. }
  1480. return cl.AddTorrent(mi)
  1481. }
// DhtServers returns the Client's DHT servers. The Client's own slice is returned, not a copy, and
// no lock is taken.
func (cl *Client) DhtServers() []DhtServer {
	return cl.dhtServers
}
  1485. func (cl *Client) AddDhtNodes(nodes []string) {
  1486. for _, n := range nodes {
  1487. hmp := missinggo.SplitHostMaybePort(n)
  1488. ip := net.ParseIP(hmp.Host)
  1489. if ip == nil {
  1490. cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
  1491. continue
  1492. }
  1493. ni := krpc.NodeInfo{
  1494. Addr: krpc.NodeAddr{
  1495. IP: ip,
  1496. Port: hmp.Port,
  1497. },
  1498. }
  1499. cl.eachDhtServer(func(s DhtServer) {
  1500. s.AddNode(ni)
  1501. })
  1502. }
  1503. }
  1504. func (cl *Client) banPeerIP(ip net.IP) {
  1505. // We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4
  1506. // addresses directly to v4on6, which doesn't compare equal with v4.
  1507. ipAddr, ok := netip.AddrFromSlice(ip)
  1508. if !ok {
  1509. panic(ip)
  1510. }
  1511. g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
  1512. for t := range cl.torrents {
  1513. t.iterPeers(func(p *Peer) {
  1514. if p.remoteIp().Equal(ip) {
  1515. t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
  1516. // Should this be a close?
  1517. p.drop()
  1518. }
  1519. })
  1520. }
  1521. }
// newConnectionOpts holds the parameters for Client.newConnection.
type newConnectionOpts struct {
	// Whether we initiated the connection.
	outgoing bool
	// The peer's address. May be nil.
	remoteAddr PeerRemoteAddr
	// Stored on the new Peer as its localPublicAddr.
	localPublicAddr peerLocalPublicAddr
	// The network of the connection. Must not be empty.
	network string
	// Stored on the new PeerConn as its connString.
	connString string
}
// newConnection wraps nc in a new PeerConn initialised from opts. The connection starts out
// choking and choked, with its reads and writes counted in connection stats and its reads subject
// to the Client's download rate limiter. The configured NewPeer callbacks are invoked before
// returning. Panics if opts.network is empty.
func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
	if opts.network == "" {
		panic(opts.remoteAddr)
	}
	c = &PeerConn{
		Peer: Peer{
			outgoing:        opts.outgoing,
			choking:         true,
			peerChoking:     true,
			PeerMaxRequests: 250,

			RemoteAddr:      opts.remoteAddr,
			localPublicAddr: opts.localPublicAddr,
			Network:         opts.network,
			callbacks:       &cl.config.Callbacks,
		},
		connString: opts.connString,
		conn:       nc,
	}
	c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
	c.initRequestState()
	// TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
	if opts.remoteAddr != nil {
		// Only addresses that parse as ip:port get a bannable address.
		netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
		if err == nil {
			c.bannableAddr = Some(netipAddrPort.Addr())
		}
	}
	c.peerImpl = c
	c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextText(fmt.Sprintf("%T %p", c, c))
	c.protocolLogger = c.logger.WithNames(protocolLoggingName)
	c.setRW(connStatsReadWriter{nc, c})
	// Wrap the reader installed by setRW above, so the rate limiter sits on top of it.
	c.r = &rateLimitedReader{
		l: cl.config.DownloadRateLimiter,
		r: c.r,
	}
	c.logger.Levelf(
		log.Debug,
		"inited with remoteAddr %v network %v outgoing %t",
		opts.remoteAddr, opts.network, opts.outgoing,
	)
	for _, f := range cl.config.Callbacks.NewPeer {
		f(&c.Peer)
	}
	return
}
  1574. func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
  1575. cl.lock()
  1576. defer cl.unlock()
  1577. t := cl.torrentsByShortHash[ih]
  1578. if t == nil {
  1579. return
  1580. }
  1581. t.addPeers([]PeerInfo{{
  1582. Addr: ipPortAddr{ip, port},
  1583. Source: PeerSourceDhtAnnouncePeer,
  1584. }})
  1585. }
  1586. func firstNotNil(ips ...net.IP) net.IP {
  1587. for _, ip := range ips {
  1588. if ip != nil {
  1589. return ip
  1590. }
  1591. }
  1592. return nil
  1593. }
  1594. func (cl *Client) eachListener(f func(Listener) bool) {
  1595. for _, s := range cl.listeners {
  1596. if !f(s) {
  1597. break
  1598. }
  1599. }
  1600. }
  1601. func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
  1602. for i := 0; i < len(cl.listeners); i += 1 {
  1603. if ret = cl.listeners[i]; f(ret) {
  1604. return
  1605. }
  1606. }
  1607. return nil
  1608. }
  1609. func (cl *Client) publicIp(peer net.IP) net.IP {
  1610. // TODO: Use BEP 10 to determine how peers are seeing us.
  1611. if peer.To4() != nil {
  1612. return firstNotNil(
  1613. cl.config.PublicIp4,
  1614. cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
  1615. )
  1616. }
  1617. return firstNotNil(
  1618. cl.config.PublicIp6,
  1619. cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
  1620. )
  1621. }
  1622. func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
  1623. l := cl.findListener(
  1624. func(l Listener) bool {
  1625. return f(addrIpOrNil(l.Addr()))
  1626. },
  1627. )
  1628. if l == nil {
  1629. return nil
  1630. }
  1631. return addrIpOrNil(l.Addr())
  1632. }
// publicAddr returns our address as a peer with the given IP should see it: the public IP chosen
// for that peer, combined with the port for incoming peer connections.
func (cl *Client) publicAddr(peer net.IP) IpPort {
	return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
}
  1637. // ListenAddrs addresses currently being listened to.
  1638. func (cl *Client) ListenAddrs() (ret []net.Addr) {
  1639. cl.lock()
  1640. ret = make([]net.Addr, len(cl.listeners))
  1641. for i := 0; i < len(cl.listeners); i += 1 {
  1642. ret[i] = cl.listeners[i].Addr()
  1643. }
  1644. cl.unlock()
  1645. return
  1646. }
  1647. func (cl *Client) PublicIPs() (ips []net.IP) {
  1648. if ip := cl.config.PublicIp4; ip != nil {
  1649. ips = append(ips, ip)
  1650. }
  1651. if ip := cl.config.PublicIp6; ip != nil {
  1652. ips = append(ips, ip)
  1653. }
  1654. return
  1655. }
  1656. func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
  1657. ipa, ok := tryIpPortFromNetAddr(addr)
  1658. if !ok {
  1659. return
  1660. }
  1661. ip := maskIpForAcceptLimiting(ipa.IP)
  1662. if cl.acceptLimiter == nil {
  1663. cl.acceptLimiter = make(map[ipStr]int)
  1664. }
  1665. cl.acceptLimiter[ipStr(ip.String())]++
  1666. }
  1667. func maskIpForAcceptLimiting(ip net.IP) net.IP {
  1668. if ip4 := ip.To4(); ip4 != nil {
  1669. return ip4.Mask(net.CIDRMask(24, 32))
  1670. }
  1671. return ip
  1672. }
  1673. func (cl *Client) clearAcceptLimits() {
  1674. cl.acceptLimiter = nil
  1675. }
  1676. func (cl *Client) acceptLimitClearer() {
  1677. for {
  1678. select {
  1679. case <-cl.closed.Done():
  1680. return
  1681. case <-time.After(15 * time.Minute):
  1682. cl.lock()
  1683. cl.clearAcceptLimits()
  1684. cl.unlock()
  1685. }
  1686. }
  1687. }
  1688. func (cl *Client) rateLimitAccept(ip net.IP) bool {
  1689. if cl.config.DisableAcceptRateLimiting {
  1690. return false
  1691. }
  1692. return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
  1693. }
  1694. func (cl *Client) rLock() {
  1695. cl._mu.RLock()
  1696. }
  1697. func (cl *Client) rUnlock() {
  1698. cl._mu.RUnlock()
  1699. }
  1700. func (cl *Client) lock() {
  1701. cl._mu.Lock()
  1702. }
  1703. func (cl *Client) unlock() {
  1704. cl._mu.Unlock()
  1705. }
  1706. func (cl *Client) locker() *lockWithDeferreds {
  1707. return &cl._mu
  1708. }
  1709. func (cl *Client) String() string {
  1710. return fmt.Sprintf("<%[1]T %[1]p>", cl)
  1711. }
  1712. func (cl *Client) ICEServers() []webrtc.ICEServer {
  1713. var ICEServers []webrtc.ICEServer
  1714. if cl.config.ICEServerList != nil {
  1715. ICEServers = cl.config.ICEServerList
  1716. } else if cl.config.ICEServers != nil {
  1717. ICEServers = []webrtc.ICEServer{{URLs: cl.config.ICEServers}}
  1718. }
  1719. return ICEServers
  1720. }
  1721. // Returns connection-level aggregate connStats at the Client level. See the comment on
  1722. // TorrentStats.ConnStats.
  1723. func (cl *Client) ConnStats() ConnStats {
  1724. return cl.connStats.Copy()
  1725. }
  1726. func (cl *Client) Stats() ClientStats {
  1727. cl.rLock()
  1728. defer cl.rUnlock()
  1729. return cl.statsLocked()
  1730. }