response.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package appsrv
  15. import (
  16. "bufio"
  17. "context"
  18. "fmt"
  19. "net"
  20. "net/http"
  21. "yunion.io/x/onecloud/pkg/httperrors"
  22. )
  // responseWriterResponse bundles the two results of a single write:
  // a byte count and an error.
  type responseWriterResponse struct {
  	// count is the number of bytes reported as written.
  	count int
  	// err is the error reported by the write, or nil.
  	err error
  }
  27. type responseWriterChannel struct {
  28. backend http.ResponseWriter
  29. bodyChan chan []byte
  30. bodyResp chan responseWriterResponse
  31. statusChan chan int
  32. statusResp chan bool
  33. isClosed bool
  34. }
  35. func newResponseWriterChannel(backend http.ResponseWriter) responseWriterChannel {
  36. return responseWriterChannel{
  37. backend: backend,
  38. bodyChan: make(chan []byte),
  39. bodyResp: make(chan responseWriterResponse),
  40. statusChan: make(chan int),
  41. statusResp: make(chan bool),
  42. isClosed: false,
  43. }
  44. }
  45. func (w *responseWriterChannel) Header() http.Header {
  46. if w.isClosed {
  47. // return a dumb header
  48. return http.Header{}
  49. }
  50. return w.backend.Header()
  51. }
  52. func (w *responseWriterChannel) Write(bytes []byte) (int, error) {
  53. if w.isClosed {
  54. return 0, fmt.Errorf("response stream has been closed")
  55. }
  56. w.bodyChan <- bytes
  57. v := <-w.bodyResp
  58. return v.count, v.err
  59. }
  60. func (w *responseWriterChannel) WriteHeader(status int) {
  61. if w.isClosed {
  62. return
  63. }
  64. w.statusChan <- status
  65. <-w.statusResp
  66. }
  67. // implent http.Flusher
  68. func (w *responseWriterChannel) Flush() {
  69. if w.isClosed {
  70. return
  71. }
  72. if f, ok := w.backend.(http.Flusher); ok {
  73. f.Flush()
  74. }
  75. }
  76. // Hijack implements the Hijacker.Hijack method. Our response is both a ResponseWriter
  77. // and a Hijacker.
  78. func (w *responseWriterChannel) Hijack() (rwc net.Conn, buf *bufio.ReadWriter, err error) {
  79. if w.isClosed {
  80. return nil, nil, fmt.Errorf("response stream has been closed")
  81. }
  82. if f, ok := w.backend.(http.Hijacker); ok {
  83. return f.Hijack()
  84. }
  85. return nil, nil, fmt.Errorf("not a hijacker")
  86. }
  87. func (w *responseWriterChannel) wait(ctx context.Context, workerChan chan *SWorker) interface{} {
  88. var err error
  89. var worker *SWorker
  90. stop := false
  91. for !stop {
  92. select {
  93. case curWorker, more := <-workerChan:
  94. if more {
  95. worker = curWorker
  96. } else {
  97. // ignore, worker is responsible for close the channel
  98. }
  99. case <-ctx.Done():
  100. // ctx deadline reached, timeout
  101. if worker != nil {
  102. worker.Detach("timeout")
  103. }
  104. err = httperrors.NewTimeoutError("request process timeout")
  105. stop = true
  106. case bytes, more := <-w.bodyChan:
  107. // log.Infof("Recive body: %s, more: %v", len(bytes), more)
  108. if more {
  109. c, e := w.backend.Write(bytes)
  110. w.bodyResp <- responseWriterResponse{count: c, err: e}
  111. } else {
  112. stop = true
  113. }
  114. case status, more := <-w.statusChan:
  115. // log.Infof("Recive status %d, more: %v", status, more)
  116. if more {
  117. w.backend.WriteHeader(status)
  118. w.statusResp <- true
  119. } else {
  120. stop = true
  121. }
  122. }
  123. }
  124. return err
  125. }
  126. func (w *responseWriterChannel) closeChannels() {
  127. if w.isClosed {
  128. return
  129. }
  130. w.isClosed = true
  131. close(w.bodyChan)
  132. close(w.bodyResp)
  133. close(w.statusChan)
  134. close(w.statusResp)
  135. }