main.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io"
  19. "os"
  20. "os/exec"
  21. "strconv"
  22. "strings"
  23. "syscall"
  24. "yunion.io/x/log"
  25. "yunion.io/x/pkg/errors"
  26. "yunion.io/x/pkg/util/signalutils"
  27. "yunion.io/x/pkg/utils"
  28. "yunion.io/x/onecloud/pkg/hostman/options"
  29. "yunion.io/x/onecloud/pkg/util/sysutils"
  30. )
  // mpsControlBin is the executable launched both to start the MPS control
  // daemon (with "-d") and to send individual commands to it.
  var mpsControlBin = "nvidia-cuda-mps-control"
  // Daemon holds the settings used to launch and configure the NVIDIA MPS
  // control daemon.
  type Daemon struct {
  	// logDir is passed to the MPS binaries as CUDA_MPS_LOG_DIRECTORY.
  	logDir string
  	// pipeDir is passed to the MPS binaries as CUDA_MPS_PIPE_DIRECTORY.
  	pipeDir string
  	// replicas is the divisor applied to each GPU's total memory and to the
  	// 100% thread budget when the default limits are configured.
  	replicas int
  }
  37. func NewDaemon(logDir, pipeDir string, replicas int) (*Daemon, error) {
  38. if err := os.MkdirAll(pipeDir, 0755); err != nil {
  39. return nil, fmt.Errorf("error creating directory %v: %s", pipeDir, err)
  40. }
  41. if err := os.MkdirAll(logDir, 0755); err != nil {
  42. return nil, fmt.Errorf("error creating directory %v: %s", logDir, err)
  43. }
  44. return &Daemon{
  45. logDir: logDir,
  46. pipeDir: pipeDir,
  47. replicas: replicas,
  48. }, nil
  49. }
  // envvars maps environment variable names to their values.
  type envvars map[string]string

  // toSlice renders the variables as "NAME=value" strings, the form expected
  // by exec.Cmd.Env. The order of the entries follows map iteration and is
  // therefore unspecified; an empty or nil map yields a nil slice.
  func (e envvars) toSlice() []string {
  	var pairs []string
  	for name := range e {
  		pairs = append(pairs, name+"="+e[name])
  	}
  	return pairs
  }
  58. func (d *Daemon) LogDir() string {
  59. return d.logDir
  60. }
  61. func (d *Daemon) PipeDir() string {
  62. return d.pipeDir
  63. }
  64. func (d *Daemon) Envvars() envvars {
  65. return map[string]string{
  66. "CUDA_MPS_PIPE_DIRECTORY": d.PipeDir(),
  67. "CUDA_MPS_LOG_DIRECTORY": d.LogDir(),
  68. }
  69. }
  70. // EchoPipeToControl sends the specified command to the MPS control daemon.
  71. func (d *Daemon) EchoPipeToControl(command string) (string, error) {
  72. var out bytes.Buffer
  73. reader, writer := io.Pipe()
  74. defer writer.Close()
  75. defer reader.Close()
  76. mpsDaemon := exec.Command(mpsControlBin)
  77. mpsDaemon.Env = append(mpsDaemon.Env, d.Envvars().toSlice()...)
  78. mpsDaemon.Stdin = reader
  79. mpsDaemon.Stdout = &out
  80. if err := mpsDaemon.Start(); err != nil {
  81. return "", fmt.Errorf("failed to start NVIDIA MPS command: %w", err)
  82. }
  83. if _, err := writer.Write([]byte(command)); err != nil {
  84. return "", fmt.Errorf("failed to write message to pipe: %w", err)
  85. }
  86. _ = writer.Close()
  87. if err := mpsDaemon.Wait(); err != nil {
  88. return "", fmt.Errorf("failed to send command to MPS daemon: %w", err)
  89. }
  90. return out.String(), nil
  91. }
  // parseMemSize converts a memory size such as "23040 MiB" into its integer
  // number of MiB.
  //
  // The input must end with " MiB"; otherwise -1 and an error are returned.
  // Whitespace around the number is ignored, and a remainder that is not a
  // valid integer yields the error reported by strconv.Atoi.
  func parseMemSize(memTotalStr string) (int, error) {
  	const mibSuffix = " MiB"
  	if !strings.HasSuffix(memTotalStr, mibSuffix) {
  		return -1, fmt.Errorf("unknown mem string suffix")
  	}
  	number := memTotalStr[:len(memTotalStr)-len(mibSuffix)]
  	return strconv.Atoi(strings.TrimSpace(number))
  }
  99. func (d *Daemon) Start() error {
  100. // nvidia-smi --query-gpu=gpu_uuid,memory.total,compute_mode --format=csv
  101. // GPU-76aef7ff-372d-2432-b4b4-beca4d8d3400, 23040 MiB, Exclusive_Process
  102. out, err := exec.Command("nvidia-smi", "--query-gpu=index,memory.total,compute_mode", "--format=csv").CombinedOutput()
  103. if err != nil {
  104. return errors.Wrapf(err, "nvidia-smi failed %s", out)
  105. }
  106. var devices = map[string]int{}
  107. lines := strings.Split(string(out), "\n")
  108. for _, line := range lines {
  109. if strings.HasPrefix(line, "index") {
  110. continue
  111. }
  112. segs := strings.Split(line, ",")
  113. if len(segs) != 3 {
  114. log.Errorf("unknown nvidia-smi out line %s", line)
  115. continue
  116. }
  117. gpuIdx, memTotal, computeMode := strings.TrimSpace(segs[0]), strings.TrimSpace(segs[1]), strings.TrimSpace(segs[2])
  118. if computeMode != "Exclusive_Process" {
  119. output, err := exec.Command("nvidia-smi", "-i", gpuIdx, "-c", "EXCLUSIVE_PROCESS").CombinedOutput()
  120. if err != nil {
  121. return fmt.Errorf("error running nvidia-smi: %s %s", output, err)
  122. }
  123. }
  124. memSize, err := parseMemSize(memTotal)
  125. if err != nil {
  126. return errors.Wrapf(err, "failed parse memSize %s", memTotal)
  127. }
  128. devices[gpuIdx] = memSize
  129. }
  130. mpsDaemon := exec.Command(mpsControlBin, "-d")
  131. mpsDaemon.Env = append(mpsDaemon.Env, d.Envvars().toSlice()...)
  132. if err := mpsDaemon.Run(); err != nil {
  133. return err
  134. }
  135. for deviceIdx, memory := range devices {
  136. memLimit := memory / d.replicas
  137. memLimitCmd := fmt.Sprintf("set_default_device_pinned_mem_limit %s %dM", deviceIdx, memLimit)
  138. log.Infof("set device mem limit cmd: %s", memLimitCmd)
  139. _, err := d.EchoPipeToControl(memLimitCmd)
  140. if err != nil {
  141. return fmt.Errorf("error set_default_device_pinned_mem_limit %s", err)
  142. }
  143. }
  144. threadPercentageCmd := fmt.Sprintf("set_default_active_thread_percentage %d", 100/d.replicas)
  145. _, err = d.EchoPipeToControl(threadPercentageCmd)
  146. if err != nil {
  147. return fmt.Errorf("error setting active thread percentage: %s", err)
  148. }
  149. return nil
  150. }
  151. func (d *Daemon) Stop() error {
  152. output, err := d.EchoPipeToControl("quit")
  153. if err != nil {
  154. return fmt.Errorf("error sending quit message: %s %s", output, err)
  155. }
  156. return nil
  157. }
  158. func main() {
  159. options.Init()
  160. isRoot := sysutils.IsRootPermission()
  161. if !isRoot {
  162. log.Fatalf("host service must running with root permissions")
  163. return
  164. }
  165. daemon, err := NewDaemon(
  166. options.HostOptions.CudaMPSLogDirectory,
  167. options.HostOptions.CudaMPSPipeDirectory,
  168. options.HostOptions.CudaMPSReplicas,
  169. )
  170. if err != nil {
  171. log.Fatalf("%s", err.Error())
  172. return
  173. }
  174. var sigChan = make(chan struct{})
  175. signalutils.RegisterSignal(func() {
  176. utils.DumpAllGoroutineStack(log.Logger().Out)
  177. }, syscall.SIGUSR1)
  178. signalutils.RegisterSignal(func() {
  179. sigChan <- struct{}{}
  180. }, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
  181. signalutils.StartTrap()
  182. if err = daemon.Start(); err != nil {
  183. log.Fatalf("%s", err.Error())
  184. }
  185. log.Infof("MPS daemon started ......")
  186. select {
  187. case <-sigChan:
  188. if err := daemon.Stop(); err != nil {
  189. log.Errorf("failed stop daemon %s", err)
  190. os.Exit(1)
  191. }
  192. }
  193. }