server.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. package server
  2. import (
  3. "context"
  4. "io"
  5. "os"
  6. "os/exec"
  7. "sync"
  8. "sync/atomic"
  9. "syscall"
  10. "github.com/pkg/errors"
  11. "yunion.io/x/executor/apis"
  12. "yunion.io/x/log"
  13. )
  14. var globalSn uint32
  15. func NewSN() uint32 {
  16. return atomic.AddUint32(&globalSn, 1)
  17. }
  18. func Len(sm *sync.Map) int {
  19. lengh := 0
  20. f := func(key, value interface{}) bool {
  21. lengh++
  22. return true
  23. }
  24. sm.Range(f)
  25. return lengh
  26. }
  27. var cmds = &sync.Map{}
  28. type Commander struct {
  29. // stream apis.Executor_ExecCommandServer
  30. c *exec.Cmd
  31. stdin io.WriteCloser
  32. stdout io.ReadCloser
  33. stderr io.ReadCloser
  34. wg *sync.WaitGroup
  35. stdoutCh chan struct{}
  36. stderrCh chan struct{}
  37. }
  38. func BytesArrayToStrArray(ba [][]byte) []string {
  39. if len(ba) == 0 {
  40. return nil
  41. }
  42. res := make([]string, len(ba))
  43. for i := 0; i < len(ba); i++ {
  44. res[i] = string(ba[i])
  45. }
  46. return res
  47. }
  48. func NewCommander(in *apis.Command) *Commander {
  49. cmd := exec.Command(string(in.Path), BytesArrayToStrArray(in.Args)...)
  50. if len(in.Env) > 0 {
  51. cmd.Env = BytesArrayToStrArray(in.Env)
  52. }
  53. if len(in.Dir) > 0 {
  54. cmd.Dir = string(in.Dir)
  55. }
  56. cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
  57. return &Commander{
  58. c: cmd,
  59. wg: new(sync.WaitGroup),
  60. }
  61. }
  62. type Executor struct{}
  63. func (e *Executor) ExecCommand(ctx context.Context, req *apis.Command) (*apis.Sn, error) {
  64. cm := NewCommander(req)
  65. sn := NewSN()
  66. log.Infof("%d/%d Exec %s", sn, Len(cmds), req.String())
  67. cmds.Store(sn, cm)
  68. return &apis.Sn{Sn: sn}, nil
  69. }
  70. func (e *Executor) Start(ctx context.Context, req *apis.StartInput) (*apis.StartResponse, error) {
  71. icm, ok := cmds.Load(req.Sn)
  72. if !ok {
  73. return nil, errors.Errorf("unknown sn %d", req.Sn)
  74. }
  75. var (
  76. m = icm.(*Commander)
  77. err error
  78. )
  79. if req.HasStdin {
  80. m.stdin, err = m.c.StdinPipe()
  81. if err != nil {
  82. return &apis.StartResponse{
  83. Success: false,
  84. Error: []byte(err.Error()),
  85. }, nil
  86. }
  87. }
  88. if req.HasStdout {
  89. m.stdout, err = m.c.StdoutPipe()
  90. if err != nil {
  91. return &apis.StartResponse{
  92. Success: false,
  93. Error: []byte(err.Error()),
  94. }, nil
  95. }
  96. m.stdoutCh = make(chan struct{})
  97. }
  98. if req.HasStderr {
  99. m.stderr, err = m.c.StderrPipe()
  100. if err != nil {
  101. return &apis.StartResponse{
  102. Success: false,
  103. Error: []byte(err.Error()),
  104. }, nil
  105. }
  106. m.stderrCh = make(chan struct{})
  107. }
  108. if err := m.c.Start(); err != nil {
  109. return &apis.StartResponse{
  110. Success: false,
  111. Error: []byte(err.Error()),
  112. }, nil
  113. }
  114. return &apis.StartResponse{
  115. Success: true,
  116. Error: nil,
  117. }, nil
  118. }
  119. func (e *Executor) Wait(ctx context.Context, in *apis.Sn) (*apis.WaitResponse, error) {
  120. icm, ok := cmds.Load(in.Sn)
  121. if !ok {
  122. return nil, errors.Errorf("unknown sn %d", in.Sn)
  123. }
  124. m := icm.(*Commander)
  125. // Must wait for stdout/stderr to be fully read BEFORE calling m.c.Wait().
  126. // Once m.c.Wait() returns, exec.Cmd may close the pipe FDs; our reader
  127. // goroutines would then get "read |0: file already closed" and miss data.
  128. if m.stdout != nil {
  129. <-m.stdoutCh
  130. }
  131. if m.stderr != nil {
  132. <-m.stderrCh
  133. }
  134. m.wg.Wait()
  135. err := m.c.Wait()
  136. var (
  137. exitStatus uint32
  138. errContent string
  139. )
  140. if err != nil {
  141. if exiterr, ok := err.(*exec.ExitError); ok {
  142. // The program has exited with an exit code != 0
  143. // This works on both Unix and Windows. Although package
  144. // syscall is generally platform dependent, WaitStatus is
  145. // defined for both Unix and Windows and in both cases has
  146. // an ExitStatus() method with the same signature.
  147. exitStatus = uint32(exiterr.Sys().(syscall.WaitStatus))
  148. } else {
  149. // command not found or io problem or wait was already called
  150. errContent = err.Error()
  151. }
  152. } else {
  153. exitStatus = 0
  154. }
  155. cmds.Delete(in.Sn)
  156. return &apis.WaitResponse{
  157. ExitStatus: exitStatus,
  158. ErrContent: []byte(errContent),
  159. }, nil
  160. }
  161. func (e *Executor) Kill(ctx context.Context, req *apis.Sn) (*apis.Error, error) {
  162. icm, ok := cmds.Load(req.Sn)
  163. if !ok {
  164. return nil, errors.Errorf("unknown sn %d", req.Sn)
  165. }
  166. m := icm.(*Commander)
  167. err := m.c.Process.Kill()
  168. if err != nil {
  169. return &apis.Error{Error: []byte(err.Error())}, nil
  170. }
  171. return &apis.Error{}, nil
  172. }
  173. func (e *Executor) SendInput(s apis.Executor_SendInputServer) error {
  174. var m *Commander
  175. for {
  176. input, err := s.Recv()
  177. if err == io.EOF {
  178. if input != nil && m == nil {
  179. icm, ok := cmds.Load(input.Sn)
  180. if !ok {
  181. return errors.Errorf("unknown sn %d", input.Sn)
  182. }
  183. m = icm.(*Commander)
  184. if m.stdin == nil {
  185. return errors.New("Process stdin not init")
  186. }
  187. }
  188. if m != nil {
  189. if e := m.stdin.Close(); e != nil {
  190. return errors.Wrap(e, "close stdin")
  191. }
  192. }
  193. return s.SendAndClose(&apis.Error{})
  194. } else if err != nil {
  195. return s.SendAndClose(&apis.Error{
  196. Error: []byte(err.Error()),
  197. })
  198. }
  199. if m == nil {
  200. icm, ok := cmds.Load(input.Sn)
  201. if !ok {
  202. return errors.Errorf("unknown sn %d", input.Sn)
  203. }
  204. m = icm.(*Commander)
  205. if m.stdin == nil {
  206. return errors.New("Process stdin not init")
  207. }
  208. }
  209. _, err = m.stdin.Write(input.Input)
  210. if err != nil {
  211. return s.SendAndClose(&apis.Error{
  212. Error: []byte(err.Error()),
  213. })
  214. }
  215. }
  216. }
  217. func (e *Executor) FetchStdout(sn *apis.Sn, s apis.Executor_FetchStdoutServer) error {
  218. icm, ok := cmds.Load(sn.Sn)
  219. if !ok {
  220. return errors.Errorf("unknown sn %d", sn.Sn)
  221. }
  222. var (
  223. m = icm.(*Commander)
  224. data = make([]byte, 4096)
  225. err error
  226. n int
  227. )
  228. if m.stdout == nil {
  229. return errors.New("Process stdout not init")
  230. } else {
  231. close(m.stdoutCh)
  232. }
  233. m.wg.Add(1)
  234. defer m.wg.Done()
  235. s.Send(&apis.Stdout{Start: true})
  236. for {
  237. n, err = m.stdout.Read(data)
  238. if err == io.EOF {
  239. return s.Send(&apis.Stdout{Closed: true})
  240. } else if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrClosed {
  241. return s.Send(&apis.Stdout{Closed: true})
  242. } else if err != nil {
  243. return s.Send(&apis.Stdout{RuntimeError: []byte(err.Error())})
  244. }
  245. err = s.Send(&apis.Stdout{Stdout: data[:n]})
  246. if err != nil {
  247. return err
  248. }
  249. }
  250. }
  251. func (e *Executor) FetchStderr(sn *apis.Sn, s apis.Executor_FetchStderrServer) error {
  252. icm, ok := cmds.Load(sn.Sn)
  253. if !ok {
  254. return errors.Errorf("unknown sn %d", sn.Sn)
  255. }
  256. var (
  257. m = icm.(*Commander)
  258. data = make([]byte, 4096)
  259. err error
  260. n int
  261. )
  262. if m.stderr == nil {
  263. return errors.New("Process stderr not init")
  264. } else {
  265. close(m.stderrCh)
  266. }
  267. m.wg.Add(1)
  268. defer m.wg.Done()
  269. s.Send(&apis.Stderr{Start: true})
  270. for {
  271. n, err = m.stderr.Read(data)
  272. if err == io.EOF {
  273. return s.Send(&apis.Stderr{Closed: true})
  274. } else if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrClosed {
  275. return s.Send(&apis.Stderr{Closed: true})
  276. } else if err != nil {
  277. return s.Send(&apis.Stderr{RuntimeError: []byte(err.Error())})
  278. }
  279. err = s.Send(&apis.Stderr{Stderr: data[:n]})
  280. if err != nil {
  281. return err
  282. }
  283. }
  284. }