upgradeaware.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package proxy
  14. import (
  15. "bufio"
  16. "bytes"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "log"
  21. "net"
  22. "net/http"
  23. "net/http/httputil"
  24. "net/url"
  25. "os"
  26. "strings"
  27. "time"
  28. "k8s.io/apimachinery/pkg/api/errors"
  29. "k8s.io/apimachinery/pkg/util/httpstream"
  30. utilnet "k8s.io/apimachinery/pkg/util/net"
  31. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  32. "github.com/mxk/go-flowrate/flowrate"
  33. "k8s.io/klog/v2"
  34. )
  35. // UpgradeRequestRoundTripper provides an additional method to decorate a request
  36. // with any authentication or other protocol level information prior to performing
  37. // an upgrade on the server. Any response will be handled by the intercepting
  38. // proxy.
  39. type UpgradeRequestRoundTripper interface {
  40. http.RoundTripper
  41. // WrapRequest takes a valid HTTP request and returns a suitably altered version
  42. // of request with any HTTP level values required to complete the request half of
  43. // an upgrade on the server. It does not get a chance to see the response and
  44. // should bypass any request side logic that expects to see the response.
  45. WrapRequest(*http.Request) (*http.Request, error)
  46. }
  47. // UpgradeAwareHandler is a handler for proxy requests that may require an upgrade
  48. type UpgradeAwareHandler struct {
  49. // UpgradeRequired will reject non-upgrade connections if true.
  50. UpgradeRequired bool
  51. // Location is the location of the upstream proxy. It is used as the location to Dial on the upstream server
  52. // for upgrade requests unless UseRequestLocationOnUpgrade is true.
  53. Location *url.URL
  54. // AppendLocationPath determines if the original path of the Location should be appended to the upstream proxy request path
  55. AppendLocationPath bool
  56. // Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used
  57. Transport http.RoundTripper
  58. // UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided.
  59. // This allows clients to disable HTTP/2.
  60. UpgradeTransport UpgradeRequestRoundTripper
  61. // WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting)
  62. WrapTransport bool
  63. // UseRequestLocation will use the incoming request URL when talking to the backend server.
  64. UseRequestLocation bool
  65. // UseLocationHost overrides the HTTP host header in requests to the backend server to use the Host from Location.
  66. // This will override the req.Host field of a request, while UseRequestLocation will override the req.URL field
  67. // of a request. The req.URL.Host specifies the server to connect to, while the req.Host field
  68. // specifies the Host header value to send in the HTTP request. If this is false, the incoming req.Host header will
  69. // just be forwarded to the backend server.
  70. UseLocationHost bool
  71. // FlushInterval controls how often the standard HTTP proxy will flush content from the upstream.
  72. FlushInterval time.Duration
  73. // MaxBytesPerSec controls the maximum rate for an upstream connection. No rate is imposed if the value is zero.
  74. MaxBytesPerSec int64
  75. // Responder is passed errors that occur while setting up proxying.
  76. Responder ErrorResponder
  77. // Reject to forward redirect response
  78. RejectForwardingRedirects bool
  79. }
  80. const defaultFlushInterval = 200 * time.Millisecond
  81. // ErrorResponder abstracts error reporting to the proxy handler to remove the need to hardcode a particular
  82. // error format.
  83. type ErrorResponder interface {
  84. Error(w http.ResponseWriter, req *http.Request, err error)
  85. }
  86. // SimpleErrorResponder is the legacy implementation of ErrorResponder for callers that only
  87. // service a single request/response per proxy.
  88. type SimpleErrorResponder interface {
  89. Error(err error)
  90. }
  91. func NewErrorResponder(r SimpleErrorResponder) ErrorResponder {
  92. return simpleResponder{r}
  93. }
  94. type simpleResponder struct {
  95. responder SimpleErrorResponder
  96. }
  97. func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
  98. r.responder.Error(err)
  99. }
  100. // upgradeRequestRoundTripper implements proxy.UpgradeRequestRoundTripper.
  101. type upgradeRequestRoundTripper struct {
  102. http.RoundTripper
  103. upgrader http.RoundTripper
  104. }
  105. var (
  106. _ UpgradeRequestRoundTripper = &upgradeRequestRoundTripper{}
  107. _ utilnet.RoundTripperWrapper = &upgradeRequestRoundTripper{}
  108. )
  109. // WrappedRoundTripper returns the round tripper that a caller would use.
  110. func (rt *upgradeRequestRoundTripper) WrappedRoundTripper() http.RoundTripper {
  111. return rt.RoundTripper
  112. }
  113. // WriteToRequest calls the nested upgrader and then copies the returned request
  114. // fields onto the passed request.
  115. func (rt *upgradeRequestRoundTripper) WrapRequest(req *http.Request) (*http.Request, error) {
  116. resp, err := rt.upgrader.RoundTrip(req)
  117. if err != nil {
  118. return nil, err
  119. }
  120. return resp.Request, nil
  121. }
  122. // onewayRoundTripper captures the provided request - which is assumed to have
  123. // been modified by other round trippers - and then returns a fake response.
  124. type onewayRoundTripper struct{}
  125. // RoundTrip returns a simple 200 OK response that captures the provided request.
  126. func (onewayRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  127. return &http.Response{
  128. Status: "200 OK",
  129. StatusCode: http.StatusOK,
  130. Body: ioutil.NopCloser(&bytes.Buffer{}),
  131. Request: req,
  132. }, nil
  133. }
  134. // MirrorRequest is a round tripper that can be called to get back the calling request as
  135. // the core round tripper in a chain.
  136. var MirrorRequest http.RoundTripper = onewayRoundTripper{}
  137. // NewUpgradeRequestRoundTripper takes two round trippers - one for the underlying TCP connection, and
  138. // one that is able to write headers to an HTTP request. The request rt is used to set the request headers
  139. // and that is written to the underlying connection rt.
  140. func NewUpgradeRequestRoundTripper(connection, request http.RoundTripper) UpgradeRequestRoundTripper {
  141. return &upgradeRequestRoundTripper{
  142. RoundTripper: connection,
  143. upgrader: request,
  144. }
  145. }
  146. // normalizeLocation returns the result of parsing the full URL, with scheme set to http if missing
  147. func normalizeLocation(location *url.URL) *url.URL {
  148. normalized, _ := url.Parse(location.String())
  149. if len(normalized.Scheme) == 0 {
  150. normalized.Scheme = "http"
  151. }
  152. return normalized
  153. }
  154. // NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning
  155. // errors to the caller.
  156. func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler {
  157. return &UpgradeAwareHandler{
  158. Location: normalizeLocation(location),
  159. Transport: transport,
  160. WrapTransport: wrapTransport,
  161. UpgradeRequired: upgradeRequired,
  162. FlushInterval: defaultFlushInterval,
  163. Responder: responder,
  164. }
  165. }
  166. func proxyRedirectsforRootPath(path string, w http.ResponseWriter, req *http.Request) bool {
  167. redirect := false
  168. method := req.Method
  169. // From pkg/genericapiserver/endpoints/handlers/proxy.go#ServeHTTP:
  170. // Redirect requests with an empty path to a location that ends with a '/'
  171. // This is essentially a hack for https://issue.k8s.io/4958.
  172. // Note: Keep this code after tryUpgrade to not break that flow.
  173. if len(path) == 0 && (method == http.MethodGet || method == http.MethodHead) {
  174. var queryPart string
  175. if len(req.URL.RawQuery) > 0 {
  176. queryPart = "?" + req.URL.RawQuery
  177. }
  178. w.Header().Set("Location", req.URL.Path+"/"+queryPart)
  179. w.WriteHeader(http.StatusMovedPermanently)
  180. redirect = true
  181. }
  182. return redirect
  183. }
  184. // ServeHTTP handles the proxy request
  185. func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  186. if h.tryUpgrade(w, req) {
  187. return
  188. }
  189. if h.UpgradeRequired {
  190. h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required"))
  191. return
  192. }
  193. loc := *h.Location
  194. loc.RawQuery = req.URL.RawQuery
  195. // If original request URL ended in '/', append a '/' at the end of the
  196. // of the proxy URL
  197. if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") {
  198. loc.Path += "/"
  199. }
  200. proxyRedirect := proxyRedirectsforRootPath(loc.Path, w, req)
  201. if proxyRedirect {
  202. return
  203. }
  204. if h.Transport == nil || h.WrapTransport {
  205. h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
  206. }
  207. // WithContext creates a shallow clone of the request with the same context.
  208. newReq := req.WithContext(req.Context())
  209. newReq.Header = utilnet.CloneHeader(req.Header)
  210. if !h.UseRequestLocation {
  211. newReq.URL = &loc
  212. }
  213. if h.UseLocationHost {
  214. // exchanging req.Host with the backend location is necessary for backends that act on the HTTP host header (e.g. API gateways),
  215. // because req.Host has preference over req.URL.Host in filling this header field
  216. newReq.Host = h.Location.Host
  217. }
  218. // create the target location to use for the reverse proxy
  219. reverseProxyLocation := &url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}
  220. if h.AppendLocationPath {
  221. reverseProxyLocation.Path = h.Location.Path
  222. }
  223. proxy := httputil.NewSingleHostReverseProxy(reverseProxyLocation)
  224. proxy.Transport = h.Transport
  225. proxy.FlushInterval = h.FlushInterval
  226. proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
  227. if h.RejectForwardingRedirects {
  228. oldModifyResponse := proxy.ModifyResponse
  229. proxy.ModifyResponse = func(response *http.Response) error {
  230. code := response.StatusCode
  231. if code >= 300 && code <= 399 && len(response.Header.Get("Location")) > 0 {
  232. // close the original response
  233. response.Body.Close()
  234. msg := "the backend attempted to redirect this request, which is not permitted"
  235. // replace the response
  236. *response = http.Response{
  237. StatusCode: http.StatusBadGateway,
  238. Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)),
  239. Body: io.NopCloser(strings.NewReader(msg)),
  240. ContentLength: int64(len(msg)),
  241. }
  242. } else {
  243. if oldModifyResponse != nil {
  244. if err := oldModifyResponse(response); err != nil {
  245. return err
  246. }
  247. }
  248. }
  249. return nil
  250. }
  251. }
  252. if h.Responder != nil {
  253. // if an optional error interceptor/responder was provided wire it
  254. // the custom responder might be used for providing a unified error reporting
  255. // or supporting retry mechanisms by not sending non-fatal errors to the clients
  256. proxy.ErrorHandler = h.Responder.Error
  257. }
  258. proxy.ServeHTTP(w, newReq)
  259. }
  260. type noSuppressPanicError struct{}
  261. func (noSuppressPanicError) Write(p []byte) (n int, err error) {
  262. // skip "suppressing panic for copyResponse error in test; copy error" error message
  263. // that ends up in CI tests on each kube-apiserver termination as noise and
  264. // everybody thinks this is fatal.
  265. if strings.Contains(string(p), "suppressing panic") {
  266. return len(p), nil
  267. }
  268. return os.Stderr.Write(p)
  269. }
  270. // tryUpgrade returns true if the request was handled.
  271. func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
  272. if !httpstream.IsUpgradeRequest(req) {
  273. klog.V(6).Infof("Request was not an upgrade")
  274. return false
  275. }
  276. var (
  277. backendConn net.Conn
  278. rawResponse []byte
  279. err error
  280. )
  281. location := *h.Location
  282. if h.UseRequestLocation {
  283. location = *req.URL
  284. location.Scheme = h.Location.Scheme
  285. location.Host = h.Location.Host
  286. if h.AppendLocationPath {
  287. location.Path = singleJoiningSlash(h.Location.Path, location.Path)
  288. }
  289. }
  290. clone := utilnet.CloneRequest(req)
  291. // Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy
  292. // handles this in the non-upgrade path.
  293. utilnet.AppendForwardedForHeader(clone)
  294. klog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header)
  295. if h.UseLocationHost {
  296. clone.Host = h.Location.Host
  297. }
  298. clone.URL = &location
  299. backendConn, err = h.DialForUpgrade(clone)
  300. if err != nil {
  301. klog.V(6).Infof("Proxy connection error: %v", err)
  302. h.Responder.Error(w, req, err)
  303. return true
  304. }
  305. defer backendConn.Close()
  306. // determine the http response code from the backend by reading from rawResponse+backendConn
  307. backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn))
  308. if err != nil {
  309. klog.V(6).Infof("Proxy connection error: %v", err)
  310. h.Responder.Error(w, req, err)
  311. return true
  312. }
  313. if len(headerBytes) > len(rawResponse) {
  314. // we read beyond the bytes stored in rawResponse, update rawResponse to the full set of bytes read from the backend
  315. rawResponse = headerBytes
  316. }
  317. // If the backend did not upgrade the request, return an error to the client. If the response was
  318. // an error, the error is forwarded directly after the connection is hijacked. Otherwise, just
  319. // return a generic error here.
  320. if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols && backendHTTPResponse.StatusCode < 400 {
  321. err := fmt.Errorf("invalid upgrade response: status code %d", backendHTTPResponse.StatusCode)
  322. klog.Errorf("Proxy upgrade error: %v", err)
  323. h.Responder.Error(w, req, err)
  324. return true
  325. }
  326. // Once the connection is hijacked, the ErrorResponder will no longer work, so
  327. // hijacking should be the last step in the upgrade.
  328. requestHijacker, ok := w.(http.Hijacker)
  329. if !ok {
  330. klog.V(6).Infof("Unable to hijack response writer: %T", w)
  331. h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
  332. return true
  333. }
  334. requestHijackedConn, _, err := requestHijacker.Hijack()
  335. if err != nil {
  336. klog.V(6).Infof("Unable to hijack response: %v", err)
  337. h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
  338. return true
  339. }
  340. defer requestHijackedConn.Close()
  341. if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols {
  342. // If the backend did not upgrade the request, echo the response from the backend to the client and return, closing the connection.
  343. klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode)
  344. // set read/write deadlines
  345. deadline := time.Now().Add(10 * time.Second)
  346. backendConn.SetReadDeadline(deadline)
  347. requestHijackedConn.SetWriteDeadline(deadline)
  348. // write the response to the client
  349. err := backendHTTPResponse.Write(requestHijackedConn)
  350. if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
  351. klog.Errorf("Error proxying data from backend to client: %v", err)
  352. }
  353. // Indicate we handled the request
  354. return true
  355. }
  356. // Forward raw response bytes back to client.
  357. if len(rawResponse) > 0 {
  358. klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
  359. if _, err = requestHijackedConn.Write(rawResponse); err != nil {
  360. utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
  361. }
  362. }
  363. // Proxy the connection. This is bidirectional, so we need a goroutine
  364. // to copy in each direction. Once one side of the connection exits, we
  365. // exit the function which performs cleanup and in the process closes
  366. // the other half of the connection in the defer.
  367. writerComplete := make(chan struct{})
  368. readerComplete := make(chan struct{})
  369. go func() {
  370. var writer io.WriteCloser
  371. if h.MaxBytesPerSec > 0 {
  372. writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
  373. } else {
  374. writer = backendConn
  375. }
  376. _, err := io.Copy(writer, requestHijackedConn)
  377. if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
  378. klog.Errorf("Error proxying data from client to backend: %v", err)
  379. }
  380. close(writerComplete)
  381. }()
  382. go func() {
  383. var reader io.ReadCloser
  384. if h.MaxBytesPerSec > 0 {
  385. reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
  386. } else {
  387. reader = backendConn
  388. }
  389. _, err := io.Copy(requestHijackedConn, reader)
  390. if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
  391. klog.Errorf("Error proxying data from backend to client: %v", err)
  392. }
  393. close(readerComplete)
  394. }()
  395. // Wait for one half the connection to exit. Once it does the defer will
  396. // clean up the other half of the connection.
  397. select {
  398. case <-writerComplete:
  399. case <-readerComplete:
  400. }
  401. klog.V(6).Infof("Disconnecting from backend proxy %s\n Headers: %v", &location, clone.Header)
  402. return true
  403. }
  404. // FIXME: Taken from net/http/httputil/reverseproxy.go as singleJoiningSlash is not exported to be re-used.
  405. // See-also: https://github.com/golang/go/issues/44290
  406. func singleJoiningSlash(a, b string) string {
  407. aslash := strings.HasSuffix(a, "/")
  408. bslash := strings.HasPrefix(b, "/")
  409. switch {
  410. case aslash && bslash:
  411. return a + b[1:]
  412. case !aslash && !bslash:
  413. return a + "/" + b
  414. }
  415. return a + b
  416. }
  417. func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) {
  418. if h.UpgradeTransport == nil {
  419. return dial(req, h.Transport)
  420. }
  421. updatedReq, err := h.UpgradeTransport.WrapRequest(req)
  422. if err != nil {
  423. return nil, err
  424. }
  425. return dial(updatedReq, h.UpgradeTransport)
  426. }
  427. // getResponseCode reads a http response from the given reader, returns the response,
  428. // the bytes read from the reader, and any error encountered
  429. func getResponse(r io.Reader) (*http.Response, []byte, error) {
  430. rawResponse := bytes.NewBuffer(make([]byte, 0, 256))
  431. // Save the bytes read while reading the response headers into the rawResponse buffer
  432. resp, err := http.ReadResponse(bufio.NewReader(io.TeeReader(r, rawResponse)), nil)
  433. if err != nil {
  434. return nil, nil, err
  435. }
  436. // return the http response and the raw bytes consumed from the reader in the process
  437. return resp, rawResponse.Bytes(), nil
  438. }
  439. // dial dials the backend at req.URL and writes req to it.
  440. func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) {
  441. conn, err := dialURL(req.Context(), req.URL, transport)
  442. if err != nil {
  443. return nil, fmt.Errorf("error dialing backend: %v", err)
  444. }
  445. if err = req.Write(conn); err != nil {
  446. conn.Close()
  447. return nil, fmt.Errorf("error sending request: %v", err)
  448. }
  449. return conn, err
  450. }
  451. func (h *UpgradeAwareHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper {
  452. scheme := url.Scheme
  453. host := url.Host
  454. suffix := h.Location.Path
  455. if strings.HasSuffix(url.Path, "/") && !strings.HasSuffix(suffix, "/") {
  456. suffix += "/"
  457. }
  458. pathPrepend := strings.TrimSuffix(url.Path, suffix)
  459. rewritingTransport := &Transport{
  460. Scheme: scheme,
  461. Host: host,
  462. PathPrepend: pathPrepend,
  463. RoundTripper: internalTransport,
  464. }
  465. return &corsRemovingTransport{
  466. RoundTripper: rewritingTransport,
  467. }
  468. }
  469. // corsRemovingTransport is a wrapper for an internal transport. It removes CORS headers
  470. // from the internal response.
  471. // Implements pkg/util/net.RoundTripperWrapper
  472. type corsRemovingTransport struct {
  473. http.RoundTripper
  474. }
  475. var _ = utilnet.RoundTripperWrapper(&corsRemovingTransport{})
  476. func (rt *corsRemovingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  477. resp, err := rt.RoundTripper.RoundTrip(req)
  478. if err != nil {
  479. return nil, err
  480. }
  481. removeCORSHeaders(resp)
  482. return resp, nil
  483. }
  484. func (rt *corsRemovingTransport) WrappedRoundTripper() http.RoundTripper {
  485. return rt.RoundTripper
  486. }
  487. // removeCORSHeaders strip CORS headers sent from the backend
  488. // This should be called on all responses before returning
  489. func removeCORSHeaders(resp *http.Response) {
  490. resp.Header.Del("Access-Control-Allow-Credentials")
  491. resp.Header.Del("Access-Control-Allow-Headers")
  492. resp.Header.Del("Access-Control-Allow-Methods")
  493. resp.Header.Del("Access-Control-Allow-Origin")
  494. }