agent.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package stun
  2. import (
  3. "errors"
  4. "sync"
  5. "time"
  6. )
  7. // NoopHandler just discards any event.
  8. var NoopHandler Handler = func(e Event) {}
  9. // NewAgent initializes and returns new Agent with provided handler.
  10. // If h is nil, the NoopHandler will be used.
  11. func NewAgent(h Handler) *Agent {
  12. if h == nil {
  13. h = NoopHandler
  14. }
  15. a := &Agent{
  16. transactions: make(map[transactionID]agentTransaction),
  17. handler: h,
  18. }
  19. return a
  20. }
  21. // Agent is low-level abstraction over transaction list that
  22. // handles concurrency (all calls are goroutine-safe) and
  23. // time outs (via Collect call).
  24. type Agent struct {
  25. // transactions is map of transactions that are currently
  26. // in progress. Event handling is done in such way when
  27. // transaction is unregistered before agentTransaction access,
  28. // minimizing mux lock and protecting agentTransaction from
  29. // data races via unexpected concurrent access.
  30. transactions map[transactionID]agentTransaction
  31. closed bool // all calls are invalid if true
  32. mux sync.Mutex // protects transactions and closed
  33. handler Handler // handles transactions
  34. }
  35. // Handler handles state changes of transaction.
  36. //
  37. // Handler is called on transaction state change.
  38. // Usage of e is valid only during call, user must
  39. // copy needed fields explicitly.
  40. type Handler func(e Event)
  41. // Event is passed to Handler describing the transaction event.
  42. // Do not reuse outside Handler.
  43. type Event struct {
  44. TransactionID [TransactionIDSize]byte
  45. Message *Message
  46. Error error
  47. }
  48. // agentTransaction represents transaction in progress.
  49. // Concurrent access is invalid.
  50. type agentTransaction struct {
  51. id transactionID
  52. deadline time.Time
  53. }
  54. var (
  55. // ErrTransactionStopped indicates that transaction was manually stopped.
  56. ErrTransactionStopped = errors.New("transaction is stopped")
  57. // ErrTransactionNotExists indicates that agent failed to find transaction.
  58. ErrTransactionNotExists = errors.New("transaction not exists")
  59. // ErrTransactionExists indicates that transaction with same id is already
  60. // registered.
  61. ErrTransactionExists = errors.New("transaction exists with same id")
  62. )
  63. // StopWithError removes transaction from list and calls handler with
  64. // provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
  65. func (a *Agent) StopWithError(id [TransactionIDSize]byte, err error) error {
  66. a.mux.Lock()
  67. if a.closed {
  68. a.mux.Unlock()
  69. return ErrAgentClosed
  70. }
  71. t, exists := a.transactions[id]
  72. delete(a.transactions, id)
  73. h := a.handler
  74. a.mux.Unlock()
  75. if !exists {
  76. return ErrTransactionNotExists
  77. }
  78. h(Event{
  79. TransactionID: t.id,
  80. Error: err,
  81. })
  82. return nil
  83. }
  84. // Stop stops transaction by id with ErrTransactionStopped, blocking
  85. // until handler returns.
  86. func (a *Agent) Stop(id [TransactionIDSize]byte) error {
  87. return a.StopWithError(id, ErrTransactionStopped)
  88. }
  89. // ErrAgentClosed indicates that agent is in closed state and is unable
  90. // to handle transactions.
  91. var ErrAgentClosed = errors.New("agent is closed")
  92. // Start registers transaction with provided id and deadline.
  93. // Could return ErrAgentClosed, ErrTransactionExists.
  94. //
  95. // Agent handler is guaranteed to be eventually called.
  96. func (a *Agent) Start(id [TransactionIDSize]byte, deadline time.Time) error {
  97. a.mux.Lock()
  98. defer a.mux.Unlock()
  99. if a.closed {
  100. return ErrAgentClosed
  101. }
  102. _, exists := a.transactions[id]
  103. if exists {
  104. return ErrTransactionExists
  105. }
  106. a.transactions[id] = agentTransaction{
  107. id: id,
  108. deadline: deadline,
  109. }
  110. return nil
  111. }
  112. // agentCollectCap is initial capacity for Agent.Collect slices,
  113. // sufficient to make function zero-alloc in most cases.
  114. const agentCollectCap = 100
  115. // ErrTransactionTimeOut indicates that transaction has reached deadline.
  116. var ErrTransactionTimeOut = errors.New("transaction is timed out")
  117. // Collect terminates all transactions that have deadline before provided
  118. // time, blocking until all handlers will process ErrTransactionTimeOut.
  119. // Will return ErrAgentClosed if agent is already closed.
  120. //
  121. // It is safe to call Collect concurrently but makes no sense.
  122. func (a *Agent) Collect(gcTime time.Time) error {
  123. toRemove := make([]transactionID, 0, agentCollectCap)
  124. a.mux.Lock()
  125. if a.closed {
  126. // Doing nothing if agent is closed.
  127. // All transactions should be already closed
  128. // during Close() call.
  129. a.mux.Unlock()
  130. return ErrAgentClosed
  131. }
  132. // Adding all transactions with deadline before gcTime
  133. // to toCall and toRemove slices.
  134. // No allocs if there are less than agentCollectCap
  135. // timed out transactions.
  136. for id, t := range a.transactions {
  137. if t.deadline.Before(gcTime) {
  138. toRemove = append(toRemove, id)
  139. }
  140. }
  141. // Un-registering timed out transactions.
  142. for _, id := range toRemove {
  143. delete(a.transactions, id)
  144. }
  145. // Calling handler does not require locked mutex,
  146. // reducing lock time.
  147. h := a.handler
  148. a.mux.Unlock()
  149. // Sending ErrTransactionTimeOut to handler for all transactions,
  150. // blocking until last one.
  151. event := Event{
  152. Error: ErrTransactionTimeOut,
  153. }
  154. for _, id := range toRemove {
  155. event.TransactionID = id
  156. h(event)
  157. }
  158. return nil
  159. }
  160. // Process incoming message, synchronously passing it to handler.
  161. func (a *Agent) Process(m *Message) error {
  162. e := Event{
  163. TransactionID: m.TransactionID,
  164. Message: m,
  165. }
  166. a.mux.Lock()
  167. if a.closed {
  168. a.mux.Unlock()
  169. return ErrAgentClosed
  170. }
  171. h := a.handler
  172. delete(a.transactions, m.TransactionID)
  173. a.mux.Unlock()
  174. h(e)
  175. return nil
  176. }
  177. // SetHandler sets agent handler to h.
  178. func (a *Agent) SetHandler(h Handler) error {
  179. a.mux.Lock()
  180. if a.closed {
  181. a.mux.Unlock()
  182. return ErrAgentClosed
  183. }
  184. a.handler = h
  185. a.mux.Unlock()
  186. return nil
  187. }
  188. // Close terminates all transactions with ErrAgentClosed and renders Agent to
  189. // closed state.
  190. func (a *Agent) Close() error {
  191. e := Event{
  192. Error: ErrAgentClosed,
  193. }
  194. a.mux.Lock()
  195. if a.closed {
  196. a.mux.Unlock()
  197. return ErrAgentClosed
  198. }
  199. for _, t := range a.transactions {
  200. e.TransactionID = t.id
  201. a.handler(e)
  202. }
  203. a.transactions = nil
  204. a.closed = true
  205. a.handler = nil
  206. a.mux.Unlock()
  207. return nil
  208. }
  209. type transactionID [TransactionIDSize]byte