operation.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package traversal
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "github.com/anacrolix/chansync"
  6. "github.com/anacrolix/chansync/events"
  7. "github.com/anacrolix/sync"
  8. "github.com/anacrolix/dht/v2/containers"
  9. "github.com/anacrolix/dht/v2/int160"
  10. k_nearest_nodes "github.com/anacrolix/dht/v2/k-nearest-nodes"
  11. "github.com/anacrolix/dht/v2/krpc"
  12. "github.com/anacrolix/dht/v2/types"
  13. )
  14. type QueryResult struct {
  15. // A node that should be considered for a closest entry.
  16. ResponseFrom *krpc.NodeInfo
  17. // Data associated with a closest node. Is this ever not a string? I think using generics for
  18. // this leaks throughout the entire Operation. Hardly worth it. It's still possible to handle
  19. // invalid token types at runtime.
  20. ClosestData interface{}
  21. Nodes []krpc.NodeInfo
  22. Nodes6 []krpc.NodeInfo
  23. }
  24. type OperationInput struct {
  25. Target krpc.ID
  26. Alpha int
  27. K int
  28. DoQuery func(context.Context, krpc.NodeAddr) QueryResult
  29. NodeFilter func(types.AddrMaybeId) bool
  30. }
  31. type defaultsAppliedOperationInput OperationInput
  32. func Start(input OperationInput) *Operation {
  33. herp := defaultsAppliedOperationInput(input)
  34. if herp.Alpha == 0 {
  35. herp.Alpha = 3
  36. }
  37. if herp.K == 0 {
  38. herp.K = 8
  39. }
  40. if herp.NodeFilter == nil {
  41. herp.NodeFilter = func(types.AddrMaybeId) bool {
  42. return true
  43. }
  44. }
  45. targetInt160 := herp.Target.Int160()
  46. op := &Operation{
  47. targetInt160: targetInt160,
  48. input: herp,
  49. queried: make(map[addrString]struct{}),
  50. closest: k_nearest_nodes.New(targetInt160, herp.K),
  51. unqueried: containers.NewImmutableAddrMaybeIdsByDistance(targetInt160),
  52. }
  53. go op.run()
  54. return op
  55. }
  // addrString is the string form of a node address, used as the key of the set of addresses
  // that have already been queried.
  type addrString string
  57. type Operation struct {
  58. stats Stats
  59. mu sync.Mutex
  60. unqueried containers.AddrMaybeIdsByDistance
  61. queried map[addrString]struct{}
  62. closest k_nearest_nodes.Type
  63. targetInt160 int160.T
  64. input defaultsAppliedOperationInput
  65. outstanding int
  66. cond chansync.BroadcastCond
  67. stalled chansync.LevelTrigger
  68. stopping chansync.SetOnce
  69. stopped chansync.SetOnce
  70. }
  71. // I don't think you should access this until the Stopped event.
  72. func (op *Operation) Stats() *Stats {
  73. return &op.stats
  74. }
  75. func (op *Operation) Stop() {
  76. if op.stopping.Set() {
  77. go func() {
  78. defer op.stopped.Set()
  79. op.mu.Lock()
  80. defer op.mu.Unlock()
  81. for {
  82. if op.outstanding == 0 {
  83. break
  84. }
  85. cond := op.cond.Signaled()
  86. op.mu.Unlock()
  87. <-cond
  88. op.mu.Lock()
  89. }
  90. }()
  91. }
  92. }
  93. func (op *Operation) Stopped() events.Done {
  94. return op.stopped.Done()
  95. }
  96. func (op *Operation) Stalled() events.Active {
  97. return op.stalled.Active()
  98. }
  99. func (op *Operation) AddNodes(nodes []types.AddrMaybeId) (added int) {
  100. op.mu.Lock()
  101. defer op.mu.Unlock()
  102. before := op.unqueried.Len()
  103. for _, n := range nodes {
  104. if _, ok := op.queried[addrString(n.Addr.String())]; ok {
  105. continue
  106. }
  107. if !op.input.NodeFilter(n) {
  108. continue
  109. }
  110. op.unqueried = op.unqueried.Add(n)
  111. }
  112. op.cond.Broadcast()
  113. return op.unqueried.Len() - before
  114. }
  115. func (op *Operation) markQueried(addr krpc.NodeAddr) {
  116. op.queried[addrString(addr.String())] = struct{}{}
  117. }
  118. func (op *Operation) closestUnqueried() (ret types.AddrMaybeId) {
  119. return op.unqueried.Next()
  120. }
  121. func (op *Operation) popClosestUnqueried() types.AddrMaybeId {
  122. ret := op.closestUnqueried()
  123. op.unqueried = op.unqueried.Delete(ret)
  124. return ret
  125. }
  126. func (op *Operation) haveQuery() bool {
  127. if op.unqueried.Len() == 0 {
  128. return false
  129. }
  130. if !op.closest.Full() {
  131. return true
  132. }
  133. cu := op.closestUnqueried()
  134. if cu.Id == nil {
  135. return false
  136. }
  137. return cu.Id.Distance(op.targetInt160).Cmp(op.closest.Farthest().ID.Int160().Distance(op.targetInt160)) <= 0
  138. }
  139. func (op *Operation) run() {
  140. defer close(op.stalled.Signal())
  141. op.mu.Lock()
  142. defer op.mu.Unlock()
  143. for {
  144. if op.stopping.IsSet() {
  145. return
  146. }
  147. for op.outstanding < op.input.Alpha && op.haveQuery() {
  148. op.startQuery()
  149. }
  150. var stalled events.Signal
  151. if (!op.haveQuery() || op.input.Alpha == 0) && op.outstanding == 0 {
  152. stalled = op.stalled.Signal()
  153. }
  154. queryCondSignaled := op.cond.Signaled()
  155. op.mu.Unlock()
  156. select {
  157. case stalled <- struct{}{}:
  158. case <-op.stopping.Done():
  159. case <-queryCondSignaled:
  160. }
  161. op.mu.Lock()
  162. }
  163. }
  164. func (op *Operation) addClosest(node krpc.NodeInfo, data interface{}) {
  165. var ami types.AddrMaybeId
  166. ami.FromNodeInfo(node)
  167. if !op.input.NodeFilter(ami) {
  168. return
  169. }
  170. op.closest = op.closest.Push(k_nearest_nodes.Elem{
  171. Key: node,
  172. Data: data,
  173. })
  174. }
  175. func (op *Operation) Closest() *k_nearest_nodes.Type {
  176. return &op.closest
  177. }
  178. func (op *Operation) startQuery() {
  179. a := op.popClosestUnqueried()
  180. op.markQueried(a.Addr)
  181. op.outstanding++
  182. go func() {
  183. defer func() {
  184. op.mu.Lock()
  185. defer op.mu.Unlock()
  186. op.outstanding--
  187. op.cond.Broadcast()
  188. }()
  189. // log.Printf("traversal querying %v", a)
  190. atomic.AddUint32(&op.stats.NumAddrsTried, 1)
  191. ctx, cancel := context.WithCancel(context.Background())
  192. go func() {
  193. select {
  194. case <-ctx.Done():
  195. case <-op.stopping.Done():
  196. cancel()
  197. }
  198. }()
  199. res := op.input.DoQuery(ctx, a.Addr)
  200. cancel()
  201. if res.ResponseFrom != nil {
  202. func() {
  203. op.mu.Lock()
  204. defer op.mu.Unlock()
  205. atomic.AddUint32(&op.stats.NumResponses, 1)
  206. op.addClosest(*res.ResponseFrom, res.ClosestData)
  207. }()
  208. }
  209. op.AddNodes(types.AddrMaybeIdSliceFromNodeInfoSlice(res.Nodes))
  210. op.AddNodes(types.AddrMaybeIdSliceFromNodeInfoSlice(res.Nodes6))
  211. }()
  212. }