v1.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /*
  15. Copyright 2015 The Kubernetes Authors.
  16. Licensed under the Apache License, Version 2.0 (the "License");
  17. you may not use this file except in compliance with the License.
  18. You may obtain a copy of the License at
  19. http://www.apache.org/licenses/LICENSE-2.0
  20. Unless required by applicable law or agreed to in writing, software
  21. distributed under the License is distributed on an "AS IS" BASIS,
  22. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  23. See the License for the specific language governing permissions and
  24. limitations under the License.
  25. */
  26. package remotecommand
  27. import (
  28. "fmt"
  29. "io"
  30. "io/ioutil"
  31. "net/http"
  32. v1 "k8s.io/api/core/v1"
  33. "k8s.io/apimachinery/pkg/util/httpstream"
  34. "k8s.io/klog/v2"
  35. )
  36. // streamProtocolV1 implements the first version of the streaming exec & attach
  37. // protocol. This version has some bugs, such as not being able to detect when
  38. // non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
  39. // http://issues.k8s.io/13395 for more details.
  40. type streamProtocolV1 struct {
  41. StreamOptions
  42. errorStream httpstream.Stream
  43. remoteStdin httpstream.Stream
  44. remoteStdout httpstream.Stream
  45. remoteStderr httpstream.Stream
  46. }
  47. var _ streamProtocolHandler = &streamProtocolV1{}
  48. func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
  49. return &streamProtocolV1{
  50. StreamOptions: options,
  51. }
  52. }
  53. func (p *streamProtocolV1) stream(conn streamCreator) error {
  54. doneChan := make(chan struct{}, 2)
  55. errorChan := make(chan error)
  56. cp := func(s string, dst io.Writer, src io.Reader) {
  57. klog.V(6).Infof("Copying %s", s)
  58. defer klog.V(6).Infof("Done copying %s", s)
  59. if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
  60. klog.Errorf("Error copying %s: %v", s, err)
  61. }
  62. if s == v1.StreamTypeStdout || s == v1.StreamTypeStderr {
  63. doneChan <- struct{}{}
  64. }
  65. }
  66. // set up all the streams first
  67. var err error
  68. headers := http.Header{}
  69. headers.Set(v1.StreamType, v1.StreamTypeError)
  70. p.errorStream, err = conn.CreateStream(headers)
  71. if err != nil {
  72. return err
  73. }
  74. defer p.errorStream.Reset()
  75. // Create all the streams first, then start the copy goroutines. The server doesn't start its copy
  76. // goroutines until it's received all of the streams. If the client creates the stdin stream and
  77. // immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the
  78. // spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't
  79. // getting processed because the server hasn't started its copying, and it won't do that until it
  80. // gets all the streams. By creating all the streams first, we ensure that the server is ready to
  81. // process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
  82. if p.Stdin != nil {
  83. headers.Set(v1.StreamType, v1.StreamTypeStdin)
  84. p.remoteStdin, err = conn.CreateStream(headers)
  85. if err != nil {
  86. return err
  87. }
  88. defer p.remoteStdin.Reset()
  89. }
  90. if p.Stdout != nil {
  91. headers.Set(v1.StreamType, v1.StreamTypeStdout)
  92. p.remoteStdout, err = conn.CreateStream(headers)
  93. if err != nil {
  94. return err
  95. }
  96. defer p.remoteStdout.Reset()
  97. }
  98. if p.Stderr != nil && !p.Tty {
  99. headers.Set(v1.StreamType, v1.StreamTypeStderr)
  100. p.remoteStderr, err = conn.CreateStream(headers)
  101. if err != nil {
  102. return err
  103. }
  104. defer p.remoteStderr.Reset()
  105. }
  106. // now that all the streams have been created, proceed with reading & copying
  107. // always read from errorStream
  108. go func() {
  109. message, err := ioutil.ReadAll(p.errorStream)
  110. if err != nil && err != io.EOF {
  111. errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
  112. return
  113. }
  114. if len(message) > 0 {
  115. errorChan <- fmt.Errorf("Error executing remote command: %s", message)
  116. return
  117. }
  118. }()
  119. if p.Stdin != nil {
  120. // TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
  121. // because stdin is not closed until the process exits. If we try to call
  122. // stdin.Close(), it returns no error but doesn't unblock the copy. It will
  123. // exit when the process exits, instead.
  124. go cp(v1.StreamTypeStdin, p.remoteStdin, readerWrapper{p.Stdin})
  125. }
  126. waitCount := 0
  127. completedStreams := 0
  128. if p.Stdout != nil {
  129. waitCount++
  130. go cp(v1.StreamTypeStdout, p.Stdout, p.remoteStdout)
  131. }
  132. if p.Stderr != nil && !p.Tty {
  133. waitCount++
  134. go cp(v1.StreamTypeStderr, p.Stderr, p.remoteStderr)
  135. }
  136. Loop:
  137. for {
  138. select {
  139. case <-doneChan:
  140. completedStreams++
  141. if completedStreams == waitCount {
  142. break Loop
  143. }
  144. case err := <-errorChan:
  145. return err
  146. }
  147. }
  148. return nil
  149. }