upgrade.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
  15. Copyright 2015 The Kubernetes Authors.
  16. Licensed under the Apache License, Version 2.0 (the "License");
  17. you may not use this file except in compliance with the License.
  18. You may obtain a copy of the License at
  19. http://www.apache.org/licenses/LICENSE-2.0
  20. Unless required by applicable law or agreed to in writing, software
  21. distributed under the License is distributed on an "AS IS" BASIS,
  22. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  23. See the License for the specific language governing permissions and
  24. limitations under the License.
  25. */
  26. package spdy
  27. import (
  28. "bufio"
  29. "fmt"
  30. "io"
  31. "net"
  32. "net/http"
  33. "strings"
  34. "sync/atomic"
  35. "k8s.io/apimachinery/pkg/util/runtime"
  36. "yunion.io/x/onecloud/pkg/util/httpstream"
  37. )
  38. const HeaderSpdy31 = "SPDY/3.1"
  39. // responseUpgrader knows how to upgrade HTTP responses. It
  40. // implements the httpstream.ResponseUpgrader interface.
  41. type responseUpgrader struct {
  42. }
  43. // connWrapper is used to wrap a hijacked connection and its bufio.Reader. All
  44. // calls will be handled directly by the underlying net.Conn with the exception
  45. // of Read and Close calls, which will consider data in the bufio.Reader. This
  46. // ensures that data already inside the used bufio.Reader instance is also
  47. // read.
  48. type connWrapper struct {
  49. net.Conn
  50. closed int32
  51. bufReader *bufio.Reader
  52. }
  53. func (w *connWrapper) Read(b []byte) (n int, err error) {
  54. if atomic.LoadInt32(&w.closed) == 1 {
  55. return 0, io.EOF
  56. }
  57. return w.bufReader.Read(b)
  58. }
  59. func (w *connWrapper) Close() error {
  60. err := w.Conn.Close()
  61. atomic.StoreInt32(&w.closed, 1)
  62. return err
  63. }
  64. // NewResponseUpgrader returns a new httpstream.ResponseUpgrader that is
  65. // capable of upgrading HTTP responses using SPDY/3.1 via the
  66. // spdystream package.
  67. func NewResponseUpgrader() httpstream.ResponseUpgrader {
  68. return responseUpgrader{}
  69. }
  70. // UpgradeResponse upgrades an HTTP response to one that supports multiplexed
  71. // streams. newStreamHandler will be called synchronously whenever the
  72. // other end of the upgraded connection creates a new stream.
  73. func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
  74. connectionHeader := strings.ToLower(req.Header.Get(httpstream.HeaderConnection))
  75. upgradeHeader := strings.ToLower(req.Header.Get(httpstream.HeaderUpgrade))
  76. if !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(HeaderSpdy31)) {
  77. errorMsg := fmt.Sprintf("unable to upgrade: missing upgrade headers in request: %#v", req.Header)
  78. http.Error(w, errorMsg, http.StatusBadRequest)
  79. return nil
  80. }
  81. hijacker, ok := w.(http.Hijacker)
  82. if !ok {
  83. errorMsg := fmt.Sprintf("unable to upgrade: unable to hijack response")
  84. http.Error(w, errorMsg, http.StatusInternalServerError)
  85. return nil
  86. }
  87. w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
  88. w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31)
  89. w.WriteHeader(http.StatusSwitchingProtocols)
  90. conn, bufrw, err := hijacker.Hijack()
  91. if err != nil {
  92. runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err))
  93. return nil
  94. }
  95. connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader}
  96. spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler)
  97. if err != nil {
  98. runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err))
  99. return nil
  100. }
  101. return spdyConn
  102. }