server.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477
  1. package dht
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "runtime/pprof"
  11. "strings"
  12. "text/tabwriter"
  13. "time"
  14. "github.com/anacrolix/log"
  15. "github.com/anacrolix/missinggo/v2"
  16. "github.com/anacrolix/sync"
  17. "github.com/anacrolix/torrent/bencode"
  18. "github.com/anacrolix/torrent/iplist"
  19. "github.com/anacrolix/torrent/logonce"
  20. "github.com/anacrolix/torrent/metainfo"
  21. "golang.org/x/time/rate"
  22. "github.com/anacrolix/dht/v2/bep44"
  23. "github.com/anacrolix/dht/v2/int160"
  24. "github.com/anacrolix/dht/v2/krpc"
  25. peer_store "github.com/anacrolix/dht/v2/peer-store"
  26. "github.com/anacrolix/dht/v2/traversal"
  27. "github.com/anacrolix/dht/v2/types"
  28. )
  29. // A Server defines parameters for a DHT node server that is able to send
  30. // queries, and respond to the ones from the network. Each node has a globally
  31. // unique identifier known as the "node ID." Node IDs are chosen at random
  32. // from the same 160-bit space as BitTorrent infohashes and define the
  33. // behaviour of the node. Zero valued Server does not have a valid ID and thus
  34. // is unable to function properly. Use `NewServer(nil)` to initialize a
  35. // default node.
  36. type Server struct {
  37. id int160.T
  38. socket net.PacketConn
  39. resendDelay func() time.Duration
  40. mu sync.RWMutex
  41. transactions map[transactionKey]*Transaction
  42. nextT uint64 // unique "t" field for outbound queries
  43. table table
  44. closed missinggo.Event
  45. ipBlockList iplist.Ranger
  46. tokenServer tokenServer // Manages tokens we issue to our queriers.
  47. config ServerConfig
  48. stats ServerStats
  49. sendLimit *rate.Limiter
  50. lastBootstrap time.Time
  51. bootstrappingNow bool
  52. store *bep44.Wrapper
  53. }
  54. func (s *Server) numGoodNodes() (num int) {
  55. s.table.forNodes(func(n *node) bool {
  56. if s.IsGood(n) {
  57. num++
  58. }
  59. return true
  60. })
  61. return
  62. }
  // prettySince renders how long ago t was, truncated to whole seconds, in the
  // form "<duration> ago". The zero time is rendered as "never".
  func prettySince(t time.Time) string {
  	if t.IsZero() {
  		return "never"
  	}
  	// Drop the sub-second part so the output stays short.
  	elapsed := time.Since(t).Truncate(time.Second)
  	return fmt.Sprintf("%s ago", elapsed)
  }
  72. func (s *Server) WriteStatus(w io.Writer) {
  73. fmt.Fprintf(w, "Listening on %s\n", s.Addr())
  74. s.mu.Lock()
  75. defer s.mu.Unlock()
  76. fmt.Fprintf(w, "Nodes in table: %d good, %d total\n", s.numGoodNodes(), s.numNodes())
  77. fmt.Fprintf(w, "Ongoing transactions: %d\n", len(s.transactions))
  78. fmt.Fprintf(w, "Server node ID: %x\n", s.id.Bytes())
  79. for i, b := range s.table.buckets {
  80. if b.Len() == 0 && b.lastChanged.IsZero() {
  81. continue
  82. }
  83. fmt.Fprintf(w,
  84. "b# %v: %v nodes, last updated: %v\n",
  85. i, b.Len(), prettySince(b.lastChanged))
  86. if b.Len() > 0 {
  87. tw := tabwriter.NewWriter(w, 0, 0, 1, ' ', 0)
  88. fmt.Fprintf(tw, " node id\taddr\tlast query\tlast response\trecv\tdiscard\tflags\n")
  89. b.EachNode(func(n *node) bool {
  90. var flags []string
  91. if s.IsQuestionable(n) {
  92. flags = append(flags, "q10e")
  93. }
  94. if s.nodeIsBad(n) {
  95. flags = append(flags, "bad")
  96. }
  97. if s.IsGood(n) {
  98. flags = append(flags, "good")
  99. }
  100. if n.IsSecure() {
  101. flags = append(flags, "sec")
  102. }
  103. fmt.Fprintf(tw, " %x\t%s\t%s\t%s\t%d\t%v\t%v\n",
  104. n.Id.Bytes(),
  105. n.Addr,
  106. prettySince(n.lastGotQuery),
  107. prettySince(n.lastGotResponse),
  108. n.numReceivesFrom,
  109. n.failedLastQuestionablePing,
  110. strings.Join(flags, ","),
  111. )
  112. return true
  113. })
  114. tw.Flush()
  115. }
  116. }
  117. fmt.Fprintln(w)
  118. }
  119. func (s *Server) numNodes() (num int) {
  120. s.table.forNodes(func(n *node) bool {
  121. num++
  122. return true
  123. })
  124. return
  125. }
  126. // Stats returns statistics for the server.
  127. func (s *Server) Stats() ServerStats {
  128. s.mu.Lock()
  129. defer s.mu.Unlock()
  130. ss := s.stats
  131. ss.GoodNodes = s.numGoodNodes()
  132. ss.Nodes = s.numNodes()
  133. ss.OutstandingTransactions = len(s.transactions)
  134. return ss
  135. }
  136. // Addr returns the listen address for the server. Packets arriving to this address
  137. // are processed by the server (unless aliens are involved).
  138. func (s *Server) Addr() net.Addr {
  139. return s.socket.LocalAddr()
  140. }
  141. func NewDefaultServerConfig() *ServerConfig {
  142. return &ServerConfig{
  143. NoSecurity: true,
  144. StartingNodes: func() ([]Addr, error) { return GlobalBootstrapAddrs("udp") },
  145. DefaultWant: []krpc.Want{krpc.WantNodes, krpc.WantNodes6},
  146. Store: bep44.NewMemory(),
  147. Exp: 2 * time.Hour,
  148. SendLimiter: DefaultSendLimiter,
  149. }
  150. }
  151. // If the NodeId hasn't been specified, generate a suitable one. deterministic if c.Conn and
  152. // c.PublicIP are non-nil.
  153. func (c *ServerConfig) InitNodeId() (deterministic bool) {
  154. if c.NodeId.IsZero() {
  155. var secure bool
  156. if c.Conn != nil && c.PublicIP != nil {
  157. // Is this sufficient for a deterministic node ID?
  158. c.NodeId = HashTuple(
  159. []byte(c.Conn.LocalAddr().Network()),
  160. []byte(c.Conn.LocalAddr().String()),
  161. c.PublicIP,
  162. )
  163. // Since we have a public IP we can secure, and the choice must not be influenced by the
  164. // NoSecure configuration option.
  165. secure = true
  166. deterministic = true
  167. } else {
  168. c.NodeId = RandomNodeID()
  169. secure = !c.NoSecurity && c.PublicIP != nil
  170. }
  171. if secure {
  172. SecureNodeId(&c.NodeId, c.PublicIP)
  173. }
  174. }
  175. return
  176. }
  177. // NewServer initializes a new DHT node server.
  178. func NewServer(c *ServerConfig) (s *Server, err error) {
  179. if c == nil {
  180. c = NewDefaultServerConfig()
  181. }
  182. if c.Conn == nil {
  183. c.Conn, err = net.ListenPacket("udp", ":0")
  184. if err != nil {
  185. return
  186. }
  187. }
  188. c.InitNodeId()
  189. // If Logger is empty, emulate the old behaviour: Everything is logged to the default location,
  190. // and there are no debug messages.
  191. if c.Logger.IsZero() {
  192. c.Logger = log.Default.FilterLevel(log.Info)
  193. }
  194. // Add log.Debug by default.
  195. c.Logger = c.Logger.WithDefaultLevel(log.Debug)
  196. if c.Store == nil {
  197. c.Store = bep44.NewMemory()
  198. }
  199. if c.SendLimiter == nil {
  200. c.SendLimiter = DefaultSendLimiter
  201. }
  202. s = &Server{
  203. config: *c,
  204. ipBlockList: c.IPBlocklist,
  205. tokenServer: tokenServer{
  206. maxIntervalDelta: 2,
  207. interval: 5 * time.Minute,
  208. secret: make([]byte, 20),
  209. },
  210. transactions: make(map[transactionKey]*Transaction),
  211. table: table{
  212. k: 8,
  213. },
  214. store: bep44.NewWrapper(c.Store, c.Exp),
  215. }
  216. rand.Read(s.tokenServer.secret)
  217. s.socket = c.Conn
  218. s.id = int160.FromByteArray(c.NodeId)
  219. s.table.rootID = s.id
  220. s.resendDelay = s.config.QueryResendDelay
  221. if s.resendDelay == nil {
  222. s.resendDelay = defaultQueryResendDelay
  223. }
  224. go s.serveUntilClosed()
  225. return
  226. }
  227. func (s *Server) serveUntilClosed() {
  228. err := s.serve()
  229. s.mu.Lock()
  230. defer s.mu.Unlock()
  231. if s.closed.IsSet() {
  232. return
  233. }
  234. if err != nil {
  235. panic(err)
  236. }
  237. }
  238. // Returns a description of the Server.
  239. func (s *Server) String() string {
  240. return fmt.Sprintf("dht server on %s (node id %v)", s.socket.LocalAddr(), s.id)
  241. }
  242. // Packets to and from any address matching a range in the list are dropped.
  243. func (s *Server) SetIPBlockList(list iplist.Ranger) {
  244. s.mu.Lock()
  245. defer s.mu.Unlock()
  246. s.ipBlockList = list
  247. }
  248. func (s *Server) IPBlocklist() iplist.Ranger {
  249. return s.ipBlockList
  250. }
  251. func (s *Server) processPacket(b []byte, addr Addr) {
  252. // log.Printf("got packet %q", b)
  253. if len(b) < 2 || b[0] != 'd' {
  254. // KRPC messages are bencoded dicts.
  255. readNotKRPCDict.Add(1)
  256. return
  257. }
  258. var d krpc.Msg
  259. err := bencode.Unmarshal(b, &d)
  260. if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
  261. // log.Printf("%s: received message packet with %d trailing bytes: %q", s, _err.NumUnusedBytes, b[len(b)-_err.NumUnusedBytes:])
  262. expvars.Add("processed packets with trailing bytes", 1)
  263. } else if err != nil {
  264. readUnmarshalError.Add(1)
  265. // log.Printf("%s: received bad krpc message from %s: %s: %+q", s, addr, err, b)
  266. func() {
  267. if se, ok := err.(*bencode.SyntaxError); ok {
  268. // The message was truncated.
  269. if int(se.Offset) == len(b) {
  270. return
  271. }
  272. // Some messages seem to drop to nul chars abrubtly.
  273. if int(se.Offset) < len(b) && b[se.Offset] == 0 {
  274. return
  275. }
  276. // The message isn't bencode from the first.
  277. if se.Offset == 0 {
  278. return
  279. }
  280. }
  281. // if missinggo.CryHeard() {
  282. log.Printf("%s: received bad krpc message from %s: %s: %+q", s, addr, err, b)
  283. // }
  284. }()
  285. return
  286. }
  287. s.mu.Lock()
  288. defer s.mu.Unlock()
  289. if s.closed.IsSet() {
  290. return
  291. }
  292. if d.Y == "q" {
  293. expvars.Add("received queries", 1)
  294. s.logger().Printf("received query %q from %v", d.Q, addr)
  295. s.handleQuery(addr, d)
  296. return
  297. }
  298. tk := transactionKey{
  299. RemoteAddr: addr.String(),
  300. T: d.T,
  301. }
  302. t, ok := s.transactions[tk]
  303. if !ok {
  304. s.logger().Printf("received response for untracked transaction %q from %v", d.T, addr)
  305. return
  306. }
  307. // s.logger().Printf("received response for transaction %q from %v", d.T, addr)
  308. go t.handleResponse(d)
  309. s.updateNode(addr, d.SenderID(), !d.ReadOnly, func(n *node) {
  310. n.lastGotResponse = time.Now()
  311. n.failedLastQuestionablePing = false
  312. n.numReceivesFrom++
  313. })
  314. // Ensure we don't provide more than one response to a transaction.
  315. s.deleteTransaction(tk)
  316. }
  317. func (s *Server) serve() error {
  318. var b [0x10000]byte
  319. for {
  320. n, addr, err := s.socket.ReadFrom(b[:])
  321. if err != nil {
  322. if ignoreReadFromError(err) {
  323. continue
  324. }
  325. return err
  326. }
  327. expvars.Add("packets read", 1)
  328. if n == len(b) {
  329. logonce.Stderr.Printf("received dht packet exceeds buffer size")
  330. continue
  331. }
  332. if missinggo.AddrPort(addr) == 0 {
  333. readZeroPort.Add(1)
  334. continue
  335. }
  336. blocked, err := func() (bool, error) {
  337. s.mu.RLock()
  338. defer s.mu.RUnlock()
  339. if s.closed.IsSet() {
  340. return false, errors.New("server is closed")
  341. }
  342. return s.ipBlocked(missinggo.AddrIP(addr)), nil
  343. }()
  344. if err != nil {
  345. return err
  346. }
  347. if blocked {
  348. readBlocked.Add(1)
  349. continue
  350. }
  351. s.processPacket(b[:n], NewAddr(addr))
  352. }
  353. }
  354. func (s *Server) ipBlocked(ip net.IP) (blocked bool) {
  355. if s.ipBlockList == nil {
  356. return
  357. }
  358. _, blocked = s.ipBlockList.Lookup(ip)
  359. return
  360. }
  361. // Adds directly to the node table.
  362. func (s *Server) AddNode(ni krpc.NodeInfo) error {
  363. id := int160.FromByteArray(ni.ID)
  364. if id.IsZero() {
  365. go s.Ping(ni.Addr.UDP())
  366. return nil
  367. }
  368. s.mu.Lock()
  369. defer s.mu.Unlock()
  370. return s.updateNode(NewAddr(ni.Addr.UDP()), (*krpc.ID)(&ni.ID), true, func(*node) {})
  371. }
  372. func wantsContain(ws []krpc.Want, w krpc.Want) bool {
  373. for _, _w := range ws {
  374. if _w == w {
  375. return true
  376. }
  377. }
  378. return false
  379. }
  380. func shouldReturnNodes(queryWants []krpc.Want, querySource net.IP) bool {
  381. if len(queryWants) != 0 {
  382. return wantsContain(queryWants, krpc.WantNodes)
  383. }
  384. // Is it possible to be over IPv6 with IPv4 endpoints?
  385. return querySource.To4() != nil
  386. }
  387. func shouldReturnNodes6(queryWants []krpc.Want, querySource net.IP) bool {
  388. if len(queryWants) != 0 {
  389. return wantsContain(queryWants, krpc.WantNodes6)
  390. }
  391. return querySource.To4() == nil
  392. }
  393. func (s *Server) makeReturnNodes(target int160.T, filter func(krpc.NodeAddr) bool) []krpc.NodeInfo {
  394. return s.closestGoodNodeInfos(8, target, filter)
  395. }
  396. var krpcErrMissingArguments = krpc.Error{
  397. Code: krpc.ErrorCodeProtocolError,
  398. Msg: "missing arguments dict",
  399. }
  400. // Filters peers per BEP 32 to return in the values field to a get_peers query.
  401. func filterPeers(querySourceIp net.IP, queryWants []krpc.Want, allPeers []krpc.NodeAddr) (filtered []krpc.NodeAddr) {
  402. // The logic here is common with nodes, see BEP 32.
  403. retain4 := shouldReturnNodes(queryWants, querySourceIp)
  404. retain6 := shouldReturnNodes6(queryWants, querySourceIp)
  405. for _, peer := range allPeers {
  406. if ip, ok := func(ip net.IP) (net.IP, bool) {
  407. as4 := peer.IP.To4()
  408. as16 := peer.IP.To16()
  409. switch {
  410. case retain4 && len(ip) == net.IPv4len:
  411. return ip, true
  412. case retain6 && len(ip) == net.IPv6len:
  413. return ip, true
  414. case retain4 && as4 != nil:
  415. // Is it possible that we're converting to an IPv4 address when the transport in use
  416. // is IPv6?
  417. return as4, true
  418. case retain6 && as16 != nil:
  419. // Couldn't any IPv4 address be converted to IPv6, but isn't listening over IPv6?
  420. return as16, true
  421. default:
  422. return nil, false
  423. }
  424. }(peer.IP); ok {
  425. filtered = append(filtered, krpc.NodeAddr{IP: ip, Port: peer.Port})
  426. }
  427. }
  428. return
  429. }
  430. func (s *Server) setReturnNodes(r *krpc.Return, queryMsg krpc.Msg, querySource Addr) *krpc.Error {
  431. if queryMsg.A == nil {
  432. return &krpcErrMissingArguments
  433. }
  434. target := int160.FromByteArray(queryMsg.A.InfoHash)
  435. if shouldReturnNodes(queryMsg.A.Want, querySource.IP()) {
  436. r.Nodes = s.makeReturnNodes(target, func(na krpc.NodeAddr) bool { return na.IP.To4() != nil })
  437. }
  438. if shouldReturnNodes6(queryMsg.A.Want, querySource.IP()) {
  439. r.Nodes6 = s.makeReturnNodes(target, func(krpc.NodeAddr) bool { return true })
  440. }
  441. return nil
  442. }
  443. func (s *Server) handleQuery(source Addr, m krpc.Msg) {
  444. go func() {
  445. expvars.Add(fmt.Sprintf("received query %q", m.Q), 1)
  446. if a := m.A; a != nil {
  447. if a.NoSeed != 0 {
  448. expvars.Add("received argument noseed", 1)
  449. }
  450. if a.Scrape != 0 {
  451. expvars.Add("received argument scrape", 1)
  452. }
  453. }
  454. }()
  455. s.updateNode(source, m.SenderID(), !m.ReadOnly, func(n *node) {
  456. n.lastGotQuery = time.Now()
  457. n.numReceivesFrom++
  458. })
  459. if s.config.OnQuery != nil {
  460. propagate := s.config.OnQuery(&m, source.Raw())
  461. if !propagate {
  462. return
  463. }
  464. }
  465. // Don't respond.
  466. if s.config.Passive {
  467. return
  468. }
  469. // TODO: Should we disallow replying to ourself?
  470. args := m.A
  471. switch m.Q {
  472. case "ping":
  473. s.reply(source, m.T, krpc.Return{})
  474. case "get_peers":
  475. // Check for the naked m.A.Want deref below.
  476. if m.A == nil {
  477. s.sendError(source, m.T, krpcErrMissingArguments)
  478. break
  479. }
  480. var r krpc.Return
  481. if ps := s.config.PeerStore; ps != nil {
  482. r.Values = filterPeers(source.IP(), m.A.Want, ps.GetPeers(peer_store.InfoHash(args.InfoHash)))
  483. r.Token = func() *string {
  484. t := s.createToken(source)
  485. return &t
  486. }()
  487. }
  488. if len(r.Values) == 0 {
  489. if err := s.setReturnNodes(&r, m, source); err != nil {
  490. s.sendError(source, m.T, *err)
  491. break
  492. }
  493. }
  494. s.reply(source, m.T, r)
  495. case "find_node":
  496. var r krpc.Return
  497. if err := s.setReturnNodes(&r, m, source); err != nil {
  498. s.sendError(source, m.T, *err)
  499. break
  500. }
  501. s.reply(source, m.T, r)
  502. case "announce_peer":
  503. if !s.validToken(args.Token, source) {
  504. expvars.Add("received announce_peer with invalid token", 1)
  505. return
  506. }
  507. expvars.Add("received announce_peer with valid token", 1)
  508. var port int
  509. portOk := false
  510. if args.Port != nil {
  511. port = *args.Port
  512. portOk = true
  513. }
  514. if args.ImpliedPort {
  515. expvars.Add("received announce_peer with implied_port", 1)
  516. port = source.Port()
  517. portOk = true
  518. }
  519. if !portOk {
  520. expvars.Add("received announce_peer with no derivable port", 1)
  521. }
  522. if h := s.config.OnAnnouncePeer; h != nil {
  523. go h(metainfo.Hash(args.InfoHash), source.IP(), port, portOk)
  524. }
  525. if ps := s.config.PeerStore; ps != nil {
  526. go ps.AddPeer(
  527. peer_store.InfoHash(args.InfoHash),
  528. krpc.NodeAddr{IP: source.IP(), Port: port},
  529. )
  530. }
  531. s.reply(source, m.T, krpc.Return{})
  532. case "put":
  533. if !s.validToken(args.Token, source) {
  534. expvars.Add("received put with invalid token", 1)
  535. return
  536. }
  537. expvars.Add("received put with valid token", 1)
  538. i := &bep44.Item{
  539. V: args.V,
  540. K: args.K,
  541. Salt: args.Salt,
  542. Sig: args.Sig,
  543. Cas: args.Cas,
  544. Seq: *args.Seq,
  545. }
  546. if err := s.store.Put(i); err != nil {
  547. kerr, ok := err.(krpc.Error)
  548. if !ok {
  549. s.sendError(source, m.T, krpc.ErrorMethodUnknown)
  550. }
  551. s.sendError(source, m.T, kerr)
  552. break
  553. }
  554. s.reply(source, m.T, krpc.Return{
  555. ID: s.ID(),
  556. })
  557. case "get":
  558. var r krpc.Return
  559. if err := s.setReturnNodes(&r, m, source); err != nil {
  560. s.sendError(source, m.T, *err)
  561. break
  562. }
  563. t := s.createToken(source)
  564. r.Token = &t
  565. item, err := s.store.Get(bep44.Target(args.Target))
  566. if err == bep44.ErrItemNotFound {
  567. s.reply(source, m.T, r)
  568. break
  569. }
  570. if kerr, ok := err.(krpc.Error); ok {
  571. s.sendError(source, m.T, kerr)
  572. break
  573. }
  574. if err != nil {
  575. s.sendError(source, m.T, krpc.Error{
  576. Code: krpc.ErrorCodeGenericError,
  577. Msg: err.Error(),
  578. })
  579. break
  580. }
  581. r.Seq = &item.Seq
  582. if args.Seq != nil && item.Seq <= *args.Seq {
  583. s.reply(source, m.T, r)
  584. break
  585. }
  586. r.V = bencode.MustMarshal(item.V)
  587. r.K = item.K
  588. r.Sig = item.Sig
  589. s.reply(source, m.T, r)
  590. // case "sample_infohashes":
  591. // // Nodes supporting this extension should always include the samples field in the response,
  592. // // even when it is zero-length. This lets indexing nodes to distinguish nodes supporting this
  593. // // extension from those that respond to unknown query types which contain a target field [2].
  594. default:
  595. // TODO: http://libtorrent.org/dht_extensions.html#forward-compatibility
  596. s.sendError(source, m.T, krpc.ErrorMethodUnknown)
  597. }
  598. }
  599. func (s *Server) sendError(addr Addr, t string, e krpc.Error) {
  600. go func() {
  601. m := krpc.Msg{
  602. T: t,
  603. Y: "e",
  604. E: &e,
  605. }
  606. b, err := bencode.Marshal(m)
  607. if err != nil {
  608. panic(err)
  609. }
  610. s.logger().Printf("sending error to %q: %v", addr, e)
  611. _, err = s.writeToNode(context.Background(), b, addr, false, true)
  612. if err != nil {
  613. s.logger().Printf("error replying to %q: %v", addr, err)
  614. }
  615. }()
  616. }
  617. func (s *Server) reply(addr Addr, t string, r krpc.Return) {
  618. go func() {
  619. r.ID = s.id.AsByteArray()
  620. m := krpc.Msg{
  621. T: t,
  622. Y: "r",
  623. R: &r,
  624. IP: addr.KRPC(),
  625. }
  626. b := bencode.MustMarshal(m)
  627. log.Fmsg("replying to %q", addr).Log(s.logger())
  628. wrote, err := s.writeToNode(context.Background(), b, addr, s.config.WaitToReply, true)
  629. if err != nil {
  630. s.config.Logger.Printf("error replying to %s: %s", addr, err)
  631. }
  632. if wrote {
  633. expvars.Add("replied to peer", 1)
  634. }
  635. }()
  636. }
  637. // Adds a node if appropriate.
  638. func (s *Server) addNode(n *node) error {
  639. if s.nodeIsBad(n) {
  640. return errors.New("node is bad")
  641. }
  642. b := s.table.bucketForID(n.Id)
  643. if b.Len() >= s.table.k {
  644. if b.EachNode(func(bn *node) bool {
  645. // Replace bad and untested nodes with a good one.
  646. if s.nodeIsBad(bn) || (s.IsGood(n) && bn.lastGotResponse.IsZero()) {
  647. s.table.dropNode(bn)
  648. }
  649. return b.Len() >= s.table.k
  650. }) {
  651. return errors.New("no room in bucket")
  652. }
  653. }
  654. if err := s.table.addNode(n); err != nil {
  655. panic(fmt.Sprintf("expected to add node: %s", err))
  656. }
  657. return nil
  658. }
  659. func (s *Server) NodeRespondedToPing(addr Addr, id int160.T) {
  660. s.mu.Lock()
  661. defer s.mu.Unlock()
  662. if id == s.id {
  663. return
  664. }
  665. b := s.table.bucketForID(id)
  666. if b.GetNode(addr, id) == nil {
  667. return
  668. }
  669. b.lastChanged = time.Now()
  670. }
  671. // Updates the node, adding it if appropriate.
  672. func (s *Server) updateNode(addr Addr, id *krpc.ID, tryAdd bool, update func(*node)) error {
  673. if id == nil {
  674. return errors.New("id is nil")
  675. }
  676. int160Id := int160.FromByteArray(*id)
  677. n := s.table.getNode(addr, int160Id)
  678. missing := n == nil
  679. if missing {
  680. if !tryAdd {
  681. return errors.New("node not present and add flag false")
  682. }
  683. if int160Id == s.id {
  684. return errors.New("can't store own id in routing table")
  685. }
  686. n = &node{nodeKey: nodeKey{
  687. Id: int160Id,
  688. Addr: addr,
  689. }}
  690. }
  691. update(n)
  692. if !missing {
  693. return nil
  694. }
  695. return s.addNode(n)
  696. }
  697. func (s *Server) nodeIsBad(n *node) bool {
  698. return s.nodeErr(n) != nil
  699. }
  700. func (s *Server) nodeErr(n *node) error {
  701. if n.Id == s.id {
  702. return errors.New("is self")
  703. }
  704. if n.Id.IsZero() {
  705. return errors.New("has zero id")
  706. }
  707. if !(s.config.NoSecurity || n.IsSecure()) {
  708. return errors.New("not secure")
  709. }
  710. if n.failedLastQuestionablePing {
  711. return errors.New("didn't respond to last questionable node ping")
  712. }
  713. return nil
  714. }
  715. func (s *Server) writeToNode(ctx context.Context, b []byte, node Addr, wait, rate bool) (wrote bool, err error) {
  716. func() {
  717. // This is a pain. It would be better if the blocklist returned an error if it was closed
  718. // instead.
  719. s.mu.RLock()
  720. defer s.mu.RUnlock()
  721. if s.closed.IsSet() {
  722. err = errors.New("server is closed")
  723. return
  724. }
  725. if list := s.ipBlockList; list != nil {
  726. if r, ok := list.Lookup(node.IP()); ok {
  727. err = fmt.Errorf("write to %v blocked by %v", node, r)
  728. return
  729. }
  730. }
  731. }()
  732. if err != nil {
  733. return
  734. }
  735. // s.config.Logger.WithValues(log.Debug).Printf("writing to %s: %q", node.String(), b)
  736. if rate {
  737. if wait {
  738. err = s.config.SendLimiter.Wait(ctx)
  739. if err != nil {
  740. err = fmt.Errorf("waiting for rate-limit token: %w", err)
  741. return false, err
  742. }
  743. } else {
  744. if !s.config.SendLimiter.Allow() {
  745. return false, errors.New("rate limit exceeded")
  746. }
  747. }
  748. }
  749. n, err := s.socket.WriteTo(b, node.Raw())
  750. writes.Add(1)
  751. if rate {
  752. expvars.Add("rated writes", 1)
  753. } else {
  754. expvars.Add("unrated writes", 1)
  755. }
  756. if err != nil {
  757. writeErrors.Add(1)
  758. if rate {
  759. // Give the token back. nfi if this will actually work.
  760. s.config.SendLimiter.AllowN(time.Now(), -1)
  761. }
  762. err = fmt.Errorf("error writing %d bytes to %s: %s", len(b), node, err)
  763. return
  764. }
  765. wrote = true
  766. if n != len(b) {
  767. err = io.ErrShortWrite
  768. return
  769. }
  770. return
  771. }
  772. func (s *Server) nextTransactionID() string {
  773. var b [binary.MaxVarintLen64]byte
  774. n := binary.PutUvarint(b[:], s.nextT)
  775. s.nextT++
  776. return string(b[:n])
  777. }
  778. func (s *Server) deleteTransaction(k transactionKey) {
  779. delete(s.transactions, k)
  780. }
  781. func (s *Server) addTransaction(k transactionKey, t *Transaction) {
  782. if _, ok := s.transactions[k]; ok {
  783. panic("transaction not unique")
  784. }
  785. s.transactions[k] = t
  786. }
  787. // ID returns the 20-byte server ID. This is the ID used to communicate with the
  788. // DHT network.
  789. func (s *Server) ID() [20]byte {
  790. return s.id.AsByteArray()
  791. }
  792. func (s *Server) createToken(addr Addr) string {
  793. return s.tokenServer.CreateToken(addr)
  794. }
  795. func (s *Server) validToken(token string, addr Addr) bool {
  796. return s.tokenServer.ValidToken(token, addr)
  797. }
  // numWrites is the type of the QueryResult.Writes counter.
  type numWrites int
  799. func (s *Server) makeQueryBytes(q string, a krpc.MsgArgs, t string) []byte {
  800. a.ID = s.ID()
  801. m := krpc.Msg{
  802. T: t,
  803. Y: "q",
  804. Q: q,
  805. A: &a,
  806. }
  807. // BEP 43. Outgoing queries from passive nodes should contain "ro":1 in the top level
  808. // dictionary.
  809. if s.config.Passive {
  810. m.ReadOnly = true
  811. }
  812. b, err := bencode.Marshal(m)
  813. if err != nil {
  814. panic(err)
  815. }
  816. return b
  817. }
  818. type QueryResult struct {
  819. Reply krpc.Msg
  820. Writes numWrites
  821. Err error
  822. }
  823. func (qr QueryResult) ToError() error {
  824. if qr.Err != nil {
  825. return qr.Err
  826. }
  827. e := qr.Reply.Error()
  828. if e != nil {
  829. return e
  830. }
  831. return nil
  832. }
  833. // Converts a Server QueryResult to a traversal.QueryResult.
  834. func (me QueryResult) TraversalQueryResult(addr krpc.NodeAddr) (ret traversal.QueryResult) {
  835. r := me.Reply.R
  836. if r == nil {
  837. return
  838. }
  839. ret.ResponseFrom = &krpc.NodeInfo{
  840. Addr: addr,
  841. ID: r.ID,
  842. }
  843. ret.Nodes = r.Nodes
  844. ret.Nodes6 = r.Nodes6
  845. if r.Token != nil {
  846. ret.ClosestData = *r.Token
  847. }
  848. return
  849. }
  // QueryRateLimiting selects the rate-limiting applied to the writes of a
  // single query. A query runs inside a transaction that may send several times.
  // When the STM rate-limiting helpers are in use, the first send has often
  // already been accounted for before the query method that performs the IO is
  // invoked.
  type QueryRateLimiting struct {
  	// NotFirst exempts the first send of a query from rate-limiting.
  	NotFirst bool
  	// NotAny exempts every send of a query from rate-limiting. The built-in
  	// waits before retries still apply.
  	NotAny bool
  	// NOTE(review): the name suggests retries wait on the limiter; confirm
  	// against the query sender.
  	WaitOnRetries bool
  	// NOTE(review): the name suggests the first send does not wait on the
  	// limiter; confirm against the query sender.
  	NoWaitFirst bool
  }
// The zero value for this uses reasonable/traditional defaults on Server methods.
type QueryInput struct {
	// MsgArgs holds the query arguments. The ID field is overwritten with the Server's ID when
	// the query is encoded.
	MsgArgs krpc.MsgArgs
	// RateLimiting controls how the sends for this query are rate-limited.
	RateLimiting QueryRateLimiting
	// NumTries is the number of send attempts for the query. Zero selects defaultMaxQuerySends.
	NumTries int
}
// Performs an arbitrary query. `q` is the query value, defined by the DHT BEP. `a` should contain
// the appropriate argument values, if any. `a.ID` is clobbered by the Server. Responses to queries
// made this way are not interpreted by the Server. More specific methods like FindNode and GetPeers
// may make use of the response internally before passing it back to the caller.
//
// Query blocks until a reply arrives, ctx is done, or the sender gives up, whichever happens
// first. It always waits for the sending goroutine to finish before returning, so ret.Writes is
// final when the caller sees it.
func (s *Server) Query(ctx context.Context, addr Addr, q string, input QueryInput) (ret QueryResult) {
	if input.NumTries == 0 {
		input.NumTries = defaultMaxQuerySends
	}
	// Log the outcome on the way out. The start time is captured now, as the deferred call's
	// argument; ret is read when the function returns.
	defer func(started time.Time) {
		s.logger().WithDefaultLevel(log.Debug).WithValues(q).Printf(
			"Query(%v) returned after %v (err=%v, reply.Y=%v, reply.E=%v, writes=%v)",
			q, time.Since(started), ret.Err, ret.Reply.Y, ret.Reply.E, ret.Writes)
	}(time.Now())
	// Buffered so that a response delivered after we've stopped listening doesn't block the
	// caller of onResponse.
	replyChan := make(chan krpc.Msg, 1)
	t := &Transaction{
		onResponse: func(m krpc.Msg) {
			replyChan <- m
		},
	}
	tk := transactionKey{
		RemoteAddr: addr.String(),
	}
	// Allocate the transaction ID and register the transaction under the lock, before anything
	// is sent.
	s.mu.Lock()
	tid := s.nextTransactionID()
	s.stats.OutboundQueriesAttempted++
	tk.T = tid
	s.addTransaction(tk, t)
	s.mu.Unlock()
	// Receives a non-nil error from the sender, and closes when the sender completes.
	sendErr := make(chan error, 1)
	sendCtx, cancelSend := context.WithCancel(pprof.WithLabels(ctx, pprof.Labels("q", q)))
	go func() {
		err := s.transactionQuerySender(
			sendCtx,
			s.makeQueryBytes(q, input.MsgArgs, tid),
			&ret.Writes,
			addr,
			input.RateLimiting,
			input.NumTries)
		if err != nil {
			sendErr <- err
		}
		close(sendErr)
	}()
	expvars.Add(fmt.Sprintf("outbound %s queries", q), 1)
	// Whichever happens first decides the result: a reply, the caller's context finishing, or
	// the sender reporting an error.
	select {
	case ret.Reply = <-replyChan:
	case <-ctx.Done():
		ret.Err = ctx.Err()
	case ret.Err = <-sendErr:
	}
	// Make sure the query sender stops.
	cancelSend()
	// Make sure the query sender has returned, it will either send an error that we didn't catch
	// above, or the channel will be closed by the sender completing.
	<-sendErr
	// The transaction is only removed once the sender is done with it.
	s.mu.Lock()
	s.deleteTransaction(tk)
	s.mu.Unlock()
	return
}
  929. func (s *Server) transactionQuerySender(
  930. sendCtx context.Context,
  931. b []byte,
  932. writes *numWrites,
  933. addr Addr,
  934. rateLimiting QueryRateLimiting,
  935. numTries int,
  936. ) error {
  937. // log.Printf("sending %q", b)
  938. err := transactionSender(
  939. sendCtx,
  940. func() error {
  941. wrote, err := s.writeToNode(sendCtx, b, addr,
  942. // We only wait for the first write by default if rate-limiting is enabled for this
  943. // query.
  944. func() bool {
  945. if *writes == 0 {
  946. return !rateLimiting.NoWaitFirst
  947. } else {
  948. return rateLimiting.WaitOnRetries
  949. }
  950. }(),
  951. func() bool {
  952. if rateLimiting.NotAny {
  953. return false
  954. }
  955. if *writes == 0 {
  956. return !rateLimiting.NotFirst
  957. }
  958. return true
  959. }(),
  960. )
  961. if wrote {
  962. *writes++
  963. }
  964. return err
  965. },
  966. s.resendDelay,
  967. numTries,
  968. )
  969. if err != nil {
  970. return err
  971. }
  972. select {
  973. case <-sendCtx.Done():
  974. err = sendCtx.Err()
  975. case <-time.After(s.resendDelay()):
  976. err = TransactionTimeout
  977. }
  978. return fmt.Errorf("after %v tries: %w", numTries, err)
  979. }
  980. // Sends a ping query to the address given.
  981. func (s *Server) PingQueryInput(node *net.UDPAddr, qi QueryInput) QueryResult {
  982. addr := NewAddr(node)
  983. res := s.Query(context.TODO(), addr, "ping", qi)
  984. if res.Err == nil {
  985. id := res.Reply.SenderID()
  986. if id != nil {
  987. s.NodeRespondedToPing(addr, id.Int160())
  988. }
  989. }
  990. return res
  991. }
  992. // Sends a ping query to the address given.
  993. func (s *Server) Ping(node *net.UDPAddr) QueryResult {
  994. return s.PingQueryInput(node, QueryInput{})
  995. }
  996. // Put adds a new item to node. You need to call Get first for a write token.
  997. func (s *Server) Put(ctx context.Context, node Addr, i bep44.Put, token string, rl QueryRateLimiting) QueryResult {
  998. if err := s.store.Put(i.ToItem()); err != nil {
  999. return QueryResult{
  1000. Err: err,
  1001. }
  1002. }
  1003. qi := QueryInput{
  1004. MsgArgs: krpc.MsgArgs{
  1005. Cas: i.Cas,
  1006. ID: s.ID(),
  1007. Salt: i.Salt,
  1008. Seq: &i.Seq,
  1009. Sig: i.Sig,
  1010. Token: token,
  1011. V: i.V,
  1012. },
  1013. }
  1014. if i.K != nil {
  1015. qi.MsgArgs.K = *i.K
  1016. }
  1017. return s.Query(ctx, node, "put", qi)
  1018. }
  1019. func (s *Server) announcePeer(
  1020. ctx context.Context,
  1021. node Addr, infoHash int160.T, port int, token string, impliedPort bool, rl QueryRateLimiting,
  1022. ) (
  1023. ret QueryResult,
  1024. ) {
  1025. if port == 0 && !impliedPort {
  1026. ret.Err = errors.New("no port specified")
  1027. return
  1028. }
  1029. ret = s.Query(
  1030. ctx, node, "announce_peer",
  1031. QueryInput{
  1032. MsgArgs: krpc.MsgArgs{
  1033. ImpliedPort: impliedPort,
  1034. InfoHash: infoHash.AsByteArray(),
  1035. Port: &port,
  1036. Token: token,
  1037. },
  1038. RateLimiting: rl,
  1039. })
  1040. if ret.Err != nil {
  1041. return
  1042. }
  1043. if krpcError := ret.Reply.Error(); krpcError != nil {
  1044. announceErrors.Add(1)
  1045. ret.Err = krpcError
  1046. return
  1047. }
  1048. s.mu.Lock()
  1049. defer s.mu.Unlock()
  1050. s.stats.SuccessfulOutboundAnnouncePeerQueries++
  1051. return
  1052. }
  1053. // Sends a find_node query to addr. targetID is the node we're looking for. The Server makes use of
  1054. // some of the response fields.
  1055. func (s *Server) FindNode(addr Addr, targetID int160.T, rl QueryRateLimiting) (ret QueryResult) {
  1056. ret = s.Query(context.TODO(), addr, "find_node", QueryInput{
  1057. MsgArgs: krpc.MsgArgs{
  1058. Target: targetID.AsByteArray(),
  1059. Want: s.config.DefaultWant,
  1060. },
  1061. RateLimiting: rl,
  1062. })
  1063. return
  1064. }
  1065. // Returns how many nodes are in the node table.
  1066. func (s *Server) NumNodes() int {
  1067. s.mu.Lock()
  1068. defer s.mu.Unlock()
  1069. return s.numNodes()
  1070. }
  1071. // Returns non-bad nodes from the routing table.
  1072. func (s *Server) Nodes() (nis []krpc.NodeInfo) {
  1073. s.mu.Lock()
  1074. defer s.mu.Unlock()
  1075. return s.notBadNodes()
  1076. }
  1077. // Returns non-bad nodes from the routing table.
  1078. func (s *Server) notBadNodes() (nis []krpc.NodeInfo) {
  1079. s.table.forNodes(func(n *node) bool {
  1080. if s.nodeIsBad(n) {
  1081. return true
  1082. }
  1083. nis = append(nis, krpc.NodeInfo{
  1084. Addr: n.Addr.KRPC(),
  1085. ID: n.Id.AsByteArray(),
  1086. })
  1087. return true
  1088. })
  1089. return
  1090. }
// Stops the server network activity. This is all that's required to clean-up a Server.
func (s *Server) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Mark the server closed; TableMaintainer watches this to know when to exit.
	s.closed.Set()
	// The socket is closed from its own goroutine so that Close doesn't wait on it while holding
	// the lock. Any error from closing the socket is discarded.
	go s.socket.Close()
}
  1098. func (s *Server) GetPeers(ctx context.Context, addr Addr, infoHash int160.T, scrape bool, rl QueryRateLimiting) (ret QueryResult) {
  1099. args := krpc.MsgArgs{
  1100. InfoHash: infoHash.AsByteArray(),
  1101. // TODO: Maybe IPv4-only Servers won't want IPv6 nodes?
  1102. Want: s.config.DefaultWant,
  1103. }
  1104. if scrape {
  1105. args.Scrape = 1
  1106. }
  1107. ret = s.Query(ctx, addr, "get_peers", QueryInput{
  1108. MsgArgs: args,
  1109. RateLimiting: rl,
  1110. })
  1111. m := ret.Reply
  1112. if m.R != nil {
  1113. if m.R.Token == nil {
  1114. expvars.Add("get_peers responses with no token", 1)
  1115. } else if len(*m.R.Token) == 0 {
  1116. expvars.Add("get_peers responses with empty token", 1)
  1117. } else {
  1118. expvars.Add("get_peers responses with token", 1)
  1119. }
  1120. }
  1121. return
  1122. }
  1123. // Get gets item information from a specific target ID. If seq is set to a specific value,
  1124. // only items with seq bigger than the one provided will return a V, K and Sig, if any.
  1125. // Get must be used to get a Put write token, when you want to write an item instead of read it.
  1126. func (s *Server) Get(ctx context.Context, addr Addr, target bep44.Target, seq *int64, rl QueryRateLimiting) QueryResult {
  1127. return s.Query(ctx, addr, "get", QueryInput{
  1128. MsgArgs: krpc.MsgArgs{
  1129. ID: s.ID(),
  1130. Target: target,
  1131. Seq: seq,
  1132. Want: []krpc.Want{krpc.WantNodes, krpc.WantNodes6},
  1133. },
  1134. RateLimiting: rl,
  1135. })
  1136. }
  1137. func (s *Server) closestGoodNodeInfos(
  1138. k int,
  1139. targetID int160.T,
  1140. filter func(krpc.NodeAddr) bool,
  1141. ) (
  1142. ret []krpc.NodeInfo,
  1143. ) {
  1144. for _, n := range s.closestNodes(k, targetID, func(n *node) bool {
  1145. return s.IsGood(n) && filter(n.NodeInfo().Addr)
  1146. }) {
  1147. ret = append(ret, n.NodeInfo())
  1148. }
  1149. return
  1150. }
  1151. func (s *Server) closestNodes(k int, target int160.T, filter func(*node) bool) []*node {
  1152. return s.table.closestNodes(k, target, filter)
  1153. }
  1154. func (s *Server) TraversalStartingNodes() (nodes []addrMaybeId, err error) {
  1155. s.mu.RLock()
  1156. s.table.forNodes(func(n *node) bool {
  1157. nodes = append(nodes, addrMaybeId{Addr: n.Addr.KRPC(), Id: &n.Id})
  1158. return true
  1159. })
  1160. s.mu.RUnlock()
  1161. if len(nodes) > 0 {
  1162. return
  1163. }
  1164. if s.config.StartingNodes != nil {
  1165. // There seems to be floods on this call on occasion, which may cause a barrage of DNS
  1166. // resolution attempts. This would require that we're unable to get replies because we can't
  1167. // resolve, transmit or receive on the network. Nodes currently don't get expired from the
  1168. // table, so once we have some entries, we should never have to fallback.
  1169. s.logger().Levelf(log.Debug, "falling back on starting nodes")
  1170. addrs, err := s.config.StartingNodes()
  1171. if err != nil {
  1172. return nil, fmt.Errorf("getting starting nodes: %w", err)
  1173. } else {
  1174. // log.Printf("resolved %v addresses", len(addrs))
  1175. }
  1176. for _, a := range addrs {
  1177. nodes = append(nodes, addrMaybeId{Addr: a.KRPC(), Id: nil})
  1178. }
  1179. }
  1180. if len(nodes) == 0 {
  1181. err = errors.New("no initial nodes")
  1182. }
  1183. return
  1184. }
  1185. func (s *Server) AddNodesFromFile(fileName string) (added int, err error) {
  1186. ns, err := ReadNodesFromFile(fileName)
  1187. if err != nil {
  1188. return
  1189. }
  1190. for _, n := range ns {
  1191. if s.AddNode(n) == nil {
  1192. added++
  1193. }
  1194. }
  1195. return
  1196. }
  1197. func (s *Server) logger() log.Logger {
  1198. return s.config.Logger
  1199. }
  1200. func (s *Server) PeerStore() peer_store.Interface {
  1201. return s.config.PeerStore
  1202. }
  1203. func (s *Server) getQuestionableNode() (ret *node) {
  1204. s.table.forNodes(func(n *node) bool {
  1205. if s.IsQuestionable(n) {
  1206. ret = n
  1207. return false
  1208. }
  1209. return true
  1210. })
  1211. return
  1212. }
  1213. func (s *Server) shouldStopRefreshingBucket(bucketIndex int) bool {
  1214. b := &s.table.buckets[bucketIndex]
  1215. // Stop if the bucket is full, and none of the nodes are bad.
  1216. return b.Len() == s.table.K() && b.EachNode(func(n *node) bool {
  1217. return !s.nodeIsBad(n)
  1218. })
  1219. }
  1220. func (s *Server) refreshBucket(bucketIndex int) *traversal.Stats {
  1221. s.mu.RLock()
  1222. id := s.table.randomIdForBucket(bucketIndex)
  1223. op := traversal.Start(traversal.OperationInput{
  1224. Target: id.AsByteArray(),
  1225. Alpha: 3,
  1226. // Running this to completion with K matching the full-bucket size should result in a good,
  1227. // full bucket, since the Server will add nodes that respond to its table to replace the bad
  1228. // ones we're presumably refreshing. It might be possible to terminate the traversal early
  1229. // as soon as the bucket is good.
  1230. K: s.table.K(),
  1231. DoQuery: func(ctx context.Context, addr krpc.NodeAddr) traversal.QueryResult {
  1232. res := s.FindNode(NewAddr(addr.UDP()), id, QueryRateLimiting{})
  1233. return res.TraversalQueryResult(addr)
  1234. },
  1235. NodeFilter: s.TraversalNodeFilter,
  1236. })
  1237. defer func() {
  1238. s.mu.RUnlock()
  1239. op.Stop()
  1240. <-op.Stopped()
  1241. }()
  1242. b := &s.table.buckets[bucketIndex]
  1243. wait:
  1244. for {
  1245. if s.shouldStopRefreshingBucket(bucketIndex) {
  1246. break wait
  1247. }
  1248. op.AddNodes(types.AddrMaybeIdSliceFromNodeInfoSlice(s.notBadNodes()))
  1249. bucketChanged := b.changed.Signaled()
  1250. s.mu.RUnlock()
  1251. select {
  1252. case <-op.Stalled():
  1253. s.mu.RLock()
  1254. break wait
  1255. case <-bucketChanged:
  1256. }
  1257. s.mu.RLock()
  1258. }
  1259. return op.Stats()
  1260. }
  1261. func (s *Server) shouldBootstrap() bool {
  1262. return s.lastBootstrap.IsZero() || time.Since(s.lastBootstrap) > 30*time.Minute
  1263. }
  1264. func (s *Server) shouldBootstrapUnlocked() bool {
  1265. s.mu.RLock()
  1266. defer s.mu.RUnlock()
  1267. return s.shouldBootstrap()
  1268. }
  1269. func (s *Server) pingQuestionableNodesInBucket(bucketIndex int) {
  1270. b := &s.table.buckets[bucketIndex]
  1271. var wg sync.WaitGroup
  1272. b.EachNode(func(n *node) bool {
  1273. if s.IsQuestionable(n) {
  1274. wg.Add(1)
  1275. go func() {
  1276. defer wg.Done()
  1277. err := s.questionableNodePing(context.TODO(), n.Addr, n.Id.AsByteArray()).Err
  1278. if err != nil {
  1279. s.logger().WithDefaultLevel(log.Debug).Printf("error pinging questionable node in bucket %v: %v", bucketIndex, err)
  1280. }
  1281. }()
  1282. }
  1283. return true
  1284. })
  1285. s.mu.RUnlock()
  1286. wg.Wait()
  1287. s.mu.RLock()
  1288. }
// A routine that maintains the Server's routing table, by pinging questionable nodes, and
// refreshing buckets. This should be invoked on a running Server when the caller is satisfied with
// having set it up. It is not necessary to explicitly Bootstrap the Server once this routine has
// started.
//
// It loops until the Server is closed, running one maintenance pass per minute.
func (s *Server) TableMaintainer() {
	logger := s.logger()
	for {
		if s.shouldBootstrapUnlocked() {
			stats, err := s.Bootstrap()
			if err != nil {
				logger.Levelf(log.Error, "error bootstrapping during bucket refresh: %v", err)
			}
			logger.Levelf(log.Debug, "bucket refresh bootstrap stats: %v", stats)
		}
		// The read lock is held across the bucket loop, except where a helper needs it released.
		s.mu.RLock()
		for i := range s.table.buckets {
			// Expects the read lock held; drops and re-takes it internally while pinging.
			s.pingQuestionableNodesInBucket(i)
			// if time.Since(b.lastChanged) < 15*time.Minute {
			// 	continue
			// }
			if s.shouldStopRefreshingBucket(i) {
				continue
			}
			logger.Levelf(log.Debug, "refreshing bucket %v", i)
			// refreshBucket takes the read lock itself, so release ours around the call.
			s.mu.RUnlock()
			stats := s.refreshBucket(i)
			logger.Levelf(log.Debug, "finished refreshing bucket %v: %v", i, stats)
			s.mu.RLock()
			if !s.shouldStopRefreshingBucket(i) {
				// Presumably we couldn't fill the bucket anymore, so assume we're as deep in the
				// available node space as we can go.
				break
			}
		}
		s.mu.RUnlock()
		// Wait a minute before the next pass, or return as soon as the Server is closed.
		select {
		case <-s.closed.LockedChan(&s.mu):
			return
		case <-time.After(time.Minute):
		}
	}
}
  1331. func (s *Server) questionableNodePing(ctx context.Context, addr Addr, id krpc.ID) QueryResult {
  1332. // A ping query that will be certain to try at least 3 times.
  1333. res := s.Query(ctx, addr, "ping", QueryInput{
  1334. RateLimiting: QueryRateLimiting{
  1335. WaitOnRetries: true,
  1336. },
  1337. NumTries: 3,
  1338. })
  1339. if res.Err == nil && res.Reply.R != nil {
  1340. s.NodeRespondedToPing(addr, res.Reply.R.ID.Int160())
  1341. } else {
  1342. s.mu.Lock()
  1343. s.updateNode(addr, &id, false, func(n *node) {
  1344. n.failedLastQuestionablePing = true
  1345. })
  1346. s.mu.Unlock()
  1347. }
  1348. return res
  1349. }
  1350. // Whether we should consider a node for contact based on its address and possible ID.
  1351. func (s *Server) TraversalNodeFilter(node addrMaybeId) bool {
  1352. if !validNodeAddr(node.Addr.UDP()) {
  1353. return false
  1354. }
  1355. if s.ipBlocked(node.Addr.IP) {
  1356. return false
  1357. }
  1358. if node.Id == nil {
  1359. return true
  1360. }
  1361. return s.config.NoSecurity || NodeIdSecure(node.Id.AsByteArray(), node.Addr.IP)
  1362. }
  1363. func validNodeAddr(addr net.Addr) bool {
  1364. // At least for UDP addresses, we know what doesn't work.
  1365. ua := addr.(*net.UDPAddr)
  1366. if ua.Port == 0 {
  1367. return false
  1368. }
  1369. if ip4 := ua.IP.To4(); ip4 != nil && ip4[0] == 0 {
  1370. // Why?
  1371. return false
  1372. }
  1373. return true
  1374. }
  1375. // func (s *Server) refreshBucket(bucketIndex int) {
  1376. // targetId := s.table.randomIdForBucket(bucketIndex)
  1377. // }