conn.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package vnet
  2. import (
  3. "errors"
  4. "io"
  5. "math"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. const (
  11. maxReadQueueSize = 1024
  12. )
  13. var (
  14. errObsCannotBeNil = errors.New("obs cannot be nil")
  15. errUseClosedNetworkConn = errors.New("use of closed network connection")
  16. errAddrNotUDPAddr = errors.New("addr is not a net.UDPAddr")
  17. errLocAddr = errors.New("something went wrong with locAddr")
  18. errAlreadyClosed = errors.New("already closed")
  19. errNoRemAddr = errors.New("no remAddr defined")
  20. )
  21. // UDPPacketConn is packet-oriented connection for UDP.
  22. type UDPPacketConn interface {
  23. net.PacketConn
  24. Read(b []byte) (int, error)
  25. RemoteAddr() net.Addr
  26. Write(b []byte) (int, error)
  27. }
  28. // vNet implements this
  29. type connObserver interface {
  30. write(c Chunk) error
  31. onClosed(addr net.Addr)
  32. determineSourceIP(locIP, dstIP net.IP) net.IP
  33. }
  34. // UDPConn is the implementation of the Conn and PacketConn interfaces for UDP network connections.
  35. // comatible with net.PacketConn and net.Conn
  36. type UDPConn struct {
  37. locAddr *net.UDPAddr // read-only
  38. remAddr *net.UDPAddr // read-only
  39. obs connObserver // read-only
  40. readCh chan Chunk // thread-safe
  41. closed bool // requires mutex
  42. mu sync.Mutex // to mutex closed flag
  43. readTimer *time.Timer // thread-safe
  44. }
  45. func newUDPConn(locAddr, remAddr *net.UDPAddr, obs connObserver) (*UDPConn, error) {
  46. if obs == nil {
  47. return nil, errObsCannotBeNil
  48. }
  49. return &UDPConn{
  50. locAddr: locAddr,
  51. remAddr: remAddr,
  52. obs: obs,
  53. readCh: make(chan Chunk, maxReadQueueSize),
  54. readTimer: time.NewTimer(time.Duration(math.MaxInt64)),
  55. }, nil
  56. }
  57. // ReadFrom reads a packet from the connection,
  58. // copying the payload into p. It returns the number of
  59. // bytes copied into p and the return address that
  60. // was on the packet.
  61. // It returns the number of bytes read (0 <= n <= len(p))
  62. // and any error encountered. Callers should always process
  63. // the n > 0 bytes returned before considering the error err.
  64. // ReadFrom can be made to time out and return
  65. // an Error with Timeout() == true after a fixed time limit;
  66. // see SetDeadline and SetReadDeadline.
  67. func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  68. loop:
  69. for {
  70. select {
  71. case chunk, ok := <-c.readCh:
  72. if !ok {
  73. break loop
  74. }
  75. var err error
  76. n := copy(p, chunk.UserData())
  77. addr := chunk.SourceAddr()
  78. if n < len(chunk.UserData()) {
  79. err = io.ErrShortBuffer
  80. }
  81. if c.remAddr != nil {
  82. if addr.String() != c.remAddr.String() {
  83. break // discard (shouldn't happen)
  84. }
  85. }
  86. return n, addr, err
  87. case <-c.readTimer.C:
  88. return 0, nil, &net.OpError{
  89. Op: "read",
  90. Net: c.locAddr.Network(),
  91. Addr: c.locAddr,
  92. Err: newTimeoutError("i/o timeout"),
  93. }
  94. }
  95. }
  96. return 0, nil, &net.OpError{
  97. Op: "read",
  98. Net: c.locAddr.Network(),
  99. Addr: c.locAddr,
  100. Err: errUseClosedNetworkConn,
  101. }
  102. }
  103. // WriteTo writes a packet with payload p to addr.
  104. // WriteTo can be made to time out and return
  105. // an Error with Timeout() == true after a fixed time limit;
  106. // see SetDeadline and SetWriteDeadline.
  107. // On packet-oriented connections, write timeouts are rare.
  108. func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
  109. dstAddr, ok := addr.(*net.UDPAddr)
  110. if !ok {
  111. return 0, errAddrNotUDPAddr
  112. }
  113. srcIP := c.obs.determineSourceIP(c.locAddr.IP, dstAddr.IP)
  114. if srcIP == nil {
  115. return 0, errLocAddr
  116. }
  117. srcAddr := &net.UDPAddr{
  118. IP: srcIP,
  119. Port: c.locAddr.Port,
  120. }
  121. chunk := newChunkUDP(srcAddr, dstAddr)
  122. chunk.userData = make([]byte, len(p))
  123. copy(chunk.userData, p)
  124. if err := c.obs.write(chunk); err != nil {
  125. return 0, err
  126. }
  127. return len(p), nil
  128. }
  129. // Close closes the connection.
  130. // Any blocked ReadFrom or WriteTo operations will be unblocked and return errors.
  131. func (c *UDPConn) Close() error {
  132. c.mu.Lock()
  133. defer c.mu.Unlock()
  134. if c.closed {
  135. return errAlreadyClosed
  136. }
  137. c.closed = true
  138. close(c.readCh)
  139. c.obs.onClosed(c.locAddr)
  140. return nil
  141. }
  142. // LocalAddr returns the local network address.
  143. func (c *UDPConn) LocalAddr() net.Addr {
  144. return c.locAddr
  145. }
  146. // SetDeadline sets the read and write deadlines associated
  147. // with the connection. It is equivalent to calling both
  148. // SetReadDeadline and SetWriteDeadline.
  149. //
  150. // A deadline is an absolute time after which I/O operations
  151. // fail with a timeout (see type Error) instead of
  152. // blocking. The deadline applies to all future and pending
  153. // I/O, not just the immediately following call to ReadFrom or
  154. // WriteTo. After a deadline has been exceeded, the connection
  155. // can be refreshed by setting a deadline in the future.
  156. //
  157. // An idle timeout can be implemented by repeatedly extending
  158. // the deadline after successful ReadFrom or WriteTo calls.
  159. //
  160. // A zero value for t means I/O operations will not time out.
  161. func (c *UDPConn) SetDeadline(t time.Time) error {
  162. return c.SetReadDeadline(t)
  163. }
  164. // SetReadDeadline sets the deadline for future ReadFrom calls
  165. // and any currently-blocked ReadFrom call.
  166. // A zero value for t means ReadFrom will not time out.
  167. func (c *UDPConn) SetReadDeadline(t time.Time) error {
  168. var d time.Duration
  169. var noDeadline time.Time
  170. if t == noDeadline {
  171. d = time.Duration(math.MaxInt64)
  172. } else {
  173. d = time.Until(t)
  174. }
  175. c.readTimer.Reset(d)
  176. return nil
  177. }
  178. // SetWriteDeadline sets the deadline for future WriteTo calls
  179. // and any currently-blocked WriteTo call.
  180. // Even if write times out, it may return n > 0, indicating that
  181. // some of the data was successfully written.
  182. // A zero value for t means WriteTo will not time out.
  183. func (c *UDPConn) SetWriteDeadline(t time.Time) error {
  184. // Write never blocks.
  185. return nil
  186. }
  187. // Read reads data from the connection.
  188. // Read can be made to time out and return an Error with Timeout() == true
  189. // after a fixed time limit; see SetDeadline and SetReadDeadline.
  190. func (c *UDPConn) Read(b []byte) (int, error) {
  191. n, _, err := c.ReadFrom(b)
  192. return n, err
  193. }
  194. // RemoteAddr returns the remote network address.
  195. func (c *UDPConn) RemoteAddr() net.Addr {
  196. return c.remAddr
  197. }
  198. // Write writes data to the connection.
  199. // Write can be made to time out and return an Error with Timeout() == true
  200. // after a fixed time limit; see SetDeadline and SetWriteDeadline.
  201. func (c *UDPConn) Write(b []byte) (int, error) {
  202. if c.remAddr == nil {
  203. return 0, errNoRemAddr
  204. }
  205. return c.WriteTo(b, c.remAddr)
  206. }
  207. func (c *UDPConn) onInboundChunk(chunk Chunk) {
  208. c.mu.Lock()
  209. defer c.mu.Unlock()
  210. if c.closed {
  211. return
  212. }
  213. select {
  214. case c.readCh <- chunk:
  215. default:
  216. }
  217. }