socket.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. package utp
  2. /*
  3. #include "utp.h"
  4. #include <stdbool.h>
// Arguments for one utp_process_udp call. An array of these lets a whole
// batch of received datagrams be handed to process_received_messages at once.
struct utp_process_udp_args {
	// Start of the datagram payload.
	const byte *buf;
	// Number of payload bytes at buf.
	size_t len;
	// Address the datagram was received from.
	const struct sockaddr *sa;
	// Size of the address pointed to by sa.
	socklen_t sal;
};
// Passes each of the argslen entries in args to utp_process_udp. If any call
// returned non-zero, deferred acks are issued and timeouts are checked once
// for the whole batch.
void process_received_messages(utp_context *ctx, struct utp_process_udp_args *args, size_t argslen)
{
	bool gotUtp = false;
	size_t i;
	for (i = 0; i < argslen; i++) {
		struct utp_process_udp_args *a = &args[i];
		//if (!a->len) continue;
		if (utp_process_udp(ctx, a->buf, a->len, a->sa, a->sal)) {
			gotUtp = true;
		}
	}
	// Only do the follow-up work when at least one datagram was accepted.
	if (gotUtp) {
		utp_issue_deferred_acks(ctx);
		utp_check_timeouts(ctx);
	}
}
  27. */
  28. import "C"
  29. import (
  30. "context"
  31. "errors"
  32. "fmt"
  33. "math"
  34. "net"
  35. "syscall"
  36. "time"
  37. "unsafe"
  38. "github.com/anacrolix/log"
  39. "github.com/anacrolix/missinggo"
  40. "github.com/anacrolix/missinggo/inproc"
  41. "github.com/anacrolix/mmsg"
  42. )
  43. const (
  44. utpCheckTimeoutInterval = 500 * time.Millisecond
  45. issueDeferredUtpAcksDelay = 1000 * time.Microsecond
  46. )
// Socket is a uTP endpoint layered over a single net.PacketConn. It is used
// both as a net.Listener (Accept) and as a net.PacketConn for packets that
// libutp does not claim (ReadFrom/WriteTo).
type Socket struct {
	// Underlying packet connection, opened by NewSocket.
	pc net.PacketConn
	// libutp context. Set in NewSocket and reset to nil by closeLocked.
	ctx *utpContext
	// Connections waiting to be returned from Accept. Closed by closeLocked.
	backlog chan *Conn
	// Set by closeLocked once the socket has been torn down.
	closed bool
	// Conns keyed by their libutp socket. Entries are added by newConn and
	// removed by onLibSocketDestroyed.
	conns map[*C.utp_socket]*Conn
	// Packets that libutp did not accept as uTP, delivered through ReadFrom.
	nonUtpReads chan packet
	// NOTE(review): the deadline fields are not referenced in the visible
	// part of this file; the Set*Deadline methods currently panic.
	writeDeadline time.Time
	readDeadline  time.Time
	// This is called without the package mutex, without knowing if the result will be needed.
	asyncFirewallCallback FirewallCallback
	// Whether the next accept is to be blocked.
	asyncBlock bool
	// This is called with the package mutex, and preferred.
	syncFirewallCallback FirewallCallback
	// True while ackTimer is armed to issue deferred acks.
	acksScheduled bool
	// Fires ackTimerFunc.
	ackTimer *time.Timer
	// Fires timeoutCheckerTimerFunc, which re-arms it.
	utpTimeoutChecker *time.Timer
	// Destination for diagnostics; defaults to the package Logger.
	logger log.Logger
}
// FirewallCallback is given the remote address of an incoming connection
// request and returns true if the request should be ignored. This is better
// than just accepting and closing, as it means no acknowledgement packet is
// sent.
type FirewallCallback func(net.Addr) bool
var (
	// Compile-time checks that *Socket implements the interfaces it is used as.
	_ net.PacketConn = (*Socket)(nil)
	_ net.Listener   = (*Socket)(nil)
	// errSocketClosed is returned by DialContext when the Socket is closed.
	errSocketClosed = errors.New("Socket closed")
)
// packet is a received datagram that libutp did not accept, queued on
// Socket.nonUtpReads for ReadFrom.
type packet struct {
	// Payload; onReadNonUtp stores a private copy here.
	b []byte
	// Sender of the datagram.
	from net.Addr
}
  79. func listenPacket(network, addr string) (pc net.PacketConn, err error) {
  80. if network == "inproc" {
  81. return inproc.ListenPacket(network, addr)
  82. }
  83. return net.ListenPacket(network, addr)
  84. }
// NewSocketOpt is an option applied to a Socket by NewSocket while it is
// being constructed.
type NewSocketOpt func(s *Socket)
  86. func WithLogger(l log.Logger) NewSocketOpt {
  87. return func(s *Socket) {
  88. s.logger = l
  89. }
  90. }
// NewSocket listens on the given network and address, creates a libutp
// context for it, and starts the goroutine that reads packets. Options are
// applied after the defaults are set and before the libutp context is
// created. An error is returned only if listening fails.
func NewSocket(network, addr string, opts ...NewSocketOpt) (*Socket, error) {
	pc, err := listenPacket(network, addr)
	if err != nil {
		return nil, err
	}
	s := &Socket{
		pc:          pc,
		backlog:     make(chan *Conn, 5),
		conns:       make(map[*C.utp_socket]*Conn),
		nonUtpReads: make(chan packet, 100),
		logger:      Logger,
	}
	// Create the ack timer in a stopped state; afterReceivingUtpMessages
	// arms it with Reset when needed.
	s.ackTimer = time.AfterFunc(math.MaxInt64, s.ackTimerFunc)
	s.ackTimer.Stop()
	for _, opt := range opts {
		opt(s)
	}
	// The libutp context is created and registered under the package mutex.
	func() {
		mu.Lock()
		defer mu.Unlock()
		// NOTE(review): 2 is presumably the libutp API version; confirm against utp.h.
		ctx := (*utpContext)(C.utp_init(2))
		if ctx == nil {
			panic(ctx)
		}
		s.ctx = ctx
		ctx.setCallbacks()
		if utpLogging {
			ctx.setOption(C.UTP_LOG_NORMAL, 1)
			ctx.setOption(C.UTP_LOG_MTU, 1)
			ctx.setOption(C.UTP_LOG_DEBUG, 1)
		}
		libContextToSocket[ctx] = s
		// Fires straight away; the timer function takes mu itself and
		// re-arms the timer after each run.
		s.utpTimeoutChecker = time.AfterFunc(0, s.timeoutCheckerTimerFunc)
	}()
	go s.packetReader()
	return s, nil
}
// onLibSocketDestroyed forgets the Conn registered for the libutp socket ls.
func (s *Socket) onLibSocketDestroyed(ls *C.utp_socket) {
	delete(s.conns, ls)
}
  131. func (s *Socket) newConn(us *C.utp_socket) *Conn {
  132. c := &Conn{
  133. s: s,
  134. us: us,
  135. localAddr: s.pc.LocalAddr(),
  136. }
  137. c.cond.L = &mu
  138. s.conns[us] = c
  139. c.writeDeadlineTimer = time.AfterFunc(-1, c.cond.Broadcast)
  140. c.readDeadlineTimer = time.AfterFunc(-1, c.cond.Broadcast)
  141. return c
  142. }
// maxNumBuffers is the number of messages packetReader asks for per receive
// call when batched reads are available, and the capacity of the argument
// array in processReceivedMessages.
const maxNumBuffers = 16
  144. func (s *Socket) packetReader() {
  145. mc := mmsg.NewConn(s.pc)
  146. // Increasing the messages increases the memory use, but also means we can
  147. // reduces utp_issue_deferred_acks and syscalls which should improve
  148. // efficiency. On the flip side, not all OSs implement batched reads.
  149. ms := make([]mmsg.Message, func() int {
  150. if mc.Err() == nil {
  151. return maxNumBuffers
  152. } else {
  153. return 1
  154. }
  155. }())
  156. for i := range ms {
  157. // The IPv4 UDP limit is allegedly about 64 KiB, and this message has
  158. // been seen on receiving on Windows with just 0x1000: wsarecvfrom: A
  159. // message sent on a datagram socket was larger than the internal
  160. // message buffer or some other network limit, or the buffer used to
  161. // receive a datagram into was smaller than the datagram itself.
  162. ms[i].Buffers = [][]byte{make([]byte, 0x10000)}
  163. }
  164. // Some crap OSs like Windoze will raise errors in Reads that don't
  165. // actually mean we should stop.
  166. consecutiveErrors := 0
  167. for {
  168. // In C, all the reads are processed and when it threatens to block,
  169. // we're supposed to call utp_issue_deferred_acks.
  170. n, err := mc.RecvMsgs(ms)
  171. if n == 1 {
  172. singleMsgRecvs.Add(1)
  173. }
  174. if n > 1 {
  175. multiMsgRecvs.Add(1)
  176. }
  177. if err != nil {
  178. mu.Lock()
  179. closed := s.closed
  180. mu.Unlock()
  181. if closed {
  182. // We don't care.
  183. return
  184. }
  185. // See https://github.com/anacrolix/torrent/issues/83. If we get
  186. // an endless stream of errors (such as the PacketConn being
  187. // Closed outside of our control, this work around may need to be
  188. // reconsidered.
  189. s.logger.Printf("ignoring socket read error: %s", err)
  190. consecutiveErrors++
  191. if consecutiveErrors >= 100 {
  192. s.logger.Print("too many consecutive errors, closing socket")
  193. s.Close()
  194. return
  195. }
  196. continue
  197. }
  198. consecutiveErrors = 0
  199. expMap.Add("successful mmsg receive calls", 1)
  200. expMap.Add("received messages", int64(n))
  201. s.processReceivedMessages(ms[:n])
  202. }
  203. }
// processReceivedMessages feeds a batch of received datagrams to libutp while
// holding the package mutex. Nothing is done if the Socket is already closed.
// Depending on processPacketsInC the batch is either handed to C in a single
// call, or processed one message at a time from Go.
func (s *Socket) processReceivedMessages(ms []mmsg.Message) {
	mu.Lock()
	defer mu.Unlock()
	if s.closed {
		return
	}
	if processPacketsInC {
		// Build one argument struct per message. args has room for
		// maxNumBuffers entries, so ms must not be longer than that.
		var args [maxNumBuffers]C.struct_utp_process_udp_args
		for i, m := range ms {
			a := &args[i]
			a.buf = (*C.byte)(&m.Buffers[0][0])
			a.len = C.size_t(m.N)
			var rsa syscall.RawSockaddrAny
			rsa, a.sal = netAddrToLibSockaddr(m.Addr)
			// NOTE(review): this stores a Go pointer inside memory passed to
			// C, which is presumably why cgocheck has to be disabled.
			a.sa = (*C.struct_sockaddr)(unsafe.Pointer(&rsa))
		}
		C.process_received_messages(s.ctx.asCPtr(), &args[0], C.size_t(len(ms)))
	} else {
		gotUtp := false
		for _, m := range ms {
			gotUtp = s.processReceivedMessage(m.Buffers[0][:m.N], m.Addr) || gotUtp
		}
		// utpProcessUdp releases the mutex temporarily, so the Socket may
		// have been closed while the batch was being processed.
		if gotUtp && !s.closed {
			s.afterReceivingUtpMessages()
		}
	}
}
  231. func (s *Socket) afterReceivingUtpMessages() {
  232. if s.acksScheduled {
  233. return
  234. }
  235. s.ackTimer.Reset(issueDeferredUtpAcksDelay)
  236. s.acksScheduled = true
  237. }
// issueDeferredAcks counts the call in expMap and invokes libutp's
// utp_issue_deferred_acks on the Socket's context.
func (s *Socket) issueDeferredAcks() {
	expMap.Add("utp_issue_deferred_acks calls", 1)
	C.utp_issue_deferred_acks(s.ctx.asCPtr())
}
// checkUtpTimeouts counts the call in expMap and invokes libutp's
// utp_check_timeouts on the Socket's context.
func (s *Socket) checkUtpTimeouts() {
	expMap.Add("utp_check_timeouts calls", 1)
	C.utp_check_timeouts(s.ctx.asCPtr())
}
  246. func (s *Socket) ackTimerFunc() {
  247. mu.Lock()
  248. defer mu.Unlock()
  249. if !s.acksScheduled || s.ctx == nil {
  250. return
  251. }
  252. s.acksScheduled = false
  253. s.issueDeferredAcks()
  254. }
  255. func (s *Socket) processReceivedMessage(b []byte, addr net.Addr) (utp bool) {
  256. if s.utpProcessUdp(b, addr) {
  257. socketUtpPacketsReceived.Add(1)
  258. return true
  259. } else {
  260. s.onReadNonUtp(b, addr)
  261. return false
  262. }
  263. }
// processPacketsInC selects whether packet batches are processed entirely
// from C, reducing CGO overhead. Currently requires GODEBUG=cgocheck=0.
const processPacketsInC = false

// staticRsa holds the sockaddr passed to utp_process_udp by utpProcessUdp.
// It is written with the package mutex held.
var staticRsa syscall.RawSockaddrAny
  268. // Wraps libutp's utp_process_udp, returning relevant information.
  269. func (s *Socket) utpProcessUdp(b []byte, addr net.Addr) (utp bool) {
  270. if len(b) == 0 {
  271. // The implementation of utp_process_udp rejects null buffers, and
  272. // anything smaller than the UTP header size. It's also prone to
  273. // assert on those, which we don't want to trigger.
  274. return false
  275. }
  276. if missinggo.AddrPort(addr) == 0 {
  277. return false
  278. }
  279. mu.Unlock()
  280. // TODO: If it's okay to call the firewall callback without the package lock, aren't we assuming
  281. // that the next UDP packet to be processed by libutp has to be the one we've just used the
  282. // callback for? Why can't we assign directly to Socket.asyncBlock?
  283. asyncBlock := func() bool {
  284. if s.asyncFirewallCallback == nil || s.syncFirewallCallback != nil {
  285. return false
  286. }
  287. return s.asyncFirewallCallback(addr)
  288. }()
  289. mu.Lock()
  290. s.asyncBlock = asyncBlock
  291. if s.closed {
  292. return false
  293. }
  294. var sal C.socklen_t
  295. staticRsa, sal = netAddrToLibSockaddr(addr)
  296. ret := C.utp_process_udp(s.ctx.asCPtr(), (*C.byte)(&b[0]), C.size_t(len(b)), (*C.struct_sockaddr)(unsafe.Pointer(&staticRsa)), sal)
  297. switch ret {
  298. case 1:
  299. return true
  300. case 0:
  301. return false
  302. default:
  303. panic(ret)
  304. }
  305. }
  306. func (s *Socket) timeoutCheckerTimerFunc() {
  307. mu.Lock()
  308. ok := s.ctx != nil
  309. if ok {
  310. s.checkUtpTimeouts()
  311. }
  312. if ok {
  313. s.utpTimeoutChecker.Reset(utpCheckTimeoutInterval)
  314. }
  315. mu.Unlock()
  316. }
// Close shuts the Socket down. It takes the package mutex and delegates to
// closeLocked.
func (s *Socket) Close() error {
	mu.Lock()
	defer mu.Unlock()
	return s.closeLocked()
}
  322. func (s *Socket) closeLocked() error {
  323. if s.closed {
  324. return nil
  325. }
  326. // Calling this deletes the pointer. It must not be referred to after
  327. // this.
  328. C.utp_destroy(s.ctx.asCPtr())
  329. s.ctx = nil
  330. s.pc.Close()
  331. close(s.backlog)
  332. close(s.nonUtpReads)
  333. s.closed = true
  334. s.ackTimer.Stop()
  335. s.utpTimeoutChecker.Stop()
  336. s.acksScheduled = false
  337. return nil
  338. }
// Addr returns the local address of the underlying packet connection.
func (s *Socket) Addr() net.Addr {
	return s.pc.LocalAddr()
}
// LocalAddr returns the local address of the underlying packet connection.
func (s *Socket) LocalAddr() net.Addr {
	return s.pc.LocalAddr()
}
  345. func (s *Socket) Accept() (net.Conn, error) {
  346. nc, ok := <-s.backlog
  347. if !ok {
  348. return nil, errors.New("closed")
  349. }
  350. return nc, nil
  351. }
// Dial connects to addr without a timeout. It is shorthand for DialTimeout
// with a zero timeout.
func (s *Socket) Dial(addr string) (net.Conn, error) {
	return s.DialTimeout(addr, 0)
}
  355. func (s *Socket) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  356. ctx := context.Background()
  357. if timeout != 0 {
  358. var cancel context.CancelFunc
  359. ctx, cancel = context.WithTimeout(ctx, timeout)
  360. defer cancel()
  361. }
  362. return s.DialContext(ctx, "", addr)
  363. }
  364. func (s *Socket) resolveAddr(network, addr string) (net.Addr, error) {
  365. if network == "" {
  366. network = s.Addr().Network()
  367. }
  368. return resolveAddr(network, addr)
  369. }
  370. func resolveAddr(network, addr string) (net.Addr, error) {
  371. switch network {
  372. case "inproc":
  373. return inproc.ResolveAddr(network, addr)
  374. default:
  375. return net.ResolveUDPAddr(network, addr)
  376. }
  377. }
  378. // Passing an empty network will use the network of the Socket's listener.
  379. func (s *Socket) DialContext(ctx context.Context, network, addr string) (_ net.Conn, err error) {
  380. if network == "" {
  381. network = s.pc.LocalAddr().Network()
  382. }
  383. ua, err := resolveAddr(network, addr)
  384. if err != nil {
  385. return nil, fmt.Errorf("error resolving address: %v", err)
  386. }
  387. sa, sl := netAddrToLibSockaddr(ua)
  388. mu.Lock()
  389. defer mu.Unlock()
  390. if s.closed {
  391. return nil, errSocketClosed
  392. }
  393. utpSock := utpCreateSocketAndConnect(s.ctx.asCPtr(), sa, sl)
  394. c := s.newConn(utpSock)
  395. c.setRemoteAddr()
  396. err = c.waitForConnect(ctx)
  397. if err != nil {
  398. c.close()
  399. return
  400. }
  401. return c, err
  402. }
// pushBacklog queues c for Accept without blocking. If the backlog is full
// the connection is closed instead.
func (s *Socket) pushBacklog(c *Conn) {
	select {
	case s.backlog <- c:
	default:
		// No room in the backlog: drop the connection.
		c.close()
	}
}
  410. func (s *Socket) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
  411. p, ok := <-s.nonUtpReads
  412. if !ok {
  413. err = errors.New("closed")
  414. return
  415. }
  416. n = copy(b, p.b)
  417. addr = p.from
  418. return
  419. }
  420. func (s *Socket) onReadNonUtp(b []byte, from net.Addr) {
  421. if s.closed {
  422. return
  423. }
  424. socketNonUtpPacketsReceived.Add(1)
  425. select {
  426. case s.nonUtpReads <- packet{append([]byte(nil), b...), from}:
  427. default:
  428. // log.Printf("dropped non utp packet: no room in buffer")
  429. nonUtpPacketsDropped.Add(1)
  430. }
  431. }
// SetReadDeadline is not implemented and always panics.
func (s *Socket) SetReadDeadline(t time.Time) error {
	panic("not implemented")
}
// SetWriteDeadline is not implemented and always panics.
func (s *Socket) SetWriteDeadline(t time.Time) error {
	panic("not implemented")
}
// SetDeadline is not implemented and always panics.
func (s *Socket) SetDeadline(t time.Time) error {
	panic("not implemented")
}
// WriteTo sends b to addr directly on the underlying packet connection.
func (s *Socket) WriteTo(b []byte, addr net.Addr) (int, error) {
	return s.pc.WriteTo(b, addr)
}
  444. func (s *Socket) ReadBufferLen() int {
  445. mu.Lock()
  446. defer mu.Unlock()
  447. return int(C.utp_context_get_option(s.ctx.asCPtr(), C.UTP_RCVBUF))
  448. }
  449. func (s *Socket) WriteBufferLen() int {
  450. mu.Lock()
  451. defer mu.Unlock()
  452. return int(C.utp_context_get_option(s.ctx.asCPtr(), C.UTP_SNDBUF))
  453. }
  454. func (s *Socket) SetWriteBufferLen(len int) {
  455. mu.Lock()
  456. defer mu.Unlock()
  457. i := C.utp_context_set_option(s.ctx.asCPtr(), C.UTP_SNDBUF, C.int(len))
  458. if i != 0 {
  459. panic(i)
  460. }
  461. }
  462. func (s *Socket) SetOption(opt Option, val int) int {
  463. mu.Lock()
  464. defer mu.Unlock()
  465. return int(C.utp_context_set_option(s.ctx.asCPtr(), opt, C.int(val)))
  466. }
  467. // The callback is used before each packet is processed by libutp without the this package's mutex
  468. // being held. libutp may not actually need the result as the packet might not be a connection
  469. // attempt. If the callback function is expensive, it may be worth setting a synchronous callback
  470. // using SetSyncFirewallCallback.
  471. func (s *Socket) SetFirewallCallback(f FirewallCallback) {
  472. mu.Lock()
  473. s.asyncFirewallCallback = f
  474. mu.Unlock()
  475. }
  476. // SetSyncFirewallCallback sets a synchronous firewall callback. It's only called as needed by
  477. // libutp. It is called with the package-wide mutex held. Any locks acquired by the callback should
  478. // not also be held by code that might use this package.
  479. func (s *Socket) SetSyncFirewallCallback(f FirewallCallback) {
  480. mu.Lock()
  481. s.syncFirewallCallback = f
  482. mu.Unlock()
  483. }