| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- package udp
- import (
- "bytes"
- "fmt"
- "net"
- "sync"
- )
- // Maintains a mapping of transaction IDs to handlers.
- type Dispatcher struct {
- mu sync.RWMutex
- transactions map[TransactionId]Transaction
- }
- // The caller owns b.
- func (me *Dispatcher) Dispatch(b []byte, addr net.Addr) error {
- buf := bytes.NewBuffer(b)
- var rh ResponseHeader
- err := Read(buf, &rh)
- if err != nil {
- return err
- }
- me.mu.RLock()
- defer me.mu.RUnlock()
- if t, ok := me.transactions[rh.TransactionId]; ok {
- t.h(DispatchedResponse{
- Header: rh,
- Body: append([]byte(nil), buf.Bytes()...),
- Addr: addr,
- })
- return nil
- } else {
- return fmt.Errorf("unknown transaction id %v", rh.TransactionId)
- }
- }
- func (me *Dispatcher) forgetTransaction(id TransactionId) {
- me.mu.Lock()
- defer me.mu.Unlock()
- delete(me.transactions, id)
- }
- func (me *Dispatcher) NewTransaction(h TransactionResponseHandler) Transaction {
- me.mu.Lock()
- defer me.mu.Unlock()
- for {
- id := RandomTransactionId()
- if _, ok := me.transactions[id]; ok {
- continue
- }
- t := Transaction{
- d: me,
- h: h,
- id: id,
- }
- if me.transactions == nil {
- me.transactions = make(map[TransactionId]Transaction)
- }
- me.transactions[id] = t
- return t
- }
- }
- type DispatchedResponse struct {
- Header ResponseHeader
- // Response payload, after the header.
- Body []byte
- // Response source address
- Addr net.Addr
- }
|