stream.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package stream
import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"strconv"
	"time"

	"golang.org/x/net/http2"
	"k8s.io/apimachinery/pkg/util/net"

	"yunion.io/x/log"
	"yunion.io/x/pkg/util/httputils"
)
  28. type Request struct {
  29. body io.Reader
  30. headers http.Header
  31. client *http.Client
  32. }
  33. func NewRequest(client *http.Client, body io.Reader, headers http.Header) *Request {
  34. return &Request{
  35. body: body,
  36. headers: headers,
  37. client: client,
  38. }
  39. }
  40. func (r *Request) Stream(ctx context.Context, method string, url string) (io.ReadCloser, error) {
  41. req, err := http.NewRequest(method, url, nil)
  42. if err != nil {
  43. return nil, err
  44. }
  45. if r.body != nil {
  46. req.Body = ioutil.NopCloser(r.body)
  47. }
  48. req = req.WithContext(ctx)
  49. req.Header = r.headers
  50. client := r.client
  51. if client == nil {
  52. client = httputils.GetTimeoutClient(1 * time.Hour)
  53. }
  54. resp, err := client.Do(req)
  55. if err != nil {
  56. return nil, err
  57. }
  58. switch {
  59. case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
  60. return resp.Body, nil
  61. default:
  62. // ensure we close the body before returning the error
  63. defer resp.Body.Close()
  64. result := r.transformResponse(resp, req)
  65. err := result.Error()
  66. if err == nil {
  67. err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
  68. }
  69. return nil, err
  70. }
  71. }
  72. // transformResponse converts an API response into a structured API object
  73. func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
  74. var body []byte
  75. if resp.Body != nil {
  76. data, err := ioutil.ReadAll(resp.Body)
  77. switch err.(type) {
  78. case nil:
  79. body = data
  80. case http2.StreamError:
  81. // This is trying to catch the scenario that the server may close the connection when sending the
  82. // response body. This can be caused by server timeout due to a slow network connection.
  83. // TODO: Add test for this. Steps may be:
  84. // 1. client-go (or kubectl) sends a GET request.
  85. // 2. Apiserver sends back the headers and then part of the body
  86. // 3. Apiserver closes connection.
  87. // 4. client-go should catch this and return an error.
  88. log.Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
  89. streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err)
  90. return Result{
  91. err: streamErr,
  92. }
  93. default:
  94. log.Errorf("Unexpected error when reading response body: %v", err)
  95. unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %v", err)
  96. return Result{
  97. err: unexpectedErr,
  98. }
  99. }
  100. }
  101. contentType := resp.Header.Get("Content-Type")
  102. switch {
  103. case resp.StatusCode == http.StatusSwitchingProtocols:
  104. // no-op, we've been upgraded
  105. case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  106. // calculate an unstructured error from the response which the Result object may use if the caller
  107. // did not return a structured error.
  108. // retryAfter, _ := retryAfterSeconds(resp)
  109. // err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
  110. return Result{
  111. body: body,
  112. contentType: contentType,
  113. statusCode: resp.StatusCode,
  114. err: nil,
  115. }
  116. }
  117. return Result{
  118. body: body,
  119. contentType: contentType,
  120. statusCode: resp.StatusCode,
  121. }
  122. }
  123. // retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
  124. // the header was missing or not a valid number.
  125. func retryAfterSeconds(resp *http.Response) (int, bool) {
  126. if h := resp.Header.Get("Retry-After"); len(h) > 0 {
  127. if i, err := strconv.Atoi(h); err == nil {
  128. return i, true
  129. }
  130. }
  131. return 0, false
  132. }
  133. // Result contains the result of calling Request.Do().
  134. type Result struct {
  135. body []byte
  136. warnings []net.WarningHeader
  137. contentType string
  138. err error
  139. statusCode int
  140. }
  141. // Raw returns the raw result.
  142. func (r Result) Raw() ([]byte, error) {
  143. return r.body, r.err
  144. }
  145. func (r Result) Error() error {
  146. return r.err
  147. }