controlbuf.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "runtime"
  25. "strconv"
  26. "sync"
  27. "sync/atomic"
  28. "golang.org/x/net/http2"
  29. "golang.org/x/net/http2/hpack"
  30. "google.golang.org/grpc/internal/grpclog"
  31. "google.golang.org/grpc/internal/grpcutil"
  32. "google.golang.org/grpc/status"
  33. )
  34. var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
  35. e.SetMaxDynamicTableSizeLimit(v)
  36. }
  37. type itemNode struct {
  38. it any
  39. next *itemNode
  40. }
  41. type itemList struct {
  42. head *itemNode
  43. tail *itemNode
  44. }
  45. func (il *itemList) enqueue(i any) {
  46. n := &itemNode{it: i}
  47. if il.tail == nil {
  48. il.head, il.tail = n, n
  49. return
  50. }
  51. il.tail.next = n
  52. il.tail = n
  53. }
  54. // peek returns the first item in the list without removing it from the
  55. // list.
  56. func (il *itemList) peek() any {
  57. return il.head.it
  58. }
  59. func (il *itemList) dequeue() any {
  60. if il.head == nil {
  61. return nil
  62. }
  63. i := il.head.it
  64. il.head = il.head.next
  65. if il.head == nil {
  66. il.tail = nil
  67. }
  68. return i
  69. }
  70. func (il *itemList) dequeueAll() *itemNode {
  71. h := il.head
  72. il.head, il.tail = nil, nil
  73. return h
  74. }
  75. func (il *itemList) isEmpty() bool {
  76. return il.head == nil
  77. }
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.

// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport. These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

// cbItem is the interface satisfied by every item that can be put on a
// controlBuffer.
type cbItem interface {
	// isTransportResponseFrame reports whether the item counts towards the
	// controlBuffer's transportResponseFrames counter, which is compared
	// against maxQueuedTransportResponseFrames.
	isTransportResponseFrame() bool
}
  89. // registerStream is used to register an incoming stream with loopy writer.
  90. type registerStream struct {
  91. streamID uint32
  92. wq *writeQuota
  93. }
  94. func (*registerStream) isTransportResponseFrame() bool { return false }
  95. // headerFrame is also used to register stream on the client-side.
  96. type headerFrame struct {
  97. streamID uint32
  98. hf []hpack.HeaderField
  99. endStream bool // Valid on server side.
  100. initStream func(uint32) error // Used only on the client side.
  101. onWrite func()
  102. wq *writeQuota // write quota for the stream created.
  103. cleanup *cleanupStream // Valid on the server side.
  104. onOrphaned func(error) // Valid on client-side
  105. }
  106. func (h *headerFrame) isTransportResponseFrame() bool {
  107. return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
  108. }
  109. type cleanupStream struct {
  110. streamID uint32
  111. rst bool
  112. rstCode http2.ErrCode
  113. onWrite func()
  114. }
  115. func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
  116. type earlyAbortStream struct {
  117. httpStatus uint32
  118. streamID uint32
  119. contentSubtype string
  120. status *status.Status
  121. rst bool
  122. }
  123. func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
  124. type dataFrame struct {
  125. streamID uint32
  126. endStream bool
  127. h []byte
  128. d []byte
  129. // onEachWrite is called every time
  130. // a part of d is written out.
  131. onEachWrite func()
  132. }
  133. func (*dataFrame) isTransportResponseFrame() bool { return false }
  134. type incomingWindowUpdate struct {
  135. streamID uint32
  136. increment uint32
  137. }
  138. func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
  139. type outgoingWindowUpdate struct {
  140. streamID uint32
  141. increment uint32
  142. }
  143. func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
  144. return false // window updates are throttled by thresholds
  145. }
  146. type incomingSettings struct {
  147. ss []http2.Setting
  148. }
  149. func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
  150. type outgoingSettings struct {
  151. ss []http2.Setting
  152. }
  153. func (*outgoingSettings) isTransportResponseFrame() bool { return false }
  154. type incomingGoAway struct {
  155. }
  156. func (*incomingGoAway) isTransportResponseFrame() bool { return false }
  157. type goAway struct {
  158. code http2.ErrCode
  159. debugData []byte
  160. headsUp bool
  161. closeConn error // if set, loopyWriter will exit, resulting in conn closure
  162. }
  163. func (*goAway) isTransportResponseFrame() bool { return false }
  164. type ping struct {
  165. ack bool
  166. data [8]byte
  167. }
  168. func (*ping) isTransportResponseFrame() bool { return true }
  169. type outFlowControlSizeRequest struct {
  170. resp chan uint32
  171. }
  172. func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
  173. // closeConnection is an instruction to tell the loopy writer to flush the
  174. // framer and exit, which will cause the transport's connection to be closed
  175. // (by the client or server). The transport itself will close after the reader
  176. // encounters the EOF caused by the connection closure.
  177. type closeConnection struct{}
  178. func (closeConnection) isTransportResponseFrame() bool { return false }
// outStreamState records where an outStream stands with respect to loopy's
// list of active streams.
type outStreamState int

const (
	// active is set on a stream when it is put on the activeStreams list.
	active outStreamState = iota
	// empty is the state of a newly created stream and of a stream whose
	// item list has been drained.
	empty
	// waitingOnStreamQuota is set when the stream's stream-level flow
	// control quota is used up.
	waitingOnStreamQuota
)
  185. type outStream struct {
  186. id uint32
  187. state outStreamState
  188. itl *itemList
  189. bytesOutStanding int
  190. wq *writeQuota
  191. next *outStream
  192. prev *outStream
  193. }
  194. func (s *outStream) deleteSelf() {
  195. if s.prev != nil {
  196. s.prev.next = s.next
  197. }
  198. if s.next != nil {
  199. s.next.prev = s.prev
  200. }
  201. s.next, s.prev = nil, nil
  202. }
  203. type outStreamList struct {
  204. // Following are sentinel objects that mark the
  205. // beginning and end of the list. They do not
  206. // contain any item lists. All valid objects are
  207. // inserted in between them.
  208. // This is needed so that an outStream object can
  209. // deleteSelf() in O(1) time without knowing which
  210. // list it belongs to.
  211. head *outStream
  212. tail *outStream
  213. }
  214. func newOutStreamList() *outStreamList {
  215. head, tail := new(outStream), new(outStream)
  216. head.next = tail
  217. tail.prev = head
  218. return &outStreamList{
  219. head: head,
  220. tail: tail,
  221. }
  222. }
  223. func (l *outStreamList) enqueue(s *outStream) {
  224. e := l.tail.prev
  225. e.next = s
  226. s.prev = e
  227. s.next = l.tail
  228. l.tail.prev = s
  229. }
  230. // remove from the beginning of the list.
  231. func (l *outStreamList) dequeue() *outStream {
  232. b := l.head.next
  233. if b == l.tail {
  234. return nil
  235. }
  236. b.deleteSelf()
  237. return b
  238. }
// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
	// ch is signalled by executeAndPut to wake up a get call that is
	// blocked waiting for an item.
	ch chan struct{}
	// done unblocks a blocked get or throttle call when it becomes
	// readable.
	done <-chan struct{}
	mu   sync.Mutex
	// consumerWaiting is set by get right before it blocks and is cleared
	// by the executeAndPut call that wakes it up.
	consumerWaiting bool
	list            *itemList
	// err is set by finish; once non-nil, put/execute/get return it.
	err error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer. trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold. Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // chan struct{}
}
  260. func newControlBuffer(done <-chan struct{}) *controlBuffer {
  261. return &controlBuffer{
  262. ch: make(chan struct{}, 1),
  263. list: &itemList{},
  264. done: done,
  265. }
  266. }
  267. // throttle blocks if there are too many incomingSettings/cleanupStreams in the
  268. // controlbuf.
  269. func (c *controlBuffer) throttle() {
  270. ch, _ := c.trfChan.Load().(chan struct{})
  271. if ch != nil {
  272. select {
  273. case <-ch:
  274. case <-c.done:
  275. }
  276. }
  277. }
  278. func (c *controlBuffer) put(it cbItem) error {
  279. _, err := c.executeAndPut(nil, it)
  280. return err
  281. }
  282. func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {
  283. var wakeUp bool
  284. c.mu.Lock()
  285. if c.err != nil {
  286. c.mu.Unlock()
  287. return false, c.err
  288. }
  289. if f != nil {
  290. if !f(it) { // f wasn't successful
  291. c.mu.Unlock()
  292. return false, nil
  293. }
  294. }
  295. if c.consumerWaiting {
  296. wakeUp = true
  297. c.consumerWaiting = false
  298. }
  299. c.list.enqueue(it)
  300. if it.isTransportResponseFrame() {
  301. c.transportResponseFrames++
  302. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  303. // We are adding the frame that puts us over the threshold; create
  304. // a throttling channel.
  305. c.trfChan.Store(make(chan struct{}))
  306. }
  307. }
  308. c.mu.Unlock()
  309. if wakeUp {
  310. select {
  311. case c.ch <- struct{}{}:
  312. default:
  313. }
  314. }
  315. return true, nil
  316. }
  317. // Note argument f should never be nil.
  318. func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
  319. c.mu.Lock()
  320. if c.err != nil {
  321. c.mu.Unlock()
  322. return false, c.err
  323. }
  324. if !f(it) { // f wasn't successful
  325. c.mu.Unlock()
  326. return false, nil
  327. }
  328. c.mu.Unlock()
  329. return true, nil
  330. }
  331. func (c *controlBuffer) get(block bool) (any, error) {
  332. for {
  333. c.mu.Lock()
  334. if c.err != nil {
  335. c.mu.Unlock()
  336. return nil, c.err
  337. }
  338. if !c.list.isEmpty() {
  339. h := c.list.dequeue().(cbItem)
  340. if h.isTransportResponseFrame() {
  341. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  342. // We are removing the frame that put us over the
  343. // threshold; close and clear the throttling channel.
  344. ch := c.trfChan.Load().(chan struct{})
  345. close(ch)
  346. c.trfChan.Store((chan struct{})(nil))
  347. }
  348. c.transportResponseFrames--
  349. }
  350. c.mu.Unlock()
  351. return h, nil
  352. }
  353. if !block {
  354. c.mu.Unlock()
  355. return nil, nil
  356. }
  357. c.consumerWaiting = true
  358. c.mu.Unlock()
  359. select {
  360. case <-c.ch:
  361. case <-c.done:
  362. return nil, errors.New("transport closed by client")
  363. }
  364. }
  365. }
  366. func (c *controlBuffer) finish() {
  367. c.mu.Lock()
  368. if c.err != nil {
  369. c.mu.Unlock()
  370. return
  371. }
  372. c.err = ErrConnClosing
  373. // There may be headers for streams in the control buffer.
  374. // These streams need to be cleaned out since the transport
  375. // is still not aware of these yet.
  376. for head := c.list.dequeueAll(); head != nil; head = head.next {
  377. hdr, ok := head.it.(*headerFrame)
  378. if !ok {
  379. continue
  380. }
  381. if hdr.onOrphaned != nil { // It will be nil on the server-side.
  382. hdr.onOrphaned(ErrConnClosing)
  383. }
  384. }
  385. // In case throttle() is currently in flight, it needs to be unblocked.
  386. // Otherwise, the transport may not close, since the transport is closed by
  387. // the reader encountering the connection error.
  388. ch, _ := c.trfChan.Load().(chan struct{})
  389. if ch != nil {
  390. close(ch)
  391. }
  392. c.trfChan.Store((chan struct{})(nil))
  393. c.mu.Unlock()
  394. }
// side tells a loopyWriter whether it runs on the client or on the server;
// several handlers branch on it.
type side int

const (
	// clientSide marks a loopyWriter that belongs to a client transport.
	clientSide side = iota
	// serverSide marks a loopyWriter that belongs to a server transport.
	serverSide
)
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32 // connection-level quota left for outgoing data.
	oiws      uint32 // outbound initial window size.
	// estdStreams is map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally have a list of data items(and perhaps trailers
	// on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool // set by the GoAway handlers; no new streams are originated while set.
	conn          net.Conn
	logger        *grpclog.PrefixLogger

	// Side-specific handlers
	// ssGoAwayHandler, when non-nil, handles outgoing goAway items; its bool
	// result becomes the new value of draining.
	ssGoAwayHandler func(*goAway) (bool, error)
}
  433. func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
  434. var buf bytes.Buffer
  435. l := &loopyWriter{
  436. side: s,
  437. cbuf: cbuf,
  438. sendQuota: defaultWindowSize,
  439. oiws: defaultWindowSize,
  440. estdStreams: make(map[uint32]*outStream),
  441. activeStreams: newOutStreamList(),
  442. framer: fr,
  443. hBuf: &buf,
  444. hEnc: hpack.NewEncoder(&buf),
  445. bdpEst: bdpEst,
  446. conn: conn,
  447. logger: logger,
  448. }
  449. return l
  450. }
// minBatchSize is the write-buffer fill level (compared against
// framer.writer.offset) below which run yields the processor once before
// flushing.
const minBatchSize = 1000

// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list. This results in writing of HTTP2 frames into an
// underlying write buffer. When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer. As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes the underlying connection. The connection is always left open to
// allow different closing behavior on the client and server.
//
// run only returns with a non-nil error; every exit path also finishes the
// control buffer.
func (l *loopyWriter) run() (err error) {
	defer func() {
		// err is the named result, so it holds whatever value the return
		// statements below handed back.
		if l.logger.V(logLevel) {
			l.logger.Infof("loopyWriter exiting with error: %v", err)
		}
		if !isIOError(err) {
			l.framer.writer.Flush()
		}
		l.cbuf.finish()
	}()
	for {
		// Block until at least one control item is available.
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		// gosched allows a single yield per batch before flushing.
		gosched := true
	hasdata:
		for {
			// Drain whatever else is queued without blocking.
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			// No control items left; keep writing stream data while there
			// is any that can be sent.
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					// The batch is small: yield once so that other
					// goroutines get a chance to queue more work.
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}
  529. func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
  530. return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
  531. }
  532. func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
  533. // Otherwise update the quota.
  534. if w.streamID == 0 {
  535. l.sendQuota += w.increment
  536. return
  537. }
  538. // Find the stream and update it.
  539. if str, ok := l.estdStreams[w.streamID]; ok {
  540. str.bytesOutStanding -= int(w.increment)
  541. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
  542. str.state = active
  543. l.activeStreams.enqueue(str)
  544. return
  545. }
  546. }
  547. }
  548. func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
  549. return l.framer.fr.WriteSettings(s.ss...)
  550. }
  551. func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
  552. l.applySettings(s.ss)
  553. return l.framer.fr.WriteSettingsAck()
  554. }
  555. func (l *loopyWriter) registerStreamHandler(h *registerStream) {
  556. str := &outStream{
  557. id: h.streamID,
  558. state: empty,
  559. itl: &itemList{},
  560. wq: h.wq,
  561. }
  562. l.estdStreams[h.streamID] = str
  563. }
  564. func (l *loopyWriter) headerHandler(h *headerFrame) error {
  565. if l.side == serverSide {
  566. str, ok := l.estdStreams[h.streamID]
  567. if !ok {
  568. if l.logger.V(logLevel) {
  569. l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
  570. }
  571. return nil
  572. }
  573. // Case 1.A: Server is responding back with headers.
  574. if !h.endStream {
  575. return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
  576. }
  577. // else: Case 1.B: Server wants to close stream.
  578. if str.state != empty { // either active or waiting on stream quota.
  579. // add it str's list of items.
  580. str.itl.enqueue(h)
  581. return nil
  582. }
  583. if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
  584. return err
  585. }
  586. return l.cleanupStreamHandler(h.cleanup)
  587. }
  588. // Case 2: Client wants to originate stream.
  589. str := &outStream{
  590. id: h.streamID,
  591. state: empty,
  592. itl: &itemList{},
  593. wq: h.wq,
  594. }
  595. return l.originateStream(str, h)
  596. }
  597. func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
  598. // l.draining is set when handling GoAway. In which case, we want to avoid
  599. // creating new streams.
  600. if l.draining {
  601. // TODO: provide a better error with the reason we are in draining.
  602. hdr.onOrphaned(errStreamDrain)
  603. return nil
  604. }
  605. if err := hdr.initStream(str.id); err != nil {
  606. return err
  607. }
  608. if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
  609. return err
  610. }
  611. l.estdStreams[str.id] = str
  612. return nil
  613. }
  614. func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
  615. if onWrite != nil {
  616. onWrite()
  617. }
  618. l.hBuf.Reset()
  619. for _, f := range hf {
  620. if err := l.hEnc.WriteField(f); err != nil {
  621. if l.logger.V(logLevel) {
  622. l.logger.Warningf("Encountered error while encoding headers: %v", err)
  623. }
  624. }
  625. }
  626. var (
  627. err error
  628. endHeaders, first bool
  629. )
  630. first = true
  631. for !endHeaders {
  632. size := l.hBuf.Len()
  633. if size > http2MaxFrameLen {
  634. size = http2MaxFrameLen
  635. } else {
  636. endHeaders = true
  637. }
  638. if first {
  639. first = false
  640. err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
  641. StreamID: streamID,
  642. BlockFragment: l.hBuf.Next(size),
  643. EndStream: endStream,
  644. EndHeaders: endHeaders,
  645. })
  646. } else {
  647. err = l.framer.fr.WriteContinuation(
  648. streamID,
  649. endHeaders,
  650. l.hBuf.Next(size),
  651. )
  652. }
  653. if err != nil {
  654. return err
  655. }
  656. }
  657. return nil
  658. }
  659. func (l *loopyWriter) preprocessData(df *dataFrame) {
  660. str, ok := l.estdStreams[df.streamID]
  661. if !ok {
  662. return
  663. }
  664. // If we got data for a stream it means that
  665. // stream was originated and the headers were sent out.
  666. str.itl.enqueue(df)
  667. if str.state == empty {
  668. str.state = active
  669. l.activeStreams.enqueue(str)
  670. }
  671. }
  672. func (l *loopyWriter) pingHandler(p *ping) error {
  673. if !p.ack {
  674. l.bdpEst.timesnap(p.data)
  675. }
  676. return l.framer.fr.WritePing(p.ack, p.data)
  677. }
  678. func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
  679. o.resp <- l.sendQuota
  680. }
  681. func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
  682. c.onWrite()
  683. if str, ok := l.estdStreams[c.streamID]; ok {
  684. // On the server side it could be a trailers-only response or
  685. // a RST_STREAM before stream initialization thus the stream might
  686. // not be established yet.
  687. delete(l.estdStreams, c.streamID)
  688. str.deleteSelf()
  689. }
  690. if c.rst { // If RST_STREAM needs to be sent.
  691. if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
  692. return err
  693. }
  694. }
  695. if l.draining && len(l.estdStreams) == 0 {
  696. // Flush and close the connection; we are done with it.
  697. return errors.New("finished processing active streams while in draining mode")
  698. }
  699. return nil
  700. }
  701. func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
  702. if l.side == clientSide {
  703. return errors.New("earlyAbortStream not handled on client")
  704. }
  705. // In case the caller forgets to set the http status, default to 200.
  706. if eas.httpStatus == 0 {
  707. eas.httpStatus = 200
  708. }
  709. headerFields := []hpack.HeaderField{
  710. {Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
  711. {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
  712. {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
  713. {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
  714. }
  715. if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
  716. return err
  717. }
  718. if eas.rst {
  719. if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
  720. return err
  721. }
  722. }
  723. return nil
  724. }
  725. func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
  726. if l.side == clientSide {
  727. l.draining = true
  728. if len(l.estdStreams) == 0 {
  729. // Flush and close the connection; we are done with it.
  730. return errors.New("received GOAWAY with no active streams")
  731. }
  732. }
  733. return nil
  734. }
  735. func (l *loopyWriter) goAwayHandler(g *goAway) error {
  736. // Handling of outgoing GoAway is very specific to side.
  737. if l.ssGoAwayHandler != nil {
  738. draining, err := l.ssGoAwayHandler(g)
  739. if err != nil {
  740. return err
  741. }
  742. l.draining = draining
  743. }
  744. return nil
  745. }
  746. func (l *loopyWriter) handle(i any) error {
  747. switch i := i.(type) {
  748. case *incomingWindowUpdate:
  749. l.incomingWindowUpdateHandler(i)
  750. case *outgoingWindowUpdate:
  751. return l.outgoingWindowUpdateHandler(i)
  752. case *incomingSettings:
  753. return l.incomingSettingsHandler(i)
  754. case *outgoingSettings:
  755. return l.outgoingSettingsHandler(i)
  756. case *headerFrame:
  757. return l.headerHandler(i)
  758. case *registerStream:
  759. l.registerStreamHandler(i)
  760. case *cleanupStream:
  761. return l.cleanupStreamHandler(i)
  762. case *earlyAbortStream:
  763. return l.earlyAbortStreamHandler(i)
  764. case *incomingGoAway:
  765. return l.incomingGoAwayHandler(i)
  766. case *dataFrame:
  767. l.preprocessData(i)
  768. case *ping:
  769. return l.pingHandler(i)
  770. case *goAway:
  771. return l.goAwayHandler(i)
  772. case *outFlowControlSizeRequest:
  773. l.outFlowControlSizeRequestHandler(i)
  774. case closeConnection:
  775. // Just return a non-I/O error and run() will flush and close the
  776. // connection.
  777. return ErrConnClosing
  778. default:
  779. return fmt.Errorf("transport: unknown control message type %T", i)
  780. }
  781. return nil
  782. }
  783. func (l *loopyWriter) applySettings(ss []http2.Setting) {
  784. for _, s := range ss {
  785. switch s.ID {
  786. case http2.SettingInitialWindowSize:
  787. o := l.oiws
  788. l.oiws = s.Val
  789. if o < l.oiws {
  790. // If the new limit is greater make all depleted streams active.
  791. for _, stream := range l.estdStreams {
  792. if stream.state == waitingOnStreamQuota {
  793. stream.state = active
  794. l.activeStreams.enqueue(stream)
  795. }
  796. }
  797. }
  798. case http2.SettingHeaderTableSize:
  799. updateHeaderTblSize(l.hEnc, s.Val)
  800. }
  801. }
  802. }
// processData removes the first stream from active streams, writes out at most 16KB
// of its data and then puts it at the end of activeStreams if there's still more data
// to be sent and stream has some stream-level flow control.
//
// The returned bool is true when there was nothing to do, i.e. the
// connection-level send quota is exhausted or no stream is active; it is
// false whenever a stream was taken off the active list. A non-nil error is
// returned when writing a frame or cleaning up the stream fails.
func (l *loopyWriter) processData() (bool, error) {
	if l.sendQuota == 0 {
		return true, nil
	}
	str := l.activeStreams.dequeue() // Remove the first stream.
	if str == nil {
		return true, nil
	}
	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item of this stream.
	// A data item is represented by a dataFrame, since it later translates into
	// multiple HTTP2 data frames.
	// Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
	// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
	// maximum possible HTTP2 frame size.

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
		// Client sends out empty data frame with endStream = true
		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
			return false, err
		}
		str.itl.dequeue() // remove the empty data item from stream
		if str.itl.isEmpty() {
			str.state = empty
		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
				return false, err
			}
			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
				return false, err
			}
		} else {
			// More items are queued; give the stream another turn later.
			l.activeStreams.enqueue(str)
		}
		return false, nil
	}
	var (
		buf []byte
	)
	// Figure out the maximum size we can send
	maxSize := http2MaxFrameLen
	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
		// The stream stays off the active list until its quota is restored.
		str.state = waitingOnStreamQuota
		return false, nil
	} else if maxSize > strQuota {
		maxSize = strQuota
	}
	if maxSize > int(l.sendQuota) { // connection-level flow control.
		maxSize = int(l.sendQuota)
	}
	// Compute how much of the header and data we can send within quota and max frame length
	hSize := min(maxSize, len(dataItem.h))
	dSize := min(maxSize-hSize, len(dataItem.d))
	if hSize != 0 {
		if dSize == 0 {
			// Only header bytes fit (or there is no data): send h as is.
			buf = dataItem.h
		} else {
			// We can add some data to grpc message header to distribute bytes more equally across frames.
			// Copy on the stack to avoid generating garbage
			var localBuf [http2MaxFrameLen]byte
			copy(localBuf[:hSize], dataItem.h)
			copy(localBuf[hSize:], dataItem.d[:dSize])
			buf = localBuf[:hSize+dSize]
		}
	} else {
		// The header has already been written out; send from d directly.
		buf = dataItem.d
	}

	size := hSize + dSize

	// Now that outgoing flow controls are checked we can replenish str's write quota
	str.wq.replenish(size)
	var endStream bool
	// If this is the last data message on this stream and all of it can be written in this iteration.
	if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
		endStream = true
	}
	if dataItem.onEachWrite != nil {
		dataItem.onEachWrite()
	}
	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
		return false, err
	}
	// Account for the bytes just written against both flow-control windows
	// and advance past them in the data item.
	str.bytesOutStanding += size
	l.sendQuota -= uint32(size)
	dataItem.h = dataItem.h[hSize:]
	dataItem.d = dataItem.d[dSize:]

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
		str.itl.dequeue()
	}
	if str.itl.isEmpty() {
		str.state = empty
	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
			return false, err
		}
		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
			return false, err
		}
	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
		str.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		l.activeStreams.enqueue(str)
	}
	return false, nil
}
  908. func min(a, b int) int {
  909. if a < b {
  910. return a
  911. }
  912. return b
  913. }