connection.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
  15. Copyright 2015 The Kubernetes Authors.
  16. Licensed under the Apache License, Version 2.0 (the "License");
  17. you may not use this file except in compliance with the License.
  18. You may obtain a copy of the License at
  19. http://www.apache.org/licenses/LICENSE-2.0
  20. Unless required by applicable law or agreed to in writing, software
  21. distributed under the License is distributed on an "AS IS" BASIS,
  22. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  23. See the License for the specific language governing permissions and
  24. limitations under the License.
  25. */
  26. package spdy
  27. import (
  28. "net"
  29. "net/http"
  30. "sync"
  31. "time"
  32. "github.com/docker/spdystream"
  33. "yunion.io/x/log"
  34. "yunion.io/x/onecloud/pkg/util/httpstream"
  35. )
  36. // connection maintains state about a spdystream.Connection and its associated
  37. // streams.
  38. type connection struct {
  39. conn *spdystream.Connection
  40. streams []httpstream.Stream
  41. streamLock sync.Mutex
  42. newStreamHandler httpstream.NewStreamHandler
  43. }
  44. // NewClientConnection creates a new SPDY client connection.
  45. func NewClientConnection(conn net.Conn) (httpstream.Connection, error) {
  46. spdyConn, err := spdystream.NewConnection(conn, false)
  47. if err != nil {
  48. defer conn.Close()
  49. return nil, err
  50. }
  51. return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil
  52. }
  53. // NewServerConnection creates a new SPDY server connection. newStreamHandler
  54. // will be invoked when the server receives a newly created stream from the
  55. // client.
  56. func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) {
  57. spdyConn, err := spdystream.NewConnection(conn, true)
  58. if err != nil {
  59. defer conn.Close()
  60. return nil, err
  61. }
  62. return newConnection(spdyConn, newStreamHandler), nil
  63. }
  64. // newConnection returns a new connection wrapping conn. newStreamHandler
  65. // will be invoked when the server receives a newly created stream from the
  66. // client.
  67. func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
  68. c := &connection{conn: conn, newStreamHandler: newStreamHandler}
  69. go conn.Serve(c.newSpdyStream)
  70. return c
  71. }
  72. // createStreamResponseTimeout indicates how long to wait for the other side to
  73. // acknowledge the new stream before timing out.
  74. const createStreamResponseTimeout = 30 * time.Second
  75. // Close first sends a reset for all of the connection's streams, and then
  76. // closes the underlying spdystream.Connection.
  77. func (c *connection) Close() error {
  78. c.streamLock.Lock()
  79. for _, s := range c.streams {
  80. // calling Reset instead of Close ensures that all streams are fully torn down
  81. s.Reset()
  82. }
  83. c.streams = make([]httpstream.Stream, 0)
  84. c.streamLock.Unlock()
  85. // now that all streams are fully torn down, it's safe to call close on the underlying connection,
  86. // which should be able to terminate immediately at this point, instead of waiting for any
  87. // remaining graceful stream termination.
  88. return c.conn.Close()
  89. }
  90. // CreateStream creates a new stream with the specified headers and registers
  91. // it with the connection.
  92. func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) {
  93. stream, err := c.conn.CreateStream(headers, nil, false)
  94. if err != nil {
  95. return nil, err
  96. }
  97. if err = stream.WaitTimeout(createStreamResponseTimeout); err != nil {
  98. return nil, err
  99. }
  100. c.registerStream(stream)
  101. return stream, nil
  102. }
  103. // registerStream adds the stream s to the connection's list of streams that
  104. // it owns.
  105. func (c *connection) registerStream(s httpstream.Stream) {
  106. c.streamLock.Lock()
  107. c.streams = append(c.streams, s)
  108. c.streamLock.Unlock()
  109. }
  110. // CloseChan returns a channel that, when closed, indicates that the underlying
  111. // spdystream.Connection has been closed.
  112. func (c *connection) CloseChan() <-chan bool {
  113. return c.conn.CloseChan()
  114. }
  115. // newSpdyStream is the internal new stream handler used by spdystream.Connection.Serve.
  116. // It calls connection's newStreamHandler, giving it the opportunity to accept or reject
  117. // the stream. If newStreamHandler returns an error, the stream is rejected. If not, the
  118. // stream is accepted and registered with the connection.
  119. func (c *connection) newSpdyStream(stream *spdystream.Stream) {
  120. replySent := make(chan struct{})
  121. err := c.newStreamHandler(stream, replySent)
  122. rejectStream := (err != nil)
  123. if rejectStream {
  124. log.Warningf("Stream rejected: %v", err)
  125. stream.Reset()
  126. return
  127. }
  128. c.registerStream(stream)
  129. stream.SendReply(http.Header{}, rejectStream)
  130. close(replySent)
  131. }
  132. // SetIdleTimeout sets the amount of time the connection may remain idle before
  133. // it is automatically closed.
  134. func (c *connection) SetIdleTimeout(timeout time.Duration) {
  135. c.conn.SetIdleTimeout(timeout)
  136. }