mod_containers.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package compute
  15. import (
  16. "bufio"
  17. "context"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "runtime/debug"
  24. "time"
  25. "yunion.io/x/jsonutils"
  26. "yunion.io/x/log"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/pkg/util/httputils"
  29. api "yunion.io/x/onecloud/pkg/apis/compute"
  30. "yunion.io/x/onecloud/pkg/mcclient"
  31. "yunion.io/x/onecloud/pkg/mcclient/modulebase"
  32. "yunion.io/x/onecloud/pkg/mcclient/modules"
  33. "yunion.io/x/onecloud/pkg/util/pod/remotecommand"
  34. "yunion.io/x/onecloud/pkg/util/pod/stream"
  35. "yunion.io/x/onecloud/pkg/util/pod/term"
  36. )
  37. var (
  38. Containers ContainerManager
  39. )
  40. func init() {
  41. Containers = ContainerManager{
  42. modules.NewComputeManager("container", "containers",
  43. []string{"ID", "Name", "Guest_ID", "Status", "Started_At", "Last_Finished_At", "Restart_Count"},
  44. []string{}),
  45. }
  46. modules.RegisterCompute(&Containers)
  47. }
  48. type ContainerManager struct {
  49. modulebase.ResourceManager
  50. }
  51. func (man ContainerManager) SetupTTY(in io.Reader, out io.Writer, errOut io.Writer, raw bool) term.TTY {
  52. t := term.TTY{
  53. Out: out,
  54. }
  55. if in == nil {
  56. t.In = nil
  57. t.Raw = false
  58. return t
  59. }
  60. return term.TTY{
  61. In: in,
  62. Out: out,
  63. Raw: raw,
  64. TryDev: false,
  65. Parent: nil,
  66. }
  67. }
  68. type ContainerExecInput struct {
  69. Command []string
  70. Tty bool
  71. Stdin io.Reader
  72. Stdout io.Writer
  73. Stderr io.Writer
  74. }
  75. func (man ContainerManager) Exec(s *mcclient.ClientSession, id string, opt *ContainerExecInput) error {
  76. defer func() {
  77. if r := recover(); r != nil {
  78. log.Errorf("Panic catched: %s", r)
  79. debug.PrintStack()
  80. }
  81. }()
  82. return man.exec(s, id, opt)
  83. }
  84. func (man ContainerManager) exec(s *mcclient.ClientSession, id string, opt *ContainerExecInput) error {
  85. info, err := man.GetSpecific(s, id, "exec-info", nil)
  86. if err != nil {
  87. return errors.Wrap(err, "get exec info")
  88. }
  89. infoOut := new(api.ContainerExecInfoOutput)
  90. info.Unmarshal(infoOut)
  91. apiInput := &api.ContainerExecInput{
  92. Command: opt.Command,
  93. Tty: opt.Tty,
  94. SetIO: true,
  95. Stdin: opt.Stdin != nil,
  96. Stdout: opt.Stdout != nil,
  97. }
  98. urlLoc := fmt.Sprintf("%s/pods/%s/containers/%s/exec?%s", infoOut.HostUri, infoOut.PodId, infoOut.ContainerId, jsonutils.Marshal(apiInput).QueryString())
  99. url, err := url.Parse(urlLoc)
  100. if err != nil {
  101. return errors.Wrapf(err, "parse url: %s", urlLoc)
  102. }
  103. exec, err := remotecommand.NewSPDYExecutor("POST", url)
  104. if err != nil {
  105. return errors.Wrap(err, "NewSPDYExecutor")
  106. }
  107. headers := mcclient.GetTokenHeaders(s.GetToken())
  108. t := man.SetupTTY(opt.Stdin, opt.Stdout, opt.Stderr, true)
  109. sizeQueue := t.MonitorSize(t.GetSize())
  110. fn := func() error {
  111. return exec.Stream(remotecommand.StreamOptions{
  112. Stdin: opt.Stdin,
  113. Stdout: opt.Stdout,
  114. Stderr: opt.Stderr,
  115. Tty: opt.Tty,
  116. TerminalSizeQueue: sizeQueue,
  117. Header: headers,
  118. })
  119. }
  120. return t.Safe(fn)
  121. }
  122. func (man ContainerManager) Log(s *mcclient.ClientSession, id string, opt *api.PodLogOptions) (io.ReadCloser, error) {
  123. info, err := man.GetSpecific(s, id, "exec-info", nil)
  124. if err != nil {
  125. return nil, errors.Wrap(err, "get exec info")
  126. }
  127. infoOut := new(api.ContainerExecInfoOutput)
  128. if err := info.Unmarshal(infoOut); err != nil {
  129. return nil, errors.Wrap(err, "unmarshal exec info")
  130. }
  131. qs := jsonutils.Marshal(opt).QueryString()
  132. urlLoc := fmt.Sprintf("%s/pods/%s/containers/%s/log?%s", infoOut.HostUri, infoOut.PodId, infoOut.ContainerId, qs)
  133. headers := mcclient.GetTokenHeaders(s.GetToken())
  134. req := stream.NewRequest(httputils.GetTimeoutClient(1*time.Hour), nil, headers)
  135. reader, err := req.Stream(context.Background(), "GET", urlLoc)
  136. if err != nil {
  137. return nil, errors.Wrap(err, "stream request")
  138. }
  139. return reader, nil
  140. }
  141. func (man ContainerManager) LogToWriter(s *mcclient.ClientSession, id string, opt *api.PodLogOptions, out io.Writer) error {
  142. reader, err := man.Log(s, id, opt)
  143. if err != nil {
  144. return errors.Wrap(err, "get container log")
  145. }
  146. defer reader.Close()
  147. r := bufio.NewReader(reader)
  148. for {
  149. bytes, err := r.ReadBytes('\n')
  150. if _, err := out.Write(bytes); err != nil {
  151. return errors.Wrap(err, "write container log to stdout")
  152. }
  153. if err != nil {
  154. if err != io.EOF {
  155. return errors.Wrap(err, "read container log")
  156. }
  157. return nil
  158. }
  159. }
  160. return nil
  161. }
  162. func (man ContainerManager) EnsureDir(s *mcclient.ClientSession, ctrId string, dirName string) error {
  163. opt := &ContainerExecInput{
  164. Command: []string{"mkdir", "-p", dirName},
  165. Tty: false,
  166. Stdin: os.Stdin,
  167. Stdout: os.Stdout,
  168. Stderr: os.Stderr,
  169. }
  170. return man.Exec(s, ctrId, opt)
  171. }
  172. func (man ContainerManager) copyTo(s *mcclient.ClientSession, ctrId string, destPath string, in io.Reader, ctrCmd []string) error {
  173. destDir := path.Dir(destPath)
  174. if err := man.EnsureDir(s, ctrId, destDir); err != nil {
  175. return errors.Wrapf(err, "ensure dir %s", destDir)
  176. }
  177. reader, writer := io.Pipe()
  178. go func() {
  179. defer writer.Close()
  180. written, err := io.Copy(writer, in)
  181. if err != nil {
  182. log.Errorf("copy reader to writer, written %d, error: %v", written, err)
  183. }
  184. }()
  185. opt := &ContainerExecInput{
  186. Command: ctrCmd,
  187. Tty: false,
  188. Stdin: reader,
  189. Stdout: os.Stdout,
  190. Stderr: os.Stderr,
  191. }
  192. return man.Exec(s, ctrId, opt)
  193. }
  194. func (man ContainerManager) CopyTo(s *mcclient.ClientSession, ctrId string, destPath string, in io.Reader) error {
  195. ctrCmd := []string{"sh", "-c", fmt.Sprintf("cat - > %s", destPath)}
  196. return man.copyTo(s, ctrId, destPath, in, ctrCmd)
  197. }
  198. func (man ContainerManager) CopyTarTo(s *mcclient.ClientSession, ctrId string, destDir string, in io.Reader, noSamePermissions bool) error {
  199. ctrCmd := []string{"tar", "-xmf", "-"}
  200. if noSamePermissions {
  201. ctrCmd = []string{"tar", "--no-same-permissions", "--no-same-owner", "-xmf", "-"}
  202. }
  203. ctrCmd = append(ctrCmd, "-C", destDir)
  204. return man.copyTo(s, ctrId, destDir, in, ctrCmd)
  205. }
  206. func (man ContainerManager) CheckDestinationIsDir(s *mcclient.ClientSession, ctrId string, destPath string) error {
  207. opt := &ContainerExecInput{
  208. Command: []string{"test", "-d", destPath},
  209. Tty: false,
  210. Stdin: os.Stdin,
  211. Stdout: os.Stdout,
  212. Stderr: os.Stderr,
  213. }
  214. return man.Exec(s, ctrId, opt)
  215. }
  216. func (man ContainerManager) copyFrom(s *mcclient.ClientSession, ctrId string, out io.Writer, cmd []string) error {
  217. reader, outStream := io.Pipe()
  218. opts := &ContainerExecInput{
  219. Command: cmd,
  220. Tty: false,
  221. Stdin: nil,
  222. Stdout: outStream,
  223. Stderr: os.Stderr,
  224. }
  225. go func() {
  226. defer outStream.Close()
  227. if err := man.Exec(s, ctrId, opts); err != nil {
  228. log.Errorf("compute.Containers.Exec: %v", err)
  229. }
  230. }()
  231. written, err := io.Copy(out, reader)
  232. if err != nil {
  233. return errors.Wrapf(err, "copy from reader written: %d", written)
  234. }
  235. return nil
  236. }
  237. func (man ContainerManager) CopyFrom(s *mcclient.ClientSession, ctrId string, ctrFile string, out io.Writer) error {
  238. return man.copyFrom(s, ctrId, out, []string{"cat", ctrFile})
  239. }
  240. func (man ContainerManager) CopyTarFrom(s *mcclient.ClientSession, ctrId string, ctrDir []string, out io.Writer) error {
  241. cmd := []string{"tar", "cf", "-"}
  242. cmd = append(cmd, ctrDir...)
  243. return man.copyFrom(s, ctrId, out, cmd)
  244. }