// client.go (14 KB)
  1. package client
  2. import (
  3. "bytes"
  4. "context"
  5. "io"
  6. "net"
  7. "os"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/pkg/errors"
  12. "google.golang.org/grpc"
  13. "yunion.io/x/executor/apis"
  14. )
  // defaultTimeoutSecs is the initial value of timeoutSeconds.
  const defaultTimeoutSecs = 3

  var (
  	// exec is the package-wide executor; it stays nil until Init is called.
  	exec *Executor
  	// timeoutSeconds is the gRPC dial timeout, expressed in seconds.
  	timeoutSeconds = defaultTimeoutSecs
  )

  // SetTimeoutSeconds replaces the package-wide dial timeout (in seconds).
  func SetTimeoutSeconds(secs int) {
  	timeoutSeconds = secs
  }

  // GetTimeoutSeconds reports the current package-wide dial timeout (in seconds).
  func GetTimeoutSeconds() int {
  	return timeoutSeconds
  }

  // Executor holds the path of the unix socket that commands are sent to.
  type Executor struct {
  	socketPath string
  }

  // Init installs the package-wide executor that Command and CommandContext
  // attach to every Cmd they create.
  func Init(socketPath string) {
  	exec = &Executor{socketPath: socketPath}
  }
  34. func Command(path string, args ...string) *Cmd {
  35. if exec == nil {
  36. panic("executor not init ???")
  37. }
  38. return &Cmd{
  39. Executor: exec,
  40. Path: path,
  41. Args: args,
  42. wg: new(sync.WaitGroup),
  43. stdoutCh: make(chan struct{}),
  44. stderrCh: make(chan struct{}),
  45. }
  46. }
  47. func CommandContext(ctx context.Context, path string, args ...string) *Cmd {
  48. if exec == nil {
  49. panic("executor not init ???")
  50. }
  51. return &Cmd{
  52. Executor: exec,
  53. Path: path,
  54. Args: args,
  55. wg: new(sync.WaitGroup),
  56. stdoutCh: make(chan struct{}),
  57. stderrCh: make(chan struct{}),
  58. ctx: ctx,
  59. }
  60. }
  61. type Cmd struct {
  62. *Executor
  63. ctx context.Context
  64. Path string
  65. Args []string
  66. Env []string
  67. Dir string
  68. conn *grpc.ClientConn
  69. client apis.ExecutorClient
  70. sn *apis.Sn
  71. Stdin io.Reader
  72. Stdout io.Writer
  73. Stderr io.Writer
  74. closeAfterWait []io.Closer
  75. closeAfterAfter []io.Closer
  76. goroutine []func() error
  77. errch chan error
  78. waitDone chan struct{}
  79. stdoutCh chan struct{}
  80. stderrCh chan struct{}
  81. fetchError chan error
  82. streamStdin error
  83. streamStdout error
  84. streamStderr error
  85. wg *sync.WaitGroup
  86. combinedOutput chan struct{}
  87. }
  88. func grcpDialWithUnixSocket(ctx context.Context, socketPath string) (*grpc.ClientConn, error) {
  89. return grpc.DialContext(
  90. ctx, socketPath,
  91. grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(time.Second*time.Duration(timeoutSeconds)),
  92. grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
  93. return net.DialTimeout("unix", addr, timeout)
  94. }),
  95. )
  96. }
  97. func (c *Cmd) Connect(ctx context.Context, opts ...grpc.CallOption,
  98. ) error {
  99. var err error
  100. c.conn, err = grcpDialWithUnixSocket(ctx, c.socketPath)
  101. if err != nil {
  102. return errors.Wrap(err, "grpc dial error")
  103. }
  104. c.client = apis.NewExecutorClient(c.conn)
  105. return nil
  106. }
  107. func (c *Cmd) Run() error {
  108. if err := c.Start(); err != nil {
  109. return err
  110. }
  111. return c.Wait()
  112. }
  113. func (c *Cmd) CombinedOutput() ([]byte, error) {
  114. if c.Stdout != nil {
  115. return nil, errors.New("exec: Stdout already set")
  116. }
  117. if c.Stderr != nil {
  118. return nil, errors.New("exec: Stderr already set")
  119. }
  120. var b bytes.Buffer
  121. c.Stdout = &b
  122. c.Stderr = &b
  123. err := c.Run()
  124. return b.Bytes(), err
  125. }
  126. func (c *Cmd) Output() ([]byte, error) {
  127. if c.Stdout != nil {
  128. return nil, errors.New("exec: Stdout already set")
  129. }
  130. var stdout bytes.Buffer
  131. var stderr bytes.Buffer
  132. c.Stdout = &stdout
  133. c.Stderr = &stderr
  134. // function run return err its mean grpc stream transport error
  135. // cmd execute error indicate by exit code
  136. if err := c.Run(); err != nil {
  137. if e, ok := err.(*ExitError); ok {
  138. e.Stderr = stderr.Bytes()
  139. }
  140. return nil, err
  141. }
  142. return stdout.Bytes(), nil
  143. }
  144. func (c *Cmd) Start() error {
  145. if c.conn != nil {
  146. return errors.New("cmd executing")
  147. }
  148. if err := c.Connect(context.Background()); err != nil {
  149. return err
  150. }
  151. sn, err := c.client.ExecCommand(context.Background(), &apis.Command{
  152. Path: []byte(c.Path),
  153. Args: strArrayToBytesArray(c.Args),
  154. Env: strArrayToBytesArray(c.Env),
  155. Dir: []byte(c.Dir),
  156. })
  157. if err != nil {
  158. c.closeDescriptors()
  159. return errors.Wrap(err, "grcp exec command")
  160. }
  161. c.sn = sn
  162. if c.ctx != nil {
  163. select {
  164. case <-c.ctx.Done():
  165. c.closeDescriptors()
  166. return c.ctx.Err()
  167. default:
  168. }
  169. }
  170. var procIO = [3]*os.File{}
  171. type F func(*Cmd) (*os.File, error)
  172. for i, setupFd := range [3]F{(*Cmd).stdin, (*Cmd).stdout, (*Cmd).stderr} {
  173. if i == 2 && c.Stderr != nil && interfaceEqual(c.Stderr, c.Stdout) {
  174. procIO[2] = procIO[1]
  175. c.combinedOutput = make(chan struct{}, 2)
  176. continue
  177. }
  178. fd, err := setupFd(c)
  179. if err != nil {
  180. c.closeDescriptors()
  181. return errors.Wrap(err, "setup fd")
  182. }
  183. procIO[i] = fd
  184. }
  185. input := &apis.StartInput{
  186. Sn: c.sn.Sn,
  187. HasStdin: procIO[0] != nil,
  188. HasStdout: procIO[1] != nil,
  189. HasStderr: procIO[2] != nil,
  190. }
  191. res, err := c.client.Start(context.Background(), input)
  192. if err != nil {
  193. c.closeDescriptors()
  194. return errors.Wrap(err, "grpc start cmd")
  195. }
  196. if !res.Success {
  197. c.closeDescriptors()
  198. return errors.New(string(res.Error))
  199. }
  200. if procIO[0] != nil {
  201. go c.sendStdin(procIO[0])
  202. }
  203. if procIO[1] != nil {
  204. go c.fetchStdout(procIO[1])
  205. <-c.stdoutCh
  206. }
  207. if procIO[2] != nil {
  208. go c.fetchStderr(procIO[2])
  209. <-c.stderrCh
  210. }
  211. if c.combinedOutput != nil {
  212. go func(wc io.WriteCloser) {
  213. var closed bool
  214. for {
  215. select {
  216. case <-c.combinedOutput:
  217. if closed {
  218. wc.Close()
  219. return
  220. } else {
  221. closed = true
  222. }
  223. }
  224. }
  225. }(procIO[1])
  226. }
  227. c.errch = make(chan error, len(c.goroutine))
  228. for _, fn := range c.goroutine {
  229. go func(fn func() error) {
  230. c.errch <- fn()
  231. }(fn)
  232. }
  233. if c.ctx != nil {
  234. c.waitDone = make(chan struct{})
  235. go func() {
  236. select {
  237. case <-c.ctx.Done():
  238. c.Kill()
  239. case <-c.waitDone:
  240. }
  241. }()
  242. }
  243. return nil
  244. }
  245. func (c *Cmd) streamError() error {
  246. if c.streamStdin != nil {
  247. return c.streamStdin
  248. }
  249. if c.streamStdout != nil {
  250. return c.streamStdout
  251. }
  252. if c.streamStderr != nil {
  253. return c.streamStderr
  254. }
  255. return nil
  256. }
  257. func (c *Cmd) Kill() error {
  258. e, err := c.client.Kill(context.Background(), c.sn)
  259. if err != nil {
  260. return errors.Wrap(err, "grpc send kill")
  261. }
  262. if len(e.Error) > 0 {
  263. return errors.Errorf("kill process %s", e.Error)
  264. }
  265. return nil
  266. }
  267. func (c *Cmd) Wait() error {
  268. if c.conn == nil {
  269. return errors.New("cmd not executing")
  270. }
  271. res, err := c.client.Wait(context.Background(), c.sn)
  272. if err != nil {
  273. c.closeDescriptors()
  274. return errors.Wrap(err, "grpc wait proc")
  275. }
  276. if c.waitDone != nil {
  277. close(c.waitDone)
  278. }
  279. if err := c.streamError(); err != nil {
  280. c.closeDescriptors()
  281. return err
  282. }
  283. c.wg.Wait()
  284. if len(res.ErrContent) > 0 {
  285. return errors.New(string(res.ErrContent))
  286. }
  287. var copyError error
  288. for range c.goroutine {
  289. if err := <-c.errch; err != nil && copyError == nil {
  290. copyError = err
  291. }
  292. }
  293. c.closeDescriptors()
  294. if res.ExitStatus == 0 {
  295. if copyError != nil {
  296. return copyError
  297. }
  298. return nil
  299. } else {
  300. return &ExitError{ExitStatus: newWaitStatus(res.ExitStatus)}
  301. }
  302. }
  303. func (c *Cmd) closeDescriptors() {
  304. for _, fd := range c.closeAfterWait {
  305. fd.Close()
  306. }
  307. for _, fd := range c.closeAfterAfter {
  308. fd.Close()
  309. }
  310. if c.conn != nil {
  311. c.conn.Close()
  312. }
  313. }
  314. func (c *Cmd) StdinPipe() (io.WriteCloser, error) {
  315. if c.Stdin != nil {
  316. return nil, errors.New("exec: Stdin already set")
  317. }
  318. if c.conn != nil {
  319. return nil, errors.New("exec: StdinPipe after process started")
  320. }
  321. // do not use io.Pipe, block forever
  322. // https://stackoverflow.com/questions/47486128
  323. pr, pw, err := os.Pipe()
  324. if err != nil {
  325. return nil, errors.Wrap(err, "open stdinpipe")
  326. }
  327. c.Stdin = pr
  328. c.closeAfterWait = append(c.closeAfterWait, pr)
  329. wc := &CloseOnce{File: pw}
  330. c.closeAfterAfter = append(c.closeAfterAfter, wc)
  331. return wc, nil
  332. }
  333. func (c *Cmd) stdin() (*os.File, error) {
  334. if c.Stdin == nil {
  335. return nil, nil
  336. }
  337. if f, ok := c.Stdin.(*os.File); ok {
  338. return f, nil
  339. }
  340. pr, pw, err := os.Pipe()
  341. if err != nil {
  342. return nil, err
  343. }
  344. c.closeAfterWait = append(c.closeAfterWait, pr)
  345. c.goroutine = append(c.goroutine, func() error {
  346. _, err := io.Copy(pw, c.Stdin)
  347. if err1 := pw.Close(); err == nil {
  348. err = err1
  349. }
  350. return err
  351. })
  352. return pr, nil
  353. }
  354. func (c *Cmd) stdout() (f *os.File, err error) {
  355. return c.writerDescriptor(c.Stdout)
  356. }
  357. func (c *Cmd) stderr() (f *os.File, err error) {
  358. return c.writerDescriptor(c.Stderr)
  359. }
  // interfaceEqual reports whether a == b. Comparing interfaces whose dynamic
  // type is not comparable panics; that panic is swallowed and the result is
  // false.
  func interfaceEqual(a, b interface{}) (equal bool) {
  	defer func() {
  		if recover() != nil {
  			equal = false
  		}
  	}()
  	equal = a == b
  	return equal
  }
  368. func (c *Cmd) writerDescriptor(w io.Writer) (*os.File, error) {
  369. if w == nil {
  370. return nil, nil
  371. }
  372. if f, ok := w.(*os.File); ok {
  373. return f, nil
  374. }
  375. pr, pw, err := os.Pipe()
  376. if err != nil {
  377. return nil, err
  378. }
  379. c.closeAfterWait = append(c.closeAfterWait, pw)
  380. c.goroutine = append(c.goroutine, func() error {
  381. _, err := io.Copy(w, pr)
  382. pr.Close() // in case io.Copy stopped due to write error
  383. return err
  384. })
  385. return pw, nil
  386. }
  387. func (c *Cmd) StdoutPipe() (io.ReadCloser, error) {
  388. if c.Stdout != nil {
  389. return nil, errors.New("exec: Stdout already set")
  390. }
  391. if c.conn != nil {
  392. return nil, errors.New("exec: StdoutPipe after process started")
  393. }
  394. pr, pw, err := os.Pipe()
  395. if err != nil {
  396. return nil, errors.Wrap(err, "open stdoutpipe")
  397. }
  398. c.Stdout = pw
  399. c.closeAfterWait = append(c.closeAfterWait, pw)
  400. wc := &CloseOnce{File: pr}
  401. c.closeAfterAfter = append(c.closeAfterAfter, wc)
  402. return wc, nil
  403. }
  404. func (c *Cmd) StderrPipe() (io.ReadCloser, error) {
  405. if c.Stderr != nil {
  406. return nil, errors.New("exec: Stderr already set")
  407. }
  408. if c.conn != nil {
  409. return nil, errors.New("exec: StderrPipe after process started")
  410. }
  411. pr, pw, err := os.Pipe()
  412. if err != nil {
  413. return nil, errors.Wrap(err, "open stderrpipe")
  414. }
  415. c.Stderr = pw
  416. c.closeAfterWait = append(c.closeAfterWait, pw)
  417. wc := &CloseOnce{File: pr}
  418. c.closeAfterAfter = append(c.closeAfterAfter, wc)
  419. return wc, nil
  420. }
  421. func (c *Cmd) sendStdin(r io.Reader) {
  422. stream, err := c.client.SendInput(context.Background())
  423. if err != nil {
  424. c.streamStdin = errors.Wrap(err, "grpc send input")
  425. return
  426. }
  427. var data = make([]byte, 4096)
  428. for {
  429. n, err := r.Read(data)
  430. if err == io.EOF {
  431. e, err := stream.CloseAndRecv()
  432. if err != nil {
  433. c.streamStdin = errors.Wrap(err, "grpc send stdin on close and recv")
  434. return
  435. }
  436. if len(e.Error) > 0 {
  437. c.streamStdin = errors.New(string(e.Error))
  438. return
  439. }
  440. return
  441. } else if err != nil {
  442. c.streamStdin = errors.Wrap(err, "read from stdin")
  443. return
  444. }
  445. err = stream.Send(&apis.Input{
  446. Sn: c.sn.Sn,
  447. Input: data[:n],
  448. })
  449. if err != nil {
  450. c.streamStdin = errors.Wrap(err, "grpc send stdin")
  451. return
  452. }
  453. }
  454. }
  455. func (c *Cmd) closeWithCombined() {
  456. c.combinedOutput <- struct{}{}
  457. }
  458. func (c *Cmd) fetchStdout(w io.WriteCloser) {
  459. if c.combinedOutput != nil {
  460. defer c.closeWithCombined()
  461. } else {
  462. defer w.Close()
  463. }
  464. c.wg.Add(1)
  465. defer c.wg.Done()
  466. stream, err := c.client.FetchStdout(context.Background(), c.sn)
  467. if err != nil {
  468. close(c.stdoutCh)
  469. c.streamStdout = errors.Wrap(err, "grpc fetch stdout")
  470. return
  471. }
  472. data, err := stream.Recv()
  473. close(c.stdoutCh)
  474. if err != nil {
  475. c.streamStdout = errors.Wrap(err, "stream stdout")
  476. return
  477. }
  478. if !data.Start {
  479. c.streamStdout = errors.Wrap(err, "stream stdout not start")
  480. return
  481. }
  482. for {
  483. data, err := stream.Recv()
  484. if err == io.EOF {
  485. close(c.stdoutCh)
  486. return
  487. } else if err != nil {
  488. close(c.stdoutCh)
  489. c.streamStdout = errors.Wrap(err, "grpc stdout recv")
  490. return
  491. }
  492. if data.Closed {
  493. return
  494. } else if len(data.RuntimeError) > 0 {
  495. c.streamStdout = errors.New(string(data.RuntimeError))
  496. return
  497. } else {
  498. err := writeTo(data.Stdout, w)
  499. if err != nil {
  500. c.streamStdout = errors.Wrap(err, "write to stdout")
  501. return
  502. }
  503. }
  504. }
  505. }
  506. func (c *Cmd) fetchStderr(w io.WriteCloser) {
  507. if c.combinedOutput != nil {
  508. defer c.closeWithCombined()
  509. } else {
  510. defer w.Close()
  511. }
  512. c.wg.Add(1)
  513. defer c.wg.Done()
  514. stream, err := c.client.FetchStderr(context.Background(), c.sn)
  515. if err != nil {
  516. close(c.stderrCh)
  517. c.streamStderr = errors.Wrap(err, "grpc fetch stderr")
  518. return
  519. }
  520. data, err := stream.Recv()
  521. close(c.stderrCh)
  522. if err != nil {
  523. c.streamStderr = errors.Wrap(err, "stream stderr")
  524. return
  525. }
  526. if !data.Start {
  527. c.streamStderr = errors.Wrap(err, "stream stderr not start")
  528. return
  529. }
  530. for {
  531. data, err := stream.Recv()
  532. if err == io.EOF {
  533. return
  534. } else if err != nil {
  535. c.streamStderr = errors.Wrap(err, "grpc stderr recv")
  536. return
  537. }
  538. if data.Closed {
  539. return
  540. } else if len(data.RuntimeError) > 0 {
  541. c.streamStderr = errors.New(string(data.RuntimeError))
  542. return
  543. } else {
  544. err := writeTo(data.Stderr, w)
  545. if err != nil {
  546. c.streamStderr = errors.Wrap(err, "write to stderr")
  547. return
  548. }
  549. }
  550. }
  551. }
  552. // Convert integer to decimal string
  553. func itoa(val int) string {
  554. if val < 0 {
  555. return "-" + uitoa(uint(-val))
  556. }
  557. return uitoa(uint(val))
  558. }
  // uitoa formats val as a base-10 string.
  func uitoa(val uint) string {
  	if val == 0 { // constant result, no buffer needed
  		return "0"
  	}
  	var digits [20]byte // enough for a 64-bit value in base 10
  	pos := len(digits)
  	// Emit digits from least to most significant, filling from the end.
  	for val > 0 {
  		pos--
  		digits[pos] = byte('0' + val%10)
  		val /= 10
  	}
  	return string(digits[pos:])
  }
  576. // Convert exit status to error string
  577. // Source code in exec posix
  578. func exitStatusToString(status syscall.WaitStatus) string {
  579. res := ""
  580. switch {
  581. case status.Exited():
  582. res = "exit status " + itoa(status.ExitStatus())
  583. case status.Signaled():
  584. res = "signal: " + status.Signal().String()
  585. case status.Stopped():
  586. res = "stop signal: " + status.StopSignal().String()
  587. if status.StopSignal() == syscall.SIGTRAP && status.TrapCause() != 0 {
  588. res += " (trap " + itoa(status.TrapCause()) + ")"
  589. }
  590. case status.Continued():
  591. res = "continued"
  592. }
  593. if status.CoreDump() {
  594. res += " (core dumped)"
  595. }
  596. return res
  597. }
  598. type ExitError struct {
  599. ExitStatus syscall.WaitStatus
  600. Stderr []byte
  601. }
  602. func (e *ExitError) Sys() interface{} {
  603. return e.ExitStatus
  604. }
  605. func (e *ExitError) Error() string {
  606. return exitStatusToString(e.ExitStatus)
  607. }
  // strArrayToBytesArray converts each string in sa to a byte slice,
  // preserving order. An empty or nil input yields nil.
  func strArrayToBytesArray(sa []string) [][]byte {
  	if len(sa) == 0 {
  		return nil
  	}
  	converted := make([][]byte, 0, len(sa))
  	for _, s := range sa {
  		converted = append(converted, []byte(s))
  	}
  	return converted
  }
  // writeTo writes all of data to w, retrying after partial writes.
  //
  // It returns the first error reported by w. A write that reports no
  // progress and no error yields io.ErrShortWrite instead of retrying
  // forever.
  func writeTo(data []byte, w io.Writer) error {
  	for len(data) > 0 {
  		n, err := w.Write(data)
  		if err != nil {
  			return err
  		}
  		if n <= 0 {
  			// A writer that accepts nothing without an error would spin us forever.
  			return io.ErrShortWrite
  		}
  		if n >= len(data) {
  			return nil
  		}
  		data = data[n:]
  	}
  	return nil
  }
  // CloseOnce wraps an *os.File so that only the first Close call reaches
  // the file; later calls return the result of that first close.
  type CloseOnce struct {
  	*os.File

  	once sync.Once
  	err  error
  }

  // close performs the real close and remembers its result.
  func (c *CloseOnce) close() {
  	c.err = c.File.Close()
  }

  // Close closes the file on the first call and returns the remembered
  // result on every call.
  func (c *CloseOnce) Close() error {
  	c.once.Do(func() {
  		c.close()
  	})
  	return c.err
  }