dispatcher.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package udp
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. )
  8. // Maintains a mapping of transaction IDs to handlers.
  9. type Dispatcher struct {
  10. mu sync.RWMutex
  11. transactions map[TransactionId]Transaction
  12. }
  13. // The caller owns b.
  14. func (me *Dispatcher) Dispatch(b []byte, addr net.Addr) error {
  15. buf := bytes.NewBuffer(b)
  16. var rh ResponseHeader
  17. err := Read(buf, &rh)
  18. if err != nil {
  19. return err
  20. }
  21. me.mu.RLock()
  22. defer me.mu.RUnlock()
  23. if t, ok := me.transactions[rh.TransactionId]; ok {
  24. t.h(DispatchedResponse{
  25. Header: rh,
  26. Body: append([]byte(nil), buf.Bytes()...),
  27. Addr: addr,
  28. })
  29. return nil
  30. } else {
  31. return fmt.Errorf("unknown transaction id %v", rh.TransactionId)
  32. }
  33. }
  34. func (me *Dispatcher) forgetTransaction(id TransactionId) {
  35. me.mu.Lock()
  36. defer me.mu.Unlock()
  37. delete(me.transactions, id)
  38. }
  39. func (me *Dispatcher) NewTransaction(h TransactionResponseHandler) Transaction {
  40. me.mu.Lock()
  41. defer me.mu.Unlock()
  42. for {
  43. id := RandomTransactionId()
  44. if _, ok := me.transactions[id]; ok {
  45. continue
  46. }
  47. t := Transaction{
  48. d: me,
  49. h: h,
  50. id: id,
  51. }
  52. if me.transactions == nil {
  53. me.transactions = make(map[TransactionId]Transaction)
  54. }
  55. me.transactions[id] = t
  56. return t
  57. }
  58. }
  59. type DispatchedResponse struct {
  60. Header ResponseHeader
  61. // Response payload, after the header.
  62. Body []byte
  63. // Response source address
  64. Addr net.Addr
  65. }