transaction.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package client
  2. import (
  3. "net"
  4. "sync"
  5. "time"
  6. "github.com/pion/stun"
  7. )
  // maxRtxInterval is the ceiling for a transaction's retransmission interval:
  // the interval doubles after every timeout but is clamped to this value.
  const maxRtxInterval = 1600 * time.Millisecond
  11. // TransactionResult is a bag of result values of a transaction
  12. type TransactionResult struct {
  13. Msg *stun.Message
  14. From net.Addr
  15. Retries int
  16. Err error
  17. }
  // TransactionConfig holds the parameters NewTransaction copies into a new
  // Transaction.
  type TransactionConfig struct {
  	// Key identifies the transaction; it is handed to the timeout callback.
  	Key string
  	// Raw is stored on the transaction as-is (the slice is not copied).
  	Raw []byte
  	// To is stored on the transaction as-is.
  	To net.Addr
  	// Interval is the initial delay used by the retransmission timer.
  	Interval time.Duration
  	// IgnoreResult, when true, creates the transaction without a result
  	// channel: WriteResult then reports false and WaitForResult returns an
  	// error instead of blocking.
  	IgnoreResult bool
  }
  26. // Transaction represents a transaction
  27. type Transaction struct {
  28. Key string // read-only
  29. Raw []byte // read-only
  30. To net.Addr // read-only
  31. nRtx int // modified only by the timer thread
  32. interval time.Duration // modified only by the timer thread
  33. timer *time.Timer // thread-safe, set only by the creator, and stopper
  34. resultCh chan TransactionResult // thread-safe
  35. mutex sync.RWMutex
  36. }
  37. // NewTransaction creates a new instance of Transaction
  38. func NewTransaction(config *TransactionConfig) *Transaction {
  39. var resultCh chan TransactionResult
  40. if !config.IgnoreResult {
  41. resultCh = make(chan TransactionResult)
  42. }
  43. return &Transaction{
  44. Key: config.Key, // read-only
  45. Raw: config.Raw, // read-only
  46. To: config.To, // read-only
  47. interval: config.Interval, // modified only by the timer thread
  48. resultCh: resultCh, // thread-safe
  49. }
  50. }
  51. // StartRtxTimer starts the transaction timer
  52. func (t *Transaction) StartRtxTimer(onTimeout func(trKey string, nRtx int)) {
  53. t.mutex.Lock()
  54. defer t.mutex.Unlock()
  55. t.timer = time.AfterFunc(t.interval, func() {
  56. t.mutex.Lock()
  57. t.nRtx++
  58. nRtx := t.nRtx
  59. t.interval *= 2
  60. if t.interval > maxRtxInterval {
  61. t.interval = maxRtxInterval
  62. }
  63. t.mutex.Unlock()
  64. onTimeout(t.Key, nRtx)
  65. })
  66. }
  67. // StopRtxTimer stop the transaction timer
  68. func (t *Transaction) StopRtxTimer() {
  69. t.mutex.Lock()
  70. defer t.mutex.Unlock()
  71. if t.timer != nil {
  72. t.timer.Stop()
  73. }
  74. }
  75. // WriteResult writes the result to the result channel
  76. func (t *Transaction) WriteResult(res TransactionResult) bool {
  77. if t.resultCh == nil {
  78. return false
  79. }
  80. t.resultCh <- res
  81. return true
  82. }
  83. // WaitForResult waits for the transaction result
  84. func (t *Transaction) WaitForResult() TransactionResult {
  85. if t.resultCh == nil {
  86. return TransactionResult{
  87. Err: errWaitForResultOnNonResultTransaction,
  88. }
  89. }
  90. result, ok := <-t.resultCh
  91. if !ok {
  92. result.Err = errTransactionClosed
  93. }
  94. return result
  95. }
  96. // Close closes the transaction
  97. func (t *Transaction) Close() {
  98. if t.resultCh != nil {
  99. close(t.resultCh)
  100. }
  101. }
  102. // Retries returns the number of retransmission it has made
  103. func (t *Transaction) Retries() int {
  104. t.mutex.RLock()
  105. defer t.mutex.RUnlock()
  106. return t.nRtx
  107. }
  108. // TransactionMap is a thread-safe transaction map
  109. type TransactionMap struct {
  110. trMap map[string]*Transaction
  111. mutex sync.RWMutex
  112. }
  113. // NewTransactionMap create a new instance of the transaction map
  114. func NewTransactionMap() *TransactionMap {
  115. return &TransactionMap{
  116. trMap: map[string]*Transaction{},
  117. }
  118. }
  119. // Insert inserts a trasaction to the map
  120. func (m *TransactionMap) Insert(key string, tr *Transaction) bool {
  121. m.mutex.Lock()
  122. defer m.mutex.Unlock()
  123. m.trMap[key] = tr
  124. return true
  125. }
  126. // Find looks up a transaction by its key
  127. func (m *TransactionMap) Find(key string) (*Transaction, bool) {
  128. m.mutex.RLock()
  129. defer m.mutex.RUnlock()
  130. tr, ok := m.trMap[key]
  131. return tr, ok
  132. }
  133. // Delete deletes a transaction by its key
  134. func (m *TransactionMap) Delete(key string) {
  135. m.mutex.Lock()
  136. defer m.mutex.Unlock()
  137. delete(m.trMap, key)
  138. }
  139. // CloseAndDeleteAll closes and deletes all transactions
  140. func (m *TransactionMap) CloseAndDeleteAll() {
  141. m.mutex.Lock()
  142. defer m.mutex.Unlock()
  143. for trKey, tr := range m.trMap {
  144. tr.Close()
  145. delete(m.trMap, trKey)
  146. }
  147. }
  148. // Size returns the length of the transaction map
  149. func (m *TransactionMap) Size() int {
  150. m.mutex.RLock()
  151. defer m.mutex.RUnlock()
  152. return len(m.trMap)
  153. }