roundtripper.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
  15. Copyright 2015 The Kubernetes Authors.
  16. Licensed under the Apache License, Version 2.0 (the "License");
  17. you may not use this file except in compliance with the License.
  18. You may obtain a copy of the License at
  19. http://www.apache.org/licenses/LICENSE-2.0
  20. Unless required by applicable law or agreed to in writing, software
  21. distributed under the License is distributed on an "AS IS" BASIS,
  22. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  23. See the License for the specific language governing permissions and
  24. limitations under the License.
  25. */
  26. package spdy
  27. import (
  28. "bufio"
  29. "context"
  30. "crypto/tls"
  31. "encoding/base64"
  32. "fmt"
  33. "io/ioutil"
  34. "net"
  35. "net/http"
  36. "net/http/httputil"
  37. "net/url"
  38. "strings"
  39. apierrors "k8s.io/apimachinery/pkg/api/errors"
  40. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  41. "k8s.io/apimachinery/pkg/runtime"
  42. "k8s.io/apimachinery/pkg/runtime/serializer"
  43. utilnet "k8s.io/apimachinery/pkg/util/net"
  44. "k8s.io/apimachinery/third_party/forked/golang/netutil"
  45. "yunion.io/x/onecloud/pkg/util/httpstream"
  46. )
// SpdyRoundTripper knows how to upgrade an HTTP request to one that supports
// multiplexed streams. After RoundTrip() returns successfully, the conn field
// is set and NewConnection() can wrap it. SpdyRoundTripper implements the
// UpgradeRoundTripper interface.
type SpdyRoundTripper struct {
	// tlsConfig holds the TLS configuration settings to use when connecting
	// to the remote server. It may be nil.
	tlsConfig *tls.Config

	/* TODO according to http://golang.org/pkg/net/http/#RoundTripper, a RoundTripper
	   must be safe for use by multiple concurrent goroutines. If this is absolutely
	   necessary, we could keep a map from http.Request to net.Conn. In practice,
	   a client will create an http.Client, set the transport to a new instance of
	   SpdyRoundTripper, and use it a single time, so this hopefully won't be an issue.
	*/
	// conn is the underlying network connection to the remote server.
	// It is overwritten by every successful RoundTrip call.
	conn net.Conn

	// Dialer is the dialer used to connect. Used if non-nil; otherwise a
	// zero-value net.Dialer (plain TCP) or tls.Dial (TLS) is used.
	Dialer *net.Dialer

	// proxier knows which proxy to use given a request. When nil, dial falls
	// back to http.ProxyFromEnvironment wrapped by
	// utilnet.NewProxierWithNoProxyCIDR.
	// Used primarily for mocking the proxy discovery in tests.
	proxier func(req *http.Request) (*url.URL, error)

	// followRedirects indicates if the round tripper should examine responses for redirects and
	// follow them.
	// NOTE(review): stored by the constructors but not read anywhere in the
	// visible part of this file — confirm whether redirect handling lives elsewhere.
	followRedirects bool
	// requireSameHostRedirects restricts redirect following to only follow redirects to the same host
	// as the original request.
	// NOTE(review): same as followRedirects — set but not read in the visible code.
	requireSameHostRedirects bool
}
  74. var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{}
  75. var _ httpstream.UpgradeRoundTripper = &SpdyRoundTripper{}
  76. var _ utilnet.Dialer = &SpdyRoundTripper{}
  77. // NewRoundTripper creates a new SpdyRoundTripper that will use
  78. // the specified tlsConfig.
  79. func NewRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) httpstream.UpgradeRoundTripper {
  80. return NewSpdyRoundTripper(tlsConfig, followRedirects, requireSameHostRedirects)
  81. }
  82. // NewSpdyRoundTripper creates a new SpdyRoundTripper that will use
  83. // the specified tlsConfig. This function is mostly meant for unit tests.
  84. func NewSpdyRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) *SpdyRoundTripper {
  85. return &SpdyRoundTripper{
  86. tlsConfig: tlsConfig,
  87. followRedirects: followRedirects,
  88. requireSameHostRedirects: requireSameHostRedirects,
  89. }
  90. }
  91. // TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during
  92. // proxying with a spdy roundtripper.
  93. func (s *SpdyRoundTripper) TLSClientConfig() *tls.Config {
  94. return s.tlsConfig
  95. }
  96. // Dial implements k8s.io/apimachinery/pkg/util/net.Dialer.
  97. func (s *SpdyRoundTripper) Dial(req *http.Request) (net.Conn, error) {
  98. conn, err := s.dial(req)
  99. if err != nil {
  100. return nil, err
  101. }
  102. if err := req.Write(conn); err != nil {
  103. conn.Close()
  104. return nil, err
  105. }
  106. return conn, nil
  107. }
  108. // dial dials the host specified by req, using TLS if appropriate, optionally
  109. // using a proxy server if one is configured via environment variables.
  110. func (s *SpdyRoundTripper) dial(req *http.Request) (net.Conn, error) {
  111. proxier := s.proxier
  112. if proxier == nil {
  113. proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)
  114. }
  115. proxyURL, err := proxier(req)
  116. if err != nil {
  117. return nil, err
  118. }
  119. if proxyURL == nil {
  120. return s.dialWithoutProxy(req.Context(), req.URL)
  121. }
  122. // ensure we use a canonical host with proxyReq
  123. targetHost := netutil.CanonicalAddr(req.URL)
  124. // proxying logic adapted from http://blog.h6t.eu/post/74098062923/golang-websocket-with-http-proxy-support
  125. proxyReq := http.Request{
  126. Method: "CONNECT",
  127. URL: &url.URL{},
  128. Host: targetHost,
  129. }
  130. if pa := s.proxyAuth(proxyURL); pa != "" {
  131. proxyReq.Header = http.Header{}
  132. proxyReq.Header.Set("Proxy-Authorization", pa)
  133. }
  134. proxyDialConn, err := s.dialWithoutProxy(req.Context(), proxyURL)
  135. if err != nil {
  136. return nil, err
  137. }
  138. proxyClientConn := httputil.NewProxyClientConn(proxyDialConn, nil)
  139. _, err = proxyClientConn.Do(&proxyReq)
  140. if err != nil && err != httputil.ErrPersistEOF {
  141. return nil, err
  142. }
  143. rwc, _ := proxyClientConn.Hijack()
  144. if req.URL.Scheme != "https" {
  145. return rwc, nil
  146. }
  147. host, _, err := net.SplitHostPort(targetHost)
  148. if err != nil {
  149. return nil, err
  150. }
  151. tlsConfig := s.tlsConfig
  152. switch {
  153. case tlsConfig == nil:
  154. tlsConfig = &tls.Config{ServerName: host}
  155. case len(tlsConfig.ServerName) == 0:
  156. tlsConfig = tlsConfig.Clone()
  157. tlsConfig.ServerName = host
  158. }
  159. tlsConn := tls.Client(rwc, tlsConfig)
  160. // need to manually call Handshake() so we can call VerifyHostname() below
  161. if err := tlsConn.Handshake(); err != nil {
  162. return nil, err
  163. }
  164. // Return if we were configured to skip validation
  165. if tlsConfig.InsecureSkipVerify {
  166. return tlsConn, nil
  167. }
  168. if err := tlsConn.VerifyHostname(tlsConfig.ServerName); err != nil {
  169. return nil, err
  170. }
  171. return tlsConn, nil
  172. }
  173. // dialWithoutProxy dials the host specified by url, using TLS if appropriate.
  174. func (s *SpdyRoundTripper) dialWithoutProxy(ctx context.Context, url *url.URL) (net.Conn, error) {
  175. dialAddr := netutil.CanonicalAddr(url)
  176. if url.Scheme == "http" {
  177. if s.Dialer == nil {
  178. var d net.Dialer
  179. return d.DialContext(ctx, "tcp", dialAddr)
  180. } else {
  181. return s.Dialer.DialContext(ctx, "tcp", dialAddr)
  182. }
  183. }
  184. // TODO validate the TLSClientConfig is set up?
  185. var conn *tls.Conn
  186. var err error
  187. if s.Dialer == nil {
  188. conn, err = tls.Dial("tcp", dialAddr, s.tlsConfig)
  189. } else {
  190. conn, err = tls.DialWithDialer(s.Dialer, "tcp", dialAddr, s.tlsConfig)
  191. }
  192. if err != nil {
  193. return nil, err
  194. }
  195. // Return if we were configured to skip validation
  196. if s.tlsConfig != nil && s.tlsConfig.InsecureSkipVerify {
  197. return conn, nil
  198. }
  199. host, _, err := net.SplitHostPort(dialAddr)
  200. if err != nil {
  201. return nil, err
  202. }
  203. if s.tlsConfig != nil && len(s.tlsConfig.ServerName) > 0 {
  204. host = s.tlsConfig.ServerName
  205. }
  206. err = conn.VerifyHostname(host)
  207. if err != nil {
  208. return nil, err
  209. }
  210. return conn, nil
  211. }
  212. // proxyAuth returns, for a given proxy URL, the value to be used for the Proxy-Authorization header
  213. func (s *SpdyRoundTripper) proxyAuth(proxyURL *url.URL) string {
  214. if proxyURL == nil || proxyURL.User == nil {
  215. return ""
  216. }
  217. credentials := proxyURL.User.String()
  218. encodedAuth := base64.StdEncoding.EncodeToString([]byte(credentials))
  219. return fmt.Sprintf("Basic %s", encodedAuth)
  220. }
  221. // RoundTrip executes the Request and upgrades it. After a successful upgrade,
  222. // clients may call SpdyRoundTripper.Connection() to retrieve the upgraded
  223. // connection.
  224. func (s *SpdyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  225. header := utilnet.CloneHeader(req.Header)
  226. header.Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
  227. header.Add(httpstream.HeaderUpgrade, HeaderSpdy31)
  228. reqToSend := req.Clone(req.Context())
  229. reqToSend.Header = header
  230. conn, err := s.Dial(reqToSend)
  231. if err != nil {
  232. return nil, err
  233. }
  234. responseReader := bufio.NewReader(conn)
  235. resp, err := http.ReadResponse(responseReader, nil)
  236. if err != nil {
  237. if conn != nil {
  238. conn.Close()
  239. }
  240. return nil, err
  241. }
  242. s.conn = conn
  243. return resp, nil
  244. }
  245. // NewConnection validates the upgrade response, creating and returning a new
  246. // httpstream.Connection if there were no errors.
  247. func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connection, error) {
  248. connectionHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderConnection))
  249. upgradeHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderUpgrade))
  250. if (resp.StatusCode != http.StatusSwitchingProtocols) || !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(HeaderSpdy31)) {
  251. defer resp.Body.Close()
  252. responseError := ""
  253. responseErrorBytes, err := ioutil.ReadAll(resp.Body)
  254. if err != nil {
  255. responseError = "unable to read error from server response"
  256. } else {
  257. // TODO: I don't belong here, I should be abstracted from this class
  258. if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil {
  259. if status, ok := obj.(*metav1.Status); ok {
  260. return nil, &apierrors.StatusError{ErrStatus: *status}
  261. }
  262. }
  263. responseError = string(responseErrorBytes)
  264. responseError = strings.TrimSpace(responseError)
  265. }
  266. return nil, fmt.Errorf("unable to upgrade connection: %s", responseError)
  267. }
  268. return NewClientConnection(s.conn)
  269. }
  270. // statusScheme is private scheme for the decoding here until someone fixes the TODO in NewConnection
  271. var statusScheme = runtime.NewScheme()
  272. // ParameterCodec knows about query parameters used with the meta v1 API spec.
  273. var statusCodecs = serializer.NewCodecFactory(statusScheme)
  274. func init() {
  275. statusScheme.AddUnversionedTypes(metav1.SchemeGroupVersion,
  276. &metav1.Status{},
  277. )
  278. }