httpstream.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
  15. Copyright 2015 The Kubernetes Authors.
  16. Licensed under the Apache License, Version 2.0 (the "License");
  17. you may not use this file except in compliance with the License.
  18. You may obtain a copy of the License at
  19. http://www.apache.org/licenses/LICENSE-2.0
  20. Unless required by applicable law or agreed to in writing, software
  21. distributed under the License is distributed on an "AS IS" BASIS,
  22. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  23. See the License for the specific language governing permissions and
  24. limitations under the License.
  25. */
  26. package httpstream
  27. import (
  28. "fmt"
  29. "io"
  30. "net/http"
  31. "strings"
  32. "time"
  33. )
  34. const (
  35. HeaderConnection = "Connection"
  36. HeaderUpgrade = "Upgrade"
  37. HeaderProtocolVersion = "X-Stream-Protocol-Version"
  38. HeaderAcceptedProtocolVersions = "X-Accepted-Stream-Protocol-Versions"
  39. )
  40. // NewStreamHandler defines a function that is called when a new Stream is
  41. // received. If no error is returned, the Stream is accepted; otherwise,
  42. // the stream is rejected. After the reply frame has been sent, replySent is closed.
  43. type NewStreamHandler func(stream Stream, replySent <-chan struct{}) error
  44. // NoOpNewStreamHandler is a stream handler that accepts a new stream and
  45. // performs no other logic.
  46. func NoOpNewStreamHandler(stream Stream, replySent <-chan struct{}) error { return nil }
  47. // Dialer knows how to open a streaming connection to a server.
  48. type Dialer interface {
  49. // Dial opens a streaming connection to a server using one of the protocols
  50. // specified (in order of most preferred to least preferred).
  51. Dial(protocols ...string) (Connection, string, error)
  52. }
  53. // UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade
  54. // HTTP requests to support multiplexed bidirectional streams. After RoundTrip()
  55. // is invoked, if the upgrade is successful, clients may retrieve the upgraded
  56. // connection by calling UpgradeRoundTripper.Connection().
  57. type UpgradeRoundTripper interface {
  58. http.RoundTripper
  59. // NewConnection validates the response and creates a new Connection.
  60. NewConnection(resp *http.Response) (Connection, error)
  61. }
  62. // ResponseUpgrader knows how to upgrade HTTP requests and responses to
  63. // add streaming support to them.
  64. type ResponseUpgrader interface {
  65. // UpgradeResponse upgrades an HTTP response to one that supports multiplexed
  66. // streams. newStreamHandler will be called asynchronously whenever the
  67. // other end of the upgraded connection creates a new stream.
  68. UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler NewStreamHandler) Connection
  69. }
  70. // Connection represents an upgraded HTTP connection.
  71. type Connection interface {
  72. // CreateStream creates a new Stream with the supplied headers.
  73. CreateStream(headers http.Header) (Stream, error)
  74. // Close resets all streams and closes the connection.
  75. Close() error
  76. // CloseChan returns a channel that is closed when the underlying connection is closed.
  77. CloseChan() <-chan bool
  78. // SetIdleTimeout sets the amount of time the connection may remain idle before
  79. // it is automatically closed.
  80. SetIdleTimeout(timeout time.Duration)
  81. }
  82. // Stream represents a bidirectional communications channel that is part of an
  83. // upgraded connection.
  84. type Stream interface {
  85. io.ReadWriteCloser
  86. // Reset closes both directions of the stream, indicating that neither client
  87. // or server can use it any more.
  88. Reset() error
  89. // Headers returns the headers used to create the stream.
  90. Headers() http.Header
  91. // Identifier returns the stream's ID.
  92. Identifier() uint32
  93. }
  94. // IsUpgradeRequest returns true if the given request is a connection upgrade request
  95. func IsUpgradeRequest(req *http.Request) bool {
  96. for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
  97. if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) {
  98. return true
  99. }
  100. }
  101. return false
  102. }
  103. func negotiateProtocol(clientProtocols, serverProtocols []string) string {
  104. for i := range clientProtocols {
  105. for j := range serverProtocols {
  106. if clientProtocols[i] == serverProtocols[j] {
  107. return clientProtocols[i]
  108. }
  109. }
  110. }
  111. return ""
  112. }
  113. // Handshake performs a subprotocol negotiation. If the client did request a
  114. // subprotocol, Handshake will select the first common value found in
  115. // serverProtocols. If a match is found, Handshake adds a response header
  116. // indicating the chosen subprotocol. If no match is found, HTTP forbidden is
  117. // returned, along with a response header containing the list of protocols the
  118. // server can accept.
  119. func Handshake(req *http.Request, w http.ResponseWriter, serverProtocols []string) (string, error) {
  120. clientProtocols := req.Header[http.CanonicalHeaderKey(HeaderProtocolVersion)]
  121. if len(clientProtocols) == 0 {
  122. // Kube 1.0 clients didn't support subprotocol negotiation.
  123. // TODO require clientProtocols once Kube 1.0 is no longer supported
  124. return "", nil
  125. }
  126. if len(serverProtocols) == 0 {
  127. // Kube 1.0 servers didn't support subprotocol negotiation. This is mainly for testing.
  128. // TODO require serverProtocols once Kube 1.0 is no longer supported
  129. return "", nil
  130. }
  131. negotiatedProtocol := negotiateProtocol(clientProtocols, serverProtocols)
  132. if len(negotiatedProtocol) == 0 {
  133. for i := range serverProtocols {
  134. w.Header().Add(HeaderAcceptedProtocolVersions, serverProtocols[i])
  135. }
  136. err := fmt.Errorf("unable to upgrade: unable to negotiate protocol: client supports %v, server accepts %v", clientProtocols, serverProtocols)
  137. http.Error(w, err.Error(), http.StatusForbidden)
  138. return "", err
  139. }
  140. w.Header().Add(HeaderProtocolVersion, negotiatedProtocol)
  141. return negotiatedProtocol, nil
  142. }