conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. package utp
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net"
  8. "time"
  9. "github.com/anacrolix/missinggo"
  10. )
  11. // Conn is a uTP stream and implements net.Conn. It owned by a Socket, which
  12. // handles dispatching packets to and from Conns.
  13. type Conn struct {
  14. recv_id, send_id uint16
  15. seq_nr, ack_nr uint16
  16. lastAck uint16
  17. lastTimeDiff uint32
  18. peerWndSize uint32
  19. cur_window uint32
  20. connKey connKey
  21. // Data waiting to be Read.
  22. readBuf []byte
  23. readBufNotEmpty missinggo.Event
  24. socket *Socket
  25. remoteSocketAddr net.Addr
  26. // The uTP timestamp.
  27. startTimestamp uint32
  28. // When the conn was allocated.
  29. created time.Time
  30. synAcked bool // Syn is acked by the acceptor. Initiator also tracks it.
  31. gotFin missinggo.Event
  32. wroteFin missinggo.Event
  33. err error
  34. closed missinggo.Event
  35. destroyed missinggo.Event
  36. canWrite missinggo.Event
  37. unackedSends []*send
  38. // Inbound payloads, the first is ack_nr+1.
  39. inbound []recv
  40. inboundWnd int
  41. connDeadlines
  42. latencies []time.Duration
  43. // We need to send state packet.
  44. pendingSendState bool
  45. sendPendingSendSendStateTimer *time.Timer
  46. // Send state is being delayed until sendStateTimer fires, which may have
  47. // been set at the beginning of a batch of received packets.
  48. sendPendingSendStateTimerActive bool
  49. // This timer fires when no packet has been received for a period.
  50. packetReadTimeoutTimer *time.Timer
  51. }
  52. var (
  53. _ net.Conn = &Conn{}
  54. )
  55. func (c *Conn) age() time.Duration {
  56. return time.Since(c.created)
  57. }
  58. func (c *Conn) timestamp() uint32 {
  59. return nowTimestamp() - c.startTimestamp
  60. }
  61. func (c *Conn) sendPendingSendStateTimerCallback() {
  62. mu.Lock()
  63. defer mu.Unlock()
  64. c.sendPendingSendStateTimerActive = false
  65. c.sendPendingSendSendStateTimer.Stop()
  66. c.sendPendingState()
  67. }
  68. // Send a state packet, if one is needed.
  69. func (c *Conn) sendPendingState() {
  70. if c.destroyed.IsSet() {
  71. c.sendReset()
  72. } else {
  73. c.sendState()
  74. }
  75. }
  76. // So far as the spec makes clear, this is how many more, as-yet-unacked bytes
  77. // we can fit into our receive buffers.
  78. func (c *Conn) wndSize() uint32 {
  79. if len(c.readBuf)+c.inboundWnd > readBufferLen {
  80. return 0
  81. }
  82. return uint32(readBufferLen - len(c.readBuf) - c.inboundWnd)
  83. }
  84. func (c *Conn) makePacket(_type st, connID, seqNr uint16, payload []byte) (p []byte) {
  85. var selAck selectiveAckBitmask
  86. for i := 1; i < len(c.inbound); i++ {
  87. if c.inbound[i].seen {
  88. selAck.SetBit(i - 1)
  89. }
  90. }
  91. h := header{
  92. Type: _type,
  93. Version: 1,
  94. ConnID: connID,
  95. SeqNr: seqNr,
  96. AckNr: c.ack_nr,
  97. WndSize: c.wndSize(),
  98. Timestamp: c.timestamp(),
  99. TimestampDiff: c.lastTimeDiff,
  100. }
  101. if len(selAck.Bytes) != 0 {
  102. // The spec requires the number of bytes for a selective ACK to be at
  103. // least 4, and a multiple of 4.
  104. if len(selAck.Bytes)%4 != 0 {
  105. panic(len(selAck.Bytes))
  106. }
  107. h.Extensions = append(h.Extensions, extensionField{
  108. Type: extensionTypeSelectiveAck,
  109. Bytes: selAck.Bytes,
  110. })
  111. }
  112. p = sendBufferPool.Get().([]byte)[:0:minMTU]
  113. n := h.Marshal(p)
  114. p = p[:n]
  115. // Extension headers are currently fixed in size.
  116. if n > maxHeaderSize {
  117. panic("header has unexpected size")
  118. }
  119. p = append(p, payload...)
  120. return
  121. }
  122. // Send the given payload with an up to date header.
  123. func (c *Conn) send(_type st, connID uint16, payload []byte, seqNr uint16) (err error) {
  124. p := c.makePacket(_type, connID, seqNr, payload)
  125. n1, err := c.socket.writeTo(p, c.remoteSocketAddr)
  126. sendBufferPool.Put(p[:0:minMTU])
  127. if err != nil {
  128. return
  129. }
  130. if n1 != len(p) {
  131. panic(n1)
  132. }
  133. if c.unpendSendState() && _type != stState {
  134. // We needed to send a state packet, but this packet suppresses that
  135. // need.
  136. unsentStatePackets.Add(1)
  137. }
  138. return
  139. }
  140. func (c *Conn) unpendSendState() (wasPending bool) {
  141. wasPending = c.pendingSendState
  142. c.pendingSendState = false
  143. c.sendPendingSendSendStateTimer.Stop()
  144. c.sendPendingSendStateTimerActive = false
  145. return
  146. }
  147. func (c *Conn) pendSendState() {
  148. if c.pendingSendState {
  149. // A state packet is pending but hasn't been sent, and we want to send
  150. // another.
  151. unsentStatePackets.Add(1)
  152. }
  153. c.pendingSendState = true
  154. if !c.sendPendingSendStateTimerActive {
  155. c.sendPendingSendSendStateTimer.Reset(pendingSendStateDelay)
  156. c.sendPendingSendStateTimerActive = true
  157. }
  158. }
  159. func (me *Conn) writeSyn() {
  160. me.write(stSyn, me.recv_id, nil, me.seq_nr)
  161. return
  162. }
  163. func (c *Conn) write(_type st, connID uint16, payload []byte, seqNr uint16) (n int, err error) {
  164. switch _type {
  165. case stSyn, stFin, stData:
  166. default:
  167. panic(_type)
  168. }
  169. if c.wroteFin.IsSet() {
  170. panic("can't write after fin")
  171. }
  172. if len(payload) > maxPayloadSize {
  173. payload = payload[:maxPayloadSize]
  174. }
  175. err = c.send(_type, connID, payload, seqNr)
  176. if err != nil {
  177. c.destroy(fmt.Errorf("error sending packet: %s", err))
  178. return
  179. }
  180. n = len(payload)
  181. // Copy payload so caller to write can continue to use the buffer.
  182. if payload != nil {
  183. payload = append(sendBufferPool.Get().([]byte)[:0:minMTU], payload...)
  184. }
  185. send := &send{
  186. payloadSize: uint32(len(payload)),
  187. started: missinggo.MonotonicNow(),
  188. _type: _type,
  189. connID: connID,
  190. payload: payload,
  191. seqNr: seqNr,
  192. conn: c,
  193. }
  194. send.resendTimer = time.AfterFunc(c.resendTimeout(), send.timeoutResend)
  195. c.unackedSends = append(c.unackedSends, send)
  196. c.cur_window += send.payloadSize
  197. c.updateCanWrite()
  198. c.seq_nr++
  199. return
  200. }
  201. // TODO: Introduce a minimum latency.
  202. func (c *Conn) latency() (ret time.Duration) {
  203. if len(c.latencies) == 0 {
  204. return initialLatency
  205. }
  206. for _, l := range c.latencies {
  207. ret += l
  208. }
  209. ret = (ret + time.Duration(len(c.latencies)) - 1) / time.Duration(len(c.latencies))
  210. return
  211. }
  212. func (c *Conn) sendState() {
  213. c.send(stState, c.send_id, nil, c.seq_nr)
  214. sentStatePackets.Add(1)
  215. }
  216. func (c *Conn) sendReset() {
  217. c.send(stReset, c.send_id, nil, c.seq_nr)
  218. }
  219. func (c *Conn) addLatency(l time.Duration) {
  220. c.latencies = append(c.latencies, l)
  221. if len(c.latencies) > 10 {
  222. c.latencies = c.latencies[len(c.latencies)-10:]
  223. }
  224. }
  225. // Ack our send with the given sequence number.
  226. func (c *Conn) ack(nr uint16) {
  227. if !seqLess(c.lastAck, nr) {
  228. // Already acked.
  229. return
  230. }
  231. i := nr - c.lastAck - 1
  232. if int(i) >= len(c.unackedSends) {
  233. // Remote has acknowledged receipt of packets we haven't even sent.
  234. acksReceivedAheadOfSyn.Add(1)
  235. // log.Printf("got ack ahead of syn (%x > %x)", nr, c.seq_nr-1)
  236. return
  237. }
  238. s := c.unackedSends[i]
  239. latency, first := s.Ack()
  240. if first {
  241. c.cur_window -= s.payloadSize
  242. c.updateCanWrite()
  243. c.addLatency(latency)
  244. }
  245. // Trim sends that aren't needed anymore.
  246. for len(c.unackedSends) != 0 {
  247. if !c.unackedSends[0].acked.IsSet() {
  248. // Can't trim unacked sends any further.
  249. return
  250. }
  251. // Trim the front of the unacked sends.
  252. c.unackedSends = c.unackedSends[1:]
  253. c.updateCanWrite()
  254. c.lastAck++
  255. }
  256. }
  257. func (c *Conn) ackTo(nr uint16) {
  258. if !seqLess(nr, c.seq_nr) {
  259. return
  260. }
  261. for seqLess(c.lastAck, nr) {
  262. c.ack(c.lastAck + 1)
  263. }
  264. }
  265. // Return the send state for the sequence number. Returns nil if there's no
  266. // outstanding send for that sequence number.
  267. func (c *Conn) seqSend(seqNr uint16) *send {
  268. if !seqLess(c.lastAck, seqNr) {
  269. // Presumably already acked.
  270. return nil
  271. }
  272. i := int(seqNr - c.lastAck - 1)
  273. if i >= len(c.unackedSends) {
  274. // No such send.
  275. return nil
  276. }
  277. return c.unackedSends[i]
  278. }
  279. func (c *Conn) resendTimeout() time.Duration {
  280. l := c.latency()
  281. ret := missinggo.JitterDuration(3*l, l)
  282. return ret
  283. }
  284. func (c *Conn) ackSkipped(seqNr uint16) {
  285. send := c.seqSend(seqNr)
  286. if send == nil {
  287. return
  288. }
  289. send.acksSkipped++
  290. if send.acked.IsSet() {
  291. return
  292. }
  293. switch send.acksSkipped {
  294. case 3, 60:
  295. ackSkippedResends.Add(1)
  296. send.resend()
  297. send.resendTimer.Reset(c.resendTimeout() * time.Duration(send.numResends))
  298. default:
  299. }
  300. }
  301. // Handle a packet destined for this connection.
  302. func (c *Conn) receivePacket(h header, payload []byte) {
  303. c.packetReadTimeoutTimer.Reset(packetReadTimeout)
  304. c.processDelivery(h, payload)
  305. }
  306. func (c *Conn) receivePacketTimeoutCallback() {
  307. mu.Lock()
  308. c.destroy(errors.New("no packet read timeout"))
  309. mu.Unlock()
  310. }
  311. func (c *Conn) lazyDestroy() {
  312. if c.wroteFin.IsSet() && len(c.unackedSends) <= 1 && (c.gotFin.IsSet() || c.closed.IsSet()) {
  313. c.destroy(errors.New("lazily destroyed"))
  314. }
  315. }
  316. func (c *Conn) processDelivery(h header, payload []byte) {
  317. deliveriesProcessed.Add(1)
  318. defer c.lazyDestroy()
  319. c.assertHeader(h)
  320. c.peerWndSize = h.WndSize
  321. c.applyAcks(h)
  322. if h.Timestamp == 0 {
  323. c.lastTimeDiff = 0
  324. } else {
  325. c.lastTimeDiff = c.timestamp() - h.Timestamp
  326. }
  327. if h.Type == stReset {
  328. c.destroy(errors.New("peer reset"))
  329. return
  330. }
  331. if !c.synAcked {
  332. if h.Type != stState {
  333. return
  334. }
  335. c.synAcked = true
  336. c.updateCanWrite()
  337. c.ack_nr = h.SeqNr - 1
  338. return
  339. }
  340. if h.Type == stState {
  341. return
  342. }
  343. // Even if we didn't need or want this packet, we need to inform the peer
  344. // what our state is, in case they missed something.
  345. c.pendSendState()
  346. if !seqLess(c.ack_nr, h.SeqNr) {
  347. // Already received this packet.
  348. return
  349. }
  350. inboundIndex := int(h.SeqNr - c.ack_nr - 1)
  351. if inboundIndex < len(c.inbound) && c.inbound[inboundIndex].seen {
  352. // Already received this packet.
  353. return
  354. }
  355. // Derived from running in production:
  356. // grep -oP '(?<=packet out of order, index=)\d+' log | sort -n | uniq -c
  357. // 64 should correspond to 8 bytes of selective ack.
  358. if inboundIndex >= maxUnackedInbound {
  359. // Discard packet too far ahead.
  360. if logLevel >= 1 {
  361. log.Printf("received packet from %s %d ahead of next seqnr (%x > %x)", c.remoteSocketAddr, inboundIndex, h.SeqNr, c.ack_nr+1)
  362. }
  363. return
  364. }
  365. // Extend inbound so the new packet has a place.
  366. for inboundIndex >= len(c.inbound) {
  367. c.inbound = append(c.inbound, recv{})
  368. }
  369. c.inbound[inboundIndex] = recv{true, payload, h.Type}
  370. c.inboundWnd += len(payload)
  371. c.processInbound()
  372. }
  373. func (c *Conn) applyAcks(h header) {
  374. c.ackTo(h.AckNr)
  375. for _, ext := range h.Extensions {
  376. switch ext.Type {
  377. case extensionTypeSelectiveAck:
  378. c.ackSkipped(h.AckNr + 1)
  379. bitmask := selectiveAckBitmask{ext.Bytes}
  380. for i := 0; i < bitmask.NumBits(); i++ {
  381. if bitmask.BitIsSet(i) {
  382. nr := h.AckNr + 2 + uint16(i)
  383. // log.Printf("selectively acked %d", nr)
  384. c.ack(nr)
  385. } else {
  386. c.ackSkipped(h.AckNr + 2 + uint16(i))
  387. }
  388. }
  389. }
  390. }
  391. }
  392. func (c *Conn) assertHeader(h header) {
  393. if h.Type == stSyn {
  394. if h.ConnID != c.send_id {
  395. panic(fmt.Sprintf("%d != %d", h.ConnID, c.send_id))
  396. }
  397. } else {
  398. if h.ConnID != c.recv_id {
  399. panic("erroneous delivery")
  400. }
  401. }
  402. }
  403. func (c *Conn) updateReadBufNotEmpty() {
  404. c.readBufNotEmpty.SetBool(len(c.readBuf) != 0)
  405. }
  406. func (c *Conn) processInbound() {
  407. // Consume consecutive next packets.
  408. for !c.gotFin.IsSet() && len(c.inbound) > 0 && c.inbound[0].seen && len(c.readBuf) < readBufferLen {
  409. c.ack_nr++
  410. p := c.inbound[0]
  411. c.inbound = c.inbound[1:]
  412. c.inboundWnd -= len(p.data)
  413. c.readBuf = append(c.readBuf, p.data...)
  414. c.updateReadBufNotEmpty()
  415. if p.Type == stFin {
  416. c.gotFin.Set()
  417. }
  418. }
  419. }
  420. func (c *Conn) waitAck(seq uint16) {
  421. send := c.seqSend(seq)
  422. if send == nil {
  423. return
  424. }
  425. missinggo.WaitEvents(&mu, &send.acked, &c.destroyed)
  426. return
  427. }
  428. // Waits for sent SYN to be ACKed. Returns any errors.
  429. func (c *Conn) recvSynAck() (err error) {
  430. mu.Lock()
  431. defer mu.Unlock()
  432. c.waitAck(1)
  433. if c.err != nil {
  434. err = c.err
  435. }
  436. c.synAcked = true
  437. c.updateCanWrite()
  438. return err
  439. }
  440. func (c *Conn) writeFin() {
  441. if c.wroteFin.IsSet() {
  442. return
  443. }
  444. c.write(stFin, c.send_id, nil, c.seq_nr)
  445. c.wroteFin.Set()
  446. return
  447. }
  448. func (c *Conn) destroy(reason error) {
  449. c.destroyed.Set()
  450. if c.err == nil {
  451. c.err = reason
  452. }
  453. c.detach()
  454. }
  455. func (c *Conn) closeNow() (err error) {
  456. c.closed.Set()
  457. c.writeFin()
  458. c.destroy(errors.New("destroyed"))
  459. return
  460. }
  461. func (c *Conn) Close() (err error) {
  462. mu.Lock()
  463. defer mu.Unlock()
  464. c.closed.Set()
  465. c.writeFin()
  466. c.lazyDestroy()
  467. return
  468. }
  469. func (c *Conn) LocalAddr() net.Addr {
  470. return addr{c.socket.Addr()}
  471. }
  472. func (c *Conn) Read(b []byte) (n int, err error) {
  473. mu.Lock()
  474. defer mu.Unlock()
  475. for {
  476. n = copy(b, c.readBuf)
  477. c.readBuf = c.readBuf[n:]
  478. c.updateReadBufNotEmpty()
  479. if n != 0 {
  480. // Inbound packets are backed up when the read buffer is too big.
  481. c.processInbound()
  482. return
  483. }
  484. if c.gotFin.IsSet() || c.closed.IsSet() {
  485. err = io.EOF
  486. return
  487. }
  488. if c.destroyed.IsSet() {
  489. if c.err == nil {
  490. panic("closed without receiving fin, and no error")
  491. }
  492. err = c.err
  493. return
  494. }
  495. if c.connDeadlines.read.passed.IsSet() {
  496. err = errTimeout
  497. return
  498. }
  499. missinggo.WaitEvents(&mu,
  500. &c.gotFin,
  501. &c.closed,
  502. &c.destroyed,
  503. &c.connDeadlines.read.passed,
  504. &c.readBufNotEmpty)
  505. }
  506. }
  507. func (c *Conn) RemoteAddr() net.Addr {
  508. return addr{c.remoteSocketAddr}
  509. }
  510. func (c *Conn) String() string {
  511. return fmt.Sprintf("<UTPConn %s-%s (%d)>", c.LocalAddr(), c.RemoteAddr(), c.recv_id)
  512. }
  513. func (c *Conn) updateCanWrite() {
  514. c.canWrite.SetBool(c.synAcked &&
  515. len(c.unackedSends) < maxUnackedSends &&
  516. c.cur_window <= c.peerWndSize)
  517. }
  518. func (c *Conn) Write(p []byte) (n int, err error) {
  519. mu.Lock()
  520. defer mu.Unlock()
  521. for len(p) != 0 {
  522. if c.wroteFin.IsSet() || c.closed.IsSet() {
  523. err = errClosed
  524. return
  525. }
  526. if c.destroyed.IsSet() {
  527. err = c.err
  528. return
  529. }
  530. if c.connDeadlines.write.passed.IsSet() {
  531. err = errTimeout
  532. return
  533. }
  534. // If peerWndSize is 0, we still want to send something, so don't
  535. // block until we exceed it.
  536. if c.canWrite.IsSet() {
  537. var n1 int
  538. n1, err = c.write(stData, c.send_id, p, c.seq_nr)
  539. n += n1
  540. if err != nil {
  541. break
  542. }
  543. if n1 == 0 {
  544. panic(len(p))
  545. }
  546. p = p[n1:]
  547. continue
  548. }
  549. missinggo.WaitEvents(&mu,
  550. &c.wroteFin,
  551. &c.closed,
  552. &c.destroyed,
  553. &c.connDeadlines.write.passed,
  554. &c.canWrite)
  555. }
  556. return
  557. }
  558. func (c *Conn) detach() {
  559. s := c.socket
  560. _, ok := s.conns[c.connKey]
  561. if !ok {
  562. return
  563. }
  564. delete(s.conns, c.connKey)
  565. s.lazyDestroy()
  566. }