transaction.go 848 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package dht
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "github.com/anacrolix/dht/v2/krpc"
  8. )
  9. var TransactionTimeout = errors.New("transaction timed out")
  10. // Transaction keeps track of a message exchange between nodes, such as a
  11. // query message and a response message.
  12. type Transaction struct {
  13. onResponse func(krpc.Msg)
  14. }
  15. func (t *Transaction) handleResponse(m krpc.Msg) {
  16. t.onResponse(m)
  17. }
  18. const defaultMaxQuerySends = 1
  19. func transactionSender(
  20. ctx context.Context,
  21. send func() error,
  22. resendDelay func() time.Duration,
  23. maxSends int,
  24. ) error {
  25. var delay time.Duration
  26. sends := 0
  27. for sends < maxSends {
  28. select {
  29. case <-time.After(delay):
  30. err := send()
  31. sends++
  32. if err != nil {
  33. return fmt.Errorf("send %d: %w", sends, err)
  34. }
  35. delay = resendDelay()
  36. case <-ctx.Done():
  37. return ctx.Err()
  38. }
  39. }
  40. return nil
  41. }