send.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package utp
  2. import (
  3. "log"
  4. "time"
  5. "github.com/anacrolix/missinggo"
  6. )
  7. type send struct {
  8. acked missinggo.Event
  9. payloadSize uint32
  10. started missinggo.MonotonicTime
  11. _type st
  12. connID uint16
  13. payload []byte
  14. seqNr uint16
  15. conn *Conn
  16. acksSkipped int
  17. resendTimer *time.Timer
  18. numResends int
  19. }
  20. // first is true if this is the first time the send is acked. latency is
  21. // calculated for the first ack.
  22. func (s *send) Ack() (latency time.Duration, first bool) {
  23. first = !s.acked.IsSet()
  24. if first {
  25. latency = missinggo.MonotonicSince(s.started)
  26. }
  27. if s.payload != nil {
  28. sendBufferPool.Put(s.payload[:0:minMTU])
  29. s.payload = nil
  30. }
  31. s.acked.Set()
  32. if s.resendTimer != nil {
  33. s.resendTimer.Stop()
  34. s.resendTimer = nil
  35. }
  36. return
  37. }
  38. func (s *send) timedOut() {
  39. s.conn.destroy(errAckTimeout)
  40. }
  41. func (s *send) timeoutResend() {
  42. mu.Lock()
  43. defer mu.Unlock()
  44. if missinggo.MonotonicSince(s.started) >= writeTimeout {
  45. s.timedOut()
  46. return
  47. }
  48. if s.acked.IsSet() || s.conn.destroyed.IsSet() {
  49. return
  50. }
  51. rt := s.conn.resendTimeout()
  52. s.resend()
  53. s.numResends++
  54. s.resendTimer.Reset(rt * time.Duration(s.numResends))
  55. }
  56. func (s *send) resend() {
  57. if s.acked.IsSet() {
  58. return
  59. }
  60. err := s.conn.send(s._type, s.connID, s.payload, s.seqNr)
  61. if err != nil {
  62. log.Printf("error resending packet: %s", err)
  63. }
  64. }