connection_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
  15. Copyright 2016 The Kubernetes Authors.
  16. Licensed under the Apache License, Version 2.0 (the "License");
  17. you may not use this file except in compliance with the License.
  18. You may obtain a copy of the License at
  19. http://www.apache.org/licenses/LICENSE-2.0
  20. Unless required by applicable law or agreed to in writing, software
  21. distributed under the License is distributed on an "AS IS" BASIS,
  22. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  23. See the License for the specific language governing permissions and
  24. limitations under the License.
  25. */
  26. package spdy
  27. import (
  28. "io"
  29. "net"
  30. "net/http"
  31. "sync"
  32. "testing"
  33. "time"
  34. "yunion.io/x/onecloud/pkg/util/httpstream"
  35. )
  36. func runProxy(t *testing.T, backendUrl string, proxyUrl chan<- string, proxyDone chan<- struct{}) {
  37. listener, err := net.Listen("tcp4", "localhost:0")
  38. if err != nil {
  39. t.Fatalf("error listening: %v", err)
  40. }
  41. defer listener.Close()
  42. proxyUrl <- listener.Addr().String()
  43. clientConn, err := listener.Accept()
  44. if err != nil {
  45. t.Errorf("proxy: error accepting client connection: %v", err)
  46. return
  47. }
  48. backendConn, err := net.Dial("tcp4", backendUrl)
  49. if err != nil {
  50. t.Errorf("proxy: error dialing backend: %v", err)
  51. return
  52. }
  53. defer backendConn.Close()
  54. var wg sync.WaitGroup
  55. wg.Add(2)
  56. go func() {
  57. defer wg.Done()
  58. io.Copy(backendConn, clientConn)
  59. }()
  60. go func() {
  61. defer wg.Done()
  62. io.Copy(clientConn, backendConn)
  63. }()
  64. wg.Wait()
  65. proxyDone <- struct{}{}
  66. }
  67. func runServer(t *testing.T, backendUrl chan<- string, serverDone chan<- struct{}) {
  68. listener, err := net.Listen("tcp4", "localhost:0")
  69. if err != nil {
  70. t.Fatalf("server: error listening: %v", err)
  71. }
  72. defer listener.Close()
  73. backendUrl <- listener.Addr().String()
  74. conn, err := listener.Accept()
  75. if err != nil {
  76. t.Errorf("server: error accepting connection: %v", err)
  77. return
  78. }
  79. streamChan := make(chan httpstream.Stream)
  80. replySentChan := make(chan (<-chan struct{}))
  81. spdyConn, err := NewServerConnection(conn, func(stream httpstream.Stream, replySent <-chan struct{}) error {
  82. streamChan <- stream
  83. replySentChan <- replySent
  84. return nil
  85. })
  86. if err != nil {
  87. t.Errorf("server: error creating spdy connection: %v", err)
  88. return
  89. }
  90. stream := <-streamChan
  91. replySent := <-replySentChan
  92. <-replySent
  93. buf := make([]byte, 1)
  94. _, err = stream.Read(buf)
  95. if err != io.EOF {
  96. t.Errorf("server: unexpected read error: %v", err)
  97. return
  98. }
  99. <-spdyConn.CloseChan()
  100. raw := spdyConn.(*connection).conn
  101. if err := raw.Wait(15 * time.Second); err != nil {
  102. t.Errorf("server: timed out waiting for connection closure: %v", err)
  103. }
  104. serverDone <- struct{}{}
  105. }
  106. func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) {
  107. serverDone := make(chan struct{})
  108. backendUrlChan := make(chan string)
  109. go runServer(t, backendUrlChan, serverDone)
  110. backendUrl := <-backendUrlChan
  111. proxyDone := make(chan struct{})
  112. proxyUrlChan := make(chan string)
  113. go runProxy(t, backendUrl, proxyUrlChan, proxyDone)
  114. proxyUrl := <-proxyUrlChan
  115. conn, err := net.Dial("tcp4", proxyUrl)
  116. if err != nil {
  117. t.Fatalf("client: error connecting to proxy: %v", err)
  118. }
  119. spdyConn, err := NewClientConnection(conn)
  120. if err != nil {
  121. t.Fatalf("client: error creating spdy connection: %v", err)
  122. }
  123. if _, err := spdyConn.CreateStream(http.Header{}); err != nil {
  124. t.Fatalf("client: error creating stream: %v", err)
  125. }
  126. spdyConn.Close()
  127. raw := spdyConn.(*connection).conn
  128. if err := raw.Wait(15 * time.Second); err != nil {
  129. t.Fatalf("client: timed out waiting for connection closure: %v", err)
  130. }
  131. expired := time.NewTimer(15 * time.Second)
  132. defer expired.Stop()
  133. i := 0
  134. for {
  135. select {
  136. case <-expired.C:
  137. t.Fatalf("timed out waiting for proxy and/or server closure")
  138. case <-serverDone:
  139. i++
  140. case <-proxyDone:
  141. i++
  142. }
  143. if i == 2 {
  144. break
  145. }
  146. }
  147. }