| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package stream
- import (
- "context"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "strconv"
- "time"
- "golang.org/x/net/http2"
- "k8s.io/apimachinery/pkg/util/net"
- "yunion.io/x/log"
- "yunion.io/x/pkg/util/httputils"
- )
// Request bundles everything Stream needs to issue one HTTP call: an optional
// payload, the headers to send, and the client that performs the call.
type Request struct {
	// body is the optional request payload; Stream attaches it only when it
	// is non-nil.
	body io.Reader
	// headers are the HTTP headers sent with the request.
	headers http.Header
	// client performs the request. When it is nil, Stream obtains a client
	// from httputils.GetTimeoutClient instead.
	client *http.Client
}
- func NewRequest(client *http.Client, body io.Reader, headers http.Header) *Request {
- return &Request{
- body: body,
- headers: headers,
- client: client,
- }
- }
- func (r *Request) Stream(ctx context.Context, method string, url string) (io.ReadCloser, error) {
- req, err := http.NewRequest(method, url, nil)
- if err != nil {
- return nil, err
- }
- if r.body != nil {
- req.Body = ioutil.NopCloser(r.body)
- }
- req = req.WithContext(ctx)
- req.Header = r.headers
- client := r.client
- if client == nil {
- client = httputils.GetTimeoutClient(1 * time.Hour)
- }
- resp, err := client.Do(req)
- if err != nil {
- return nil, err
- }
- switch {
- case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
- return resp.Body, nil
- default:
- // ensure we close the body before returning the error
- defer resp.Body.Close()
- result := r.transformResponse(resp, req)
- err := result.Error()
- if err == nil {
- err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
- }
- return nil, err
- }
- }
- // transformResponse converts an API response into a structured API object
- func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
- var body []byte
- if resp.Body != nil {
- data, err := ioutil.ReadAll(resp.Body)
- switch err.(type) {
- case nil:
- body = data
- case http2.StreamError:
- // This is trying to catch the scenario that the server may close the connection when sending the
- // response body. This can be caused by server timeout due to a slow network connection.
- // TODO: Add test for this. Steps may be:
- // 1. client-go (or kubectl) sends a GET request.
- // 2. Apiserver sends back the headers and then part of the body
- // 3. Apiserver closes connection.
- // 4. client-go should catch this and return an error.
- log.Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
- streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err)
- return Result{
- err: streamErr,
- }
- default:
- log.Errorf("Unexpected error when reading response body: %v", err)
- unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %v", err)
- return Result{
- err: unexpectedErr,
- }
- }
- }
- contentType := resp.Header.Get("Content-Type")
- switch {
- case resp.StatusCode == http.StatusSwitchingProtocols:
- // no-op, we've been upgraded
- case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
- // calculate an unstructured error from the response which the Result object may use if the caller
- // did not return a structured error.
- // retryAfter, _ := retryAfterSeconds(resp)
- // err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
- return Result{
- body: body,
- contentType: contentType,
- statusCode: resp.StatusCode,
- err: nil,
- }
- }
- return Result{
- body: body,
- contentType: contentType,
- statusCode: resp.StatusCode,
- }
- }
// retryAfterSeconds reports the Retry-After header of resp as a whole number
// of seconds. The second result is false, and the first is 0, when the header
// is absent, empty, or not parseable as a decimal integer.
func retryAfterSeconds(resp *http.Response) (int, bool) {
	raw := resp.Header.Get("Retry-After")
	if raw == "" {
		return 0, false
	}
	seconds, err := strconv.Atoi(raw)
	if err != nil {
		return 0, false
	}
	return seconds, true
}
- // Result contains the result of calling Request.Do().
- type Result struct {
- body []byte
- warnings []net.WarningHeader
- contentType string
- err error
- statusCode int
- }
- // Raw returns the raw result.
- func (r Result) Raw() ([]byte, error) {
- return r.body, r.err
- }
- func (r Result) Error() error {
- return r.err
- }
|