haproxy.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lbagent
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "os"
  21. "os/exec"
  22. "path/filepath"
  23. "reflect"
  24. "strings"
  25. "sync"
  26. "syscall"
  27. "time"
  28. "yunion.io/x/log"
  29. "yunion.io/x/pkg/errors"
  30. "yunion.io/x/pkg/gotypes"
  31. "yunion.io/x/onecloud/pkg/apis"
  32. "yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
  33. "yunion.io/x/onecloud/pkg/hostman/system_service"
  34. agentmodels "yunion.io/x/onecloud/pkg/lbagent/models"
  35. agentutils "yunion.io/x/onecloud/pkg/lbagent/utils"
  36. "yunion.io/x/onecloud/pkg/util/sysutils"
  37. )
  38. type HaproxyHelper struct {
  39. opts *Options
  40. lbagentId string
  41. configDirMan *agentutils.ConfigDirManager
  42. }
  43. func NewHaproxyHelper(opts *Options, lbagentId string) (*HaproxyHelper, error) {
  44. helper := &HaproxyHelper{
  45. opts: opts,
  46. lbagentId: lbagentId,
  47. configDirMan: agentutils.NewConfigDirManager(opts.haproxyConfigDir),
  48. }
  49. {
  50. // sysctl
  51. args := []string{
  52. "sysctl", "-w",
  53. "net.ipv4.ip_nonlocal_bind=1",
  54. "net.ipv4.ip_forward=1",
  55. }
  56. if err := helper.runCmd(args); err != nil {
  57. return nil, fmt.Errorf("sysctl: %s", err)
  58. }
  59. }
  60. return helper, nil
  61. }
  62. func (h *HaproxyHelper) Run(ctx context.Context) {
  63. defer func() {
  64. wg := ctx.Value("wg").(*sync.WaitGroup)
  65. wg.Done()
  66. }()
  67. cmdChan := ctx.Value("cmdChan").(chan *LbagentCmd)
  68. for {
  69. for {
  70. select {
  71. case <-ctx.Done():
  72. log.Infof("haproxy helper bye")
  73. return
  74. case cmd := <-cmdChan:
  75. h.handleCmd(ctx, cmd)
  76. }
  77. }
  78. }
  79. }
  80. func (h *HaproxyHelper) handleCmd(ctx context.Context, cmd *LbagentCmd) {
  81. switch cmd.Type {
  82. case LbagentCmdUseCorpus:
  83. cmdData := cmd.Data.(*LbagentCmdUseCorpusData)
  84. defer cmdData.Wg.Done()
  85. h.handleUseCorpusCmd(ctx, cmd)
  86. case LbagentCmdStopDaemons:
  87. h.handleStopDaemonsCmd(ctx)
  88. default:
  89. log.Warningf("command type ignored: %v", cmd.Type)
  90. }
  91. }
  92. func (h *HaproxyHelper) handleStopDaemonsCmd(ctx context.Context) {
  93. pidFiles := []*agentutils.PidFile{
  94. h.gobetweenPidFile(),
  95. h.haproxyPidFile(),
  96. h.telegrafPidFile(),
  97. }
  98. wg := &sync.WaitGroup{}
  99. wg.Add(len(pidFiles))
  100. for _, pf := range pidFiles {
  101. go func(pf *agentutils.PidFile) {
  102. defer wg.Done()
  103. proc, confirmed, err := pf.ConfirmOrUnlink()
  104. if confirmed {
  105. log.Infof("stopping %s(%d)", pf.Comm, proc.Pid)
  106. proc.Signal(syscall.SIGTERM)
  107. for etime := time.Now().Add(5 * time.Second); etime.Before(time.Now()); {
  108. if err := proc.Signal(syscall.Signal(0)); err == nil {
  109. return
  110. }
  111. time.Sleep(500 * time.Millisecond)
  112. }
  113. proc.Kill()
  114. // TODO check whether proc.Ppid == os.Getpid()
  115. proc.Wait()
  116. }
  117. if err != nil {
  118. log.Warningln(err.Error())
  119. }
  120. }(pf)
  121. }
  122. wg.Wait()
  123. }
  124. func (h *HaproxyHelper) handleUseCorpusCmd(ctx context.Context, cmd *LbagentCmd) {
  125. // haproxy config dir
  126. dir, err := h.configDirMan.NewDir(func(dir string) error {
  127. cmdData := cmd.Data.(*LbagentCmdUseCorpusData)
  128. corpus := cmdData.Corpus
  129. agentParams := cmdData.AgentParams
  130. {
  131. opt := fmt.Sprintf("stats socket %s expose-fd listeners", h.haproxyStatsSocketFile())
  132. agentParams.SetHaproxyParams("global_stats_socket", opt)
  133. }
  134. var genHaproxyConfigsResult *agentmodels.GenHaproxyConfigsResult
  135. var err error
  136. {
  137. // haproxy toplevel global/defaults config
  138. err = corpus.GenHaproxyToplevelConfig(dir, agentParams)
  139. if err != nil {
  140. err = fmt.Errorf("generating haproxy toplevel config failed: %s", err)
  141. return err
  142. }
  143. }
  144. {
  145. // haproxy configs
  146. genHaproxyConfigsResult, err = corpus.GenHaproxyConfigs(dir, agentParams)
  147. if err != nil {
  148. err = fmt.Errorf("generating haproxy config failed: %s", err)
  149. return err
  150. }
  151. }
  152. {
  153. // gobetween config
  154. opts := &agentmodels.GenGobetweenConfigOptions{
  155. LoadbalancersEnabled: genHaproxyConfigsResult.LoadbalancersEnabled,
  156. AgentParams: agentParams,
  157. }
  158. err := corpus.GenGobetweenConfigs(dir, opts)
  159. if err != nil {
  160. err = fmt.Errorf("generating gobetween config failed: %s", err)
  161. return err
  162. }
  163. }
  164. {
  165. // keepalived config
  166. opts := &agentmodels.GenKeepalivedConfigOptions{
  167. LoadbalancersEnabled: genHaproxyConfigsResult.LoadbalancersEnabled,
  168. AgentParams: agentParams,
  169. }
  170. log.Infof("GenKeepalivedConfigs %s", dir)
  171. err := corpus.GenKeepalivedConfigs(dir, opts)
  172. if err != nil {
  173. err = fmt.Errorf("generating keepalived config failed: %s", err)
  174. return err
  175. }
  176. }
  177. if agentParams.AgentModel.Params.Telegraf.InfluxDbOutputUrl != "" {
  178. agentParams.SetTelegrafParams("haproxy_input_stats_socket", h.haproxyStatsSocketFile())
  179. // telegraf config
  180. buf := bytes.NewBufferString("# yunion lb auto-generated telegraf.conf\n")
  181. tmpl := agentParams.TelegrafConfigTmpl
  182. err := tmpl.Execute(buf, agentParams.Data)
  183. if err == nil {
  184. d := buf.Bytes()
  185. p := filepath.Join(dir, "telegraf.conf")
  186. err := os.WriteFile(p, d, agentutils.FileModeFile)
  187. if err == nil {
  188. err := h.reloadTelegraf(ctx, agentParams)
  189. if err != nil {
  190. log.Errorf("reloading telegraf.conf failed: %s", err)
  191. }
  192. } else {
  193. log.Errorf("writing %s failed: %s", p, err)
  194. }
  195. } else {
  196. log.Errorf("making telegraf.conf failed: %s, tmpl:\n%#v", err, tmpl)
  197. }
  198. }
  199. return nil
  200. })
  201. if err != nil {
  202. log.Errorf("making configs: %s", err)
  203. return
  204. }
  205. if err := h.configDirMan.Prune(h.opts.DataPreserveN); err != nil {
  206. log.Errorf("prune configs dir failed: %s", err)
  207. // continue
  208. }
  209. if err := h.useConfigs(ctx, dir); err != nil {
  210. log.Errorf("useConfigs: %s", err)
  211. }
  212. }
  213. func (h *HaproxyHelper) useConfigs(ctx context.Context, d string) error {
  214. lnF := func(old, new string) error {
  215. err := os.RemoveAll(new)
  216. if err != nil {
  217. return err
  218. }
  219. err = os.Symlink(old, new)
  220. return err
  221. }
  222. haproxyConfD := h.haproxyConfD()
  223. gobetweenJson := filepath.Join(h.opts.haproxyConfigDir, "gobetween.json")
  224. keepalivedConf := filepath.Join(h.opts.haproxyConfigDir, "keepalived.conf")
  225. telegrafConf := filepath.Join(h.opts.haproxyConfigDir, "telegraf.conf")
  226. dirMap := map[string]string{
  227. haproxyConfD: d,
  228. gobetweenJson: filepath.Join(d, "gobetween.json"),
  229. keepalivedConf: filepath.Join(d, "keepalived.conf"),
  230. telegrafConf: filepath.Join(d, "telegraf.conf"),
  231. }
  232. for new, old := range dirMap {
  233. err := lnF(old, new)
  234. if err != nil {
  235. return err
  236. }
  237. }
  238. {
  239. var errs []error
  240. var err error
  241. {
  242. // reload haproxy
  243. err = h.reloadHaproxy(ctx)
  244. if err != nil {
  245. errs = append(errs, err)
  246. }
  247. }
  248. {
  249. // reload gobetween
  250. err = h.reloadGobetween(ctx)
  251. if err != nil {
  252. errs = append(errs, err)
  253. }
  254. }
  255. {
  256. // reload keepalived
  257. err = h.reloadKeepalived(ctx)
  258. if err != nil {
  259. errs = append(errs, err)
  260. }
  261. }
  262. if len(errs) == 0 {
  263. return nil
  264. }
  265. return errors.NewAggregate(errs)
  266. }
  267. }
  268. func (h *HaproxyHelper) haproxyConfD() string {
  269. return filepath.Join(h.opts.haproxyConfigDir, "haproxy.conf.d")
  270. }
  271. func (h *HaproxyHelper) haproxyPidFile() *agentutils.PidFile {
  272. pf := agentutils.NewPidFile(
  273. filepath.Join(h.opts.haproxyRunDir, "haproxy.pid"),
  274. "haproxy",
  275. )
  276. return pf
  277. }
  278. func (h *HaproxyHelper) haproxyStatsSocketFile() string {
  279. return filepath.Join(h.opts.haproxyRunDir, "haproxy.sock")
  280. }
  281. func (h *HaproxyHelper) reloadHaproxy(ctx context.Context) error {
  282. // NOTE we may sometimes need to specify a custom the executable path
  283. pidFile := h.haproxyPidFile()
  284. args := []string{
  285. h.opts.HaproxyBin,
  286. "-D", // goes daemon
  287. "-p", pidFile.Path,
  288. "-C", h.haproxyConfD(),
  289. "-f", h.haproxyConfD(),
  290. }
  291. proc, confirmed, err := pidFile.ConfirmOrUnlink()
  292. if !confirmed {
  293. log.Infof("starting haproxy")
  294. return h.runCmd(args)
  295. }
  296. if err != nil {
  297. log.Warningln(err.Error())
  298. }
  299. {
  300. // try reload
  301. args_ := make([]string, len(args))
  302. copy(args_, args)
  303. args_ = append(args_, "-sf", fmt.Sprintf("%d", proc.Pid))
  304. {
  305. statsSocket := h.haproxyStatsSocketFile()
  306. if fi, err := os.Stat(statsSocket); err == nil && fi.Mode()&os.ModeSocket != 0 {
  307. args_ = append(args_, "-x", statsSocket)
  308. } else {
  309. log.Warningf("stats socket %s not found", statsSocket)
  310. }
  311. }
  312. log.Infof("reloading haproxy")
  313. err := h.runCmd(args_)
  314. if err == nil {
  315. return nil
  316. }
  317. log.Errorf("reloading haproxy: %s", err)
  318. }
  319. {
  320. // reload failed
  321. // kill the old
  322. log.Errorf("killing old haproxy %d", proc.Pid)
  323. proc.Signal(syscall.SIGKILL)
  324. killed := false
  325. loop:
  326. for {
  327. timeout := time.NewTimer(3 * time.Second)
  328. ticker := time.NewTicker(10 * time.Millisecond)
  329. defer ticker.Stop()
  330. defer timeout.Stop()
  331. select {
  332. case <-ticker.C:
  333. if err := proc.Signal(syscall.Signal(0)); err != nil {
  334. killed = true
  335. break loop
  336. }
  337. case <-timeout.C:
  338. break loop
  339. }
  340. }
  341. if !killed {
  342. return fmt.Errorf("failed killing haproxy %d", proc.Pid)
  343. }
  344. log.Infof("restarting haproxy")
  345. return h.runCmd(args)
  346. }
  347. }
  348. func (h *HaproxyHelper) gobetweenConf() string {
  349. return filepath.Join(h.opts.haproxyConfigDir, "gobetween.json")
  350. }
  351. func (h *HaproxyHelper) gobetweenPidFile() *agentutils.PidFile {
  352. pf := agentutils.NewPidFile(
  353. filepath.Join(h.opts.haproxyRunDir, "gobetween.pid"),
  354. "gobetween",
  355. )
  356. return pf
  357. }
  358. func (h *HaproxyHelper) reloadGobetween(ctx context.Context) error {
  359. pidFile := h.gobetweenPidFile()
  360. {
  361. proc, confirmed, err := pidFile.ConfirmOrUnlink()
  362. if confirmed {
  363. log.Infof("stopping gobetween(%d)", proc.Pid)
  364. proc.Kill()
  365. proc.Wait()
  366. }
  367. if err != nil {
  368. log.Warningln(err.Error())
  369. }
  370. }
  371. args := []string{
  372. h.opts.GobetweenBin,
  373. "--config", h.gobetweenConf(),
  374. "--format", "json",
  375. }
  376. log.Infof("starting gobetween")
  377. cmd, err := h.startCmd(args)
  378. if err != nil {
  379. return err
  380. }
  381. err = agentutils.WritePidFile(cmd.Process.Pid, pidFile.Path)
  382. if err != nil {
  383. return fmt.Errorf("writing gobetween pid file: %s", err)
  384. }
  385. return nil
  386. }
  387. func (h *HaproxyHelper) telegrafConf() string {
  388. return filepath.Join(h.haproxyConfD(), "telegraf.conf")
  389. }
  390. func (h *HaproxyHelper) telegrafPidFile() *agentutils.PidFile {
  391. pf := agentutils.NewPidFile(
  392. filepath.Join(h.opts.haproxyRunDir, "telegraf.pid"),
  393. "telegraf",
  394. )
  395. return pf
  396. }
  397. func (h *HaproxyHelper) reloadTelegraf(ctx context.Context, agentParams *agentmodels.AgentParams) error {
  398. if h.opts.EnableRemoteExecutor {
  399. return h.remoteReloadTelegraf(ctx, agentParams)
  400. } else {
  401. return h.localReloadTelegraf(ctx)
  402. }
  403. }
  404. func (h *HaproxyHelper) remoteReloadTelegraf(ctx context.Context, agentParams *agentmodels.AgentParams) error {
  405. telegraf := system_service.GetService("telegraf")
  406. conf := map[string]interface{}{}
  407. conf["hostname"] = h.getHostname()
  408. conf["tags"] = map[string]string{
  409. "id": h.lbagentId,
  410. "lbagent_id": h.lbagentId,
  411. "region": h.opts.Region,
  412. "lbagent_ip": h.opts.AccessIp,
  413. hostconsts.TELEGRAF_TAG_KEY_BRAND: hostconsts.TELEGRAF_TAG_ONECLOUD_BRAND,
  414. hostconsts.TELEGRAF_TAG_KEY_RES_TYPE: hostconsts.TELEGRAF_TAG_ONECLOUD_RES_TYPE_LBAGENT,
  415. hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE: hostconsts.TELEGRAF_TAG_ONECLOUD_HOST_TYPE_LBAGENT,
  416. }
  417. conf["nics"] = h.getNicsTelegrafConf()
  418. if len(agentParams.AgentModel.Params.Telegraf.InfluxDbOutputUrl) > 0 {
  419. conf[apis.SERVICE_TYPE_INFLUXDB] = map[string]interface{}{
  420. "url": []string{
  421. agentParams.AgentModel.Params.Telegraf.InfluxDbOutputUrl,
  422. },
  423. "database": agentParams.AgentModel.Params.Telegraf.InfluxDbOutputName,
  424. }
  425. }
  426. conf["haproxy"] = map[string]interface{}{
  427. "interval": agentParams.AgentModel.Params.Telegraf.HaproxyInputInterval,
  428. "stats_socket_path": h.haproxyStatsSocketFile(),
  429. }
  430. oldConf := telegraf.GetConf()
  431. log.Debugf("old config: %s", oldConf)
  432. log.Debugf("new config: %s", conf)
  433. if gotypes.IsNil(oldConf) || !reflect.DeepEqual(oldConf, conf) {
  434. log.Debugf("telegraf config: %s", conf)
  435. telegraf.SetConf(conf)
  436. telegraf.BgReloadConf(conf)
  437. }
  438. return nil
  439. }
  440. func (h *HaproxyHelper) getNicsTelegrafConf() []map[string]interface{} {
  441. var ret = make([]map[string]interface{}, 0)
  442. phyNics, _ := sysutils.Nics()
  443. for _, pnic := range phyNics {
  444. ret = append(ret, map[string]interface{}{
  445. "name": pnic.Dev,
  446. "speed": pnic.Speed,
  447. })
  448. }
  449. return ret
  450. }
  451. func (h *HaproxyHelper) getHostname() string {
  452. hn, err := os.Hostname()
  453. if err != nil {
  454. log.Fatalf("fail to get hostname %s", err)
  455. return ""
  456. }
  457. dotIdx := strings.IndexByte(hn, '.')
  458. if dotIdx >= 0 {
  459. hn = hn[:dotIdx]
  460. }
  461. hn = strings.ToLower(hn)
  462. if len(hn) == 0 {
  463. hn = "host"
  464. }
  465. masterIp := h.opts.AccessIp
  466. return hn + "-" + strings.Replace(masterIp, ".", "-", -1)
  467. }
  468. func (h *HaproxyHelper) localReloadTelegraf(ctx context.Context) error {
  469. pidFile := h.telegrafPidFile()
  470. {
  471. proc, confirmed, err := pidFile.ConfirmOrUnlink()
  472. if confirmed {
  473. log.Infof("stopping telegraf(%d)", proc.Pid)
  474. proc.Kill()
  475. proc.Wait()
  476. }
  477. if err != nil {
  478. log.Warningln(err.Error())
  479. }
  480. }
  481. log.Infof("starting telegraf")
  482. args := []string{
  483. h.opts.TelegrafBin,
  484. "--config", h.telegrafConf(),
  485. }
  486. cmd, err := h.startCmd(args)
  487. if err != nil {
  488. return err
  489. }
  490. err = agentutils.WritePidFile(cmd.Process.Pid, pidFile.Path)
  491. if err != nil {
  492. return fmt.Errorf("writing telegraf pid file: %s", err)
  493. }
  494. return nil
  495. }
  496. func (h *HaproxyHelper) keepalivedConf() string {
  497. return filepath.Join(h.opts.haproxyConfigDir, "keepalived.conf")
  498. }
  499. func (h *HaproxyHelper) keepalivedPidFile() *agentutils.PidFile {
  500. pf := agentutils.NewPidFile(
  501. filepath.Join(h.opts.haproxyRunDir, "keepalived.pid"),
  502. "keepalived",
  503. )
  504. return pf
  505. }
  506. func (h *HaproxyHelper) keepalivedVrrpPidFile() *agentutils.PidFile {
  507. pf := agentutils.NewPidFile(
  508. filepath.Join(h.opts.haproxyRunDir, "keepalived_vrrp.pid"),
  509. "keepalived",
  510. )
  511. return pf
  512. }
  513. func (h *HaproxyHelper) keepalivedCheckersPidFile() *agentutils.PidFile {
  514. pf := agentutils.NewPidFile(
  515. filepath.Join(h.opts.haproxyRunDir, "keepalived_checkers.pid"),
  516. "keepalived",
  517. )
  518. return pf
  519. }
  520. func (h *HaproxyHelper) reloadKeepalived(ctx context.Context) error {
  521. var (
  522. pidFile *agentutils.PidFile
  523. vrrpPidFile *agentutils.PidFile
  524. checkersPidFile *agentutils.PidFile
  525. )
  526. pidFile = h.keepalivedPidFile()
  527. {
  528. proc, confirmed, err := pidFile.ConfirmOrUnlink()
  529. if confirmed {
  530. // send SIGHUP to reload
  531. err := proc.Signal(syscall.SIGHUP)
  532. if err != nil {
  533. return fmt.Errorf("keepalived: send HUP failed: %s", err)
  534. }
  535. return nil
  536. }
  537. if err != nil {
  538. log.Warningln(err.Error())
  539. }
  540. }
  541. vrrpPidFile = h.keepalivedVrrpPidFile()
  542. if _, _, err := vrrpPidFile.ConfirmOrUnlink(); err != nil {
  543. log.Warningln(err.Error())
  544. }
  545. checkersPidFile = h.keepalivedCheckersPidFile()
  546. if _, _, err := checkersPidFile.ConfirmOrUnlink(); err != nil {
  547. log.Warningln(err.Error())
  548. }
  549. args := []string{
  550. h.opts.KeepalivedBin,
  551. "--pid", pidFile.Path,
  552. "--vrrp_pid", vrrpPidFile.Path,
  553. "--checkers_pid", checkersPidFile.Path,
  554. "--use-file", h.keepalivedConf(),
  555. "--log-detail",
  556. "--no-syslog",
  557. "--dump-conf",
  558. "--log-console",
  559. "--dont-fork",
  560. }
  561. err := h.runService(args)
  562. if err != nil {
  563. return errors.Wrapf(err, "run service %s", strings.Join(args, " "))
  564. }
  565. return nil
  566. }
  567. func (h *HaproxyHelper) runCmd(args []string) error {
  568. log.Infof("run command %s", strings.Join(args, " "))
  569. name := args[0]
  570. args = args[1:]
  571. cmd := exec.Command(name, args...)
  572. output, err := cmd.Output()
  573. if err != nil {
  574. if ee, ok := err.(*exec.ExitError); ok {
  575. stdout := string(output)
  576. stderr := string(ee.Stderr)
  577. return fmt.Errorf("%s: %s\nargs: %s\nstdout: %s\nstderr: %s",
  578. name, err, strings.Join(args, " "), stdout, stderr)
  579. }
  580. return fmt.Errorf("%s: %s", name, err)
  581. }
  582. return nil
  583. }
  584. func (h *HaproxyHelper) startCmd(args []string) (*exec.Cmd, error) {
  585. log.Infof("start command %s", strings.Join(args, " "))
  586. name := args[0]
  587. args = args[1:]
  588. cmd := exec.Command(name, args...)
  589. err := cmd.Start()
  590. if err != nil {
  591. return nil, err
  592. }
  593. return cmd, nil
  594. }
  595. func (h *HaproxyHelper) runService(args []string) error {
  596. log.Infof("run service %s", strings.Join(args, " "))
  597. name := args[0]
  598. args = args[1:]
  599. cmd := exec.Command(name, args...)
  600. stdout, err := cmd.StdoutPipe()
  601. if err != nil {
  602. log.Errorf("service %s stdout pipe error: %s", cmd.String(), err)
  603. return errors.Wrapf(err, "service %s stdout pipe error", cmd.String())
  604. }
  605. stderr, err := cmd.StderrPipe()
  606. if err != nil {
  607. log.Errorf("service %s stderr pipe error: %s", cmd.String(), err)
  608. return errors.Wrapf(err, "service %s stderr pipe error", cmd.String())
  609. }
  610. drain := func(out io.ReadCloser, isErr bool) {
  611. defer out.Close()
  612. buf := make([]byte, 1024)
  613. for {
  614. n, err := stdout.Read(buf)
  615. if err != nil {
  616. log.Errorf("read pipe error: %s", err)
  617. return
  618. }
  619. if isErr {
  620. log.Errorln(string(buf[:n]))
  621. } else {
  622. log.Infoln(string(buf[:n]))
  623. }
  624. }
  625. }
  626. err = cmd.Start()
  627. if err != nil {
  628. return errors.Wrapf(err, "start service %s", cmd.String())
  629. }
  630. go func(cmd *exec.Cmd) {
  631. err := cmd.Wait()
  632. if err != nil {
  633. log.Errorf("service %s exited with error: %s", cmd.String(), err)
  634. }
  635. }(cmd)
  636. go drain(stdout, false)
  637. go drain(stderr, true)
  638. return nil
  639. }