| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package appsrv
- import (
- "bufio"
- "context"
- "fmt"
- "net"
- "net/http"
- "yunion.io/x/onecloud/pkg/httperrors"
- )
- type responseWriterResponse struct {
- count int
- err error
- }
- type responseWriterChannel struct {
- backend http.ResponseWriter
- bodyChan chan []byte
- bodyResp chan responseWriterResponse
- statusChan chan int
- statusResp chan bool
- isClosed bool
- }
- func newResponseWriterChannel(backend http.ResponseWriter) responseWriterChannel {
- return responseWriterChannel{
- backend: backend,
- bodyChan: make(chan []byte),
- bodyResp: make(chan responseWriterResponse),
- statusChan: make(chan int),
- statusResp: make(chan bool),
- isClosed: false,
- }
- }
- func (w *responseWriterChannel) Header() http.Header {
- if w.isClosed {
- // return a dumb header
- return http.Header{}
- }
- return w.backend.Header()
- }
- func (w *responseWriterChannel) Write(bytes []byte) (int, error) {
- if w.isClosed {
- return 0, fmt.Errorf("response stream has been closed")
- }
- w.bodyChan <- bytes
- v := <-w.bodyResp
- return v.count, v.err
- }
- func (w *responseWriterChannel) WriteHeader(status int) {
- if w.isClosed {
- return
- }
- w.statusChan <- status
- <-w.statusResp
- }
- // implent http.Flusher
- func (w *responseWriterChannel) Flush() {
- if w.isClosed {
- return
- }
- if f, ok := w.backend.(http.Flusher); ok {
- f.Flush()
- }
- }
- // Hijack implements the Hijacker.Hijack method. Our response is both a ResponseWriter
- // and a Hijacker.
- func (w *responseWriterChannel) Hijack() (rwc net.Conn, buf *bufio.ReadWriter, err error) {
- if w.isClosed {
- return nil, nil, fmt.Errorf("response stream has been closed")
- }
- if f, ok := w.backend.(http.Hijacker); ok {
- return f.Hijack()
- }
- return nil, nil, fmt.Errorf("not a hijacker")
- }
- func (w *responseWriterChannel) wait(ctx context.Context, workerChan chan *SWorker) interface{} {
- var err error
- var worker *SWorker
- stop := false
- for !stop {
- select {
- case curWorker, more := <-workerChan:
- if more {
- worker = curWorker
- } else {
- // ignore, worker is responsible for close the channel
- }
- case <-ctx.Done():
- // ctx deadline reached, timeout
- if worker != nil {
- worker.Detach("timeout")
- }
- err = httperrors.NewTimeoutError("request process timeout")
- stop = true
- case bytes, more := <-w.bodyChan:
- // log.Infof("Recive body: %s, more: %v", len(bytes), more)
- if more {
- c, e := w.backend.Write(bytes)
- w.bodyResp <- responseWriterResponse{count: c, err: e}
- } else {
- stop = true
- }
- case status, more := <-w.statusChan:
- // log.Infof("Recive status %d, more: %v", status, more)
- if more {
- w.backend.WriteHeader(status)
- w.statusResp <- true
- } else {
- stop = true
- }
- }
- }
- return err
- }
- func (w *responseWriterChannel) closeChannels() {
- if w.isClosed {
- return
- }
- w.isClosed = true
- close(w.bodyChan)
- close(w.bodyResp)
- close(w.statusChan)
- close(w.statusResp)
- }
|