hmp.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package monitor
  15. import (
  16. "bufio"
  17. "bytes"
  18. "fmt"
  19. "io"
  20. "regexp"
  21. "strings"
  22. "time"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/onecloud/pkg/util/regutils2"
  26. )
  27. type HmpMonitor struct {
  28. SBaseMonitor
  29. commandQueue []string
  30. callbackQueue []StringCallback
  31. }
  32. func NewHmpMonitor(server, sid string, OnMonitorDisConnect, OnMonitorTimeout MonitorErrorFunc, OnMonitorConnected MonitorSuccFunc) *HmpMonitor {
  33. return &HmpMonitor{
  34. SBaseMonitor: *NewBaseMonitor(server, sid, OnMonitorConnected, OnMonitorDisConnect, OnMonitorTimeout),
  35. commandQueue: make([]string, 0),
  36. callbackQueue: make([]StringCallback, 0),
  37. }
  38. }
  39. var hmpMark = []byte("(qemu) ")
  40. func (m *HmpMonitor) hmpSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
  41. if atEOF && len(data) == 0 {
  42. return 0, nil, nil
  43. }
  44. index := bytes.Index(data, hmpMark)
  45. if index >= 0 {
  46. return index + len(hmpMark), data[0:index], nil
  47. }
  48. if atEOF {
  49. return len(data), data, nil
  50. }
  51. // Request more data.
  52. return 0, nil, nil
  53. }
  54. func (m *HmpMonitor) actionResult(res string) string {
  55. return res
  56. }
  57. func (m *HmpMonitor) read(r io.Reader) {
  58. if !m.checkReading() {
  59. return
  60. }
  61. scanner := bufio.NewScanner(r)
  62. scanner.Split(m.hmpSplitFunc)
  63. for scanner.Scan() {
  64. res := scanner.Text()
  65. if len(res) == 0 {
  66. continue
  67. }
  68. log.Infof("HMP Read %s: %s", m.server, res)
  69. if m.connected {
  70. go m.callBack(res)
  71. } else {
  72. // remove reader timeout
  73. m.connected = true
  74. m.timeout = false
  75. m.rwc.SetReadDeadline(time.Time{})
  76. go m.query()
  77. go m.OnMonitorConnected()
  78. }
  79. }
  80. log.Infof("Scan over %s ...", m.server)
  81. err := scanner.Err()
  82. if err != nil {
  83. log.Infof("HMP Disconnected %s: %s", m.server, err)
  84. }
  85. if m.timeout {
  86. m.OnMonitorTimeout(err)
  87. } else if m.connected {
  88. m.connected = false
  89. m.OnMonitorDisConnect(err)
  90. }
  91. m.reading = false
  92. }
  93. func (m *HmpMonitor) callBack(res string) {
  94. m.mutex.Lock()
  95. if len(m.callbackQueue) == 0 {
  96. m.mutex.Unlock()
  97. return
  98. }
  99. cb := m.callbackQueue[0]
  100. m.callbackQueue = m.callbackQueue[1:]
  101. m.mutex.Unlock()
  102. if cb != nil {
  103. pos := strings.Index(res, "\r\n")
  104. if pos > 0 {
  105. res = res[pos+2:]
  106. }
  107. go cb(res)
  108. }
  109. }
  110. func (m *HmpMonitor) write(cmd []byte) error {
  111. cmd = append(cmd, '\n')
  112. log.Infof("HMP Write %s: %s", m.server, string(cmd))
  113. length, index := len(cmd), 0
  114. for index < length {
  115. i, err := m.rwc.Write(cmd)
  116. if err != nil {
  117. return err
  118. }
  119. index += i
  120. }
  121. return nil
  122. }
  123. func (m *HmpMonitor) query() {
  124. if !m.checkWriting() {
  125. return
  126. }
  127. for {
  128. if len(m.commandQueue) == 0 {
  129. break
  130. }
  131. //pop
  132. m.mutex.Lock()
  133. cmd := m.commandQueue[0]
  134. m.commandQueue = m.commandQueue[1:]
  135. err := m.write([]byte(cmd))
  136. m.mutex.Unlock()
  137. if err != nil {
  138. log.Errorf("Write %s to monitor error %s: %s", cmd, m.server, err)
  139. break
  140. }
  141. }
  142. m.writing = false
  143. }
  144. func (m *HmpMonitor) Query(cmd string, cb StringCallback) {
  145. // push
  146. m.mutex.Lock()
  147. m.commandQueue = append(m.commandQueue, cmd)
  148. m.callbackQueue = append(m.callbackQueue, cb)
  149. m.mutex.Unlock()
  150. if m.connected {
  151. if !m.writing {
  152. go m.query()
  153. }
  154. if !m.reading {
  155. go m.read(m.rwc)
  156. }
  157. }
  158. }
  159. func (m *HmpMonitor) ConnectWithSocket(address string) error {
  160. err := m.SBaseMonitor.connect("unix", address)
  161. if err != nil {
  162. return err
  163. }
  164. go m.read(m.rwc)
  165. return nil
  166. }
  167. func (m *HmpMonitor) Connect(host string, port int) error {
  168. err := m.SBaseMonitor.connect("tcp", fmt.Sprintf("%s:%d", host, port))
  169. if err != nil {
  170. return err
  171. }
  172. go m.read(m.rwc)
  173. return nil
  174. }
  175. func (m *HmpMonitor) QueryStatus(callback StringCallback) {
  176. m.Query("info status", m.parseStatus(callback))
  177. }
  178. func (m *HmpMonitor) SimpleCommand(cmd string, callback StringCallback) {
  179. m.Query(cmd, callback)
  180. }
  181. func (m *HmpMonitor) HumanMonitorCommand(cmd string, callback StringCallback) {
  182. m.Query(cmd, callback)
  183. }
  184. func (m *HmpMonitor) GetVersion(callback StringCallback) {
  185. _cb := func(versionStr string) {
  186. versionStr = strings.TrimSpace(versionStr)
  187. callback(versionStr)
  188. }
  189. m.Query("info version", _cb)
  190. }
  191. func (m *HmpMonitor) GetBlocks(callback func([]QemuBlock)) {
  192. var cb = func(output string) {
  193. var lines = strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
  194. var mergedOutput = []string{}
  195. // merge output
  196. for _, line := range lines {
  197. parts := regexp.MustCompile(`\s+`).Split(line, -1)
  198. if strings.HasSuffix(parts[0], ":") {
  199. mergedOutput = append(mergedOutput, "")
  200. } else if regexp.MustCompile(`\(#block\d+\):`).MatchString(line) {
  201. mergedOutput = append(mergedOutput, "")
  202. }
  203. mergedOutput[len(mergedOutput)-1] = mergedOutput[len(mergedOutput)-1] + " " + line
  204. mergedOutput[len(mergedOutput)-1] = strings.TrimSpace(mergedOutput[len(mergedOutput)-1])
  205. }
  206. // parse to json
  207. ret := []QemuBlock{}
  208. for _, line := range mergedOutput {
  209. parts := regexp.MustCompile(`\s+`).Split(line, -1)
  210. if strings.HasSuffix(parts[0], ":") ||
  211. regexp.MustCompile(`\(#block\d+\):`).MatchString(parts[1]) {
  212. block := QemuBlock{}
  213. if strings.HasSuffix(parts[0], ":") {
  214. block.Device = parts[0][:len(parts[0])-1]
  215. } else {
  216. block.Device = parts[0]
  217. }
  218. if regexp.MustCompile(`\(#block\d+\):`).MatchString(parts[1]) {
  219. block.Inserted.File = parts[2]
  220. for i := 0; i < len(parts)-2; i++ {
  221. if parts[i] == "Backing" && parts[i+1] == "file:" {
  222. block.Inserted.BackingFile = parts[i+2]
  223. break
  224. }
  225. }
  226. }
  227. ret = append(ret, block)
  228. }
  229. }
  230. callback(ret)
  231. }
  232. m.Query("info block", cb)
  233. }
  234. func (m *HmpMonitor) EjectCdrom(dev string, callback StringCallback) {
  235. m.Query(fmt.Sprintf("eject -f %s", dev), callback)
  236. }
  237. func (m *HmpMonitor) ChangeCdrom(dev string, path string, callback StringCallback) {
  238. m.Query(fmt.Sprintf("change %s %s", dev, path), callback)
  239. }
  240. func (m *HmpMonitor) DriveDel(idstr string, callback StringCallback) {
  241. m.Query(fmt.Sprintf("drive_del %s", idstr), callback)
  242. }
  243. func (m *HmpMonitor) DeviceDel(idstr string, callback StringCallback) {
  244. m.Query(fmt.Sprintf("device_del %s", idstr), callback)
  245. }
  246. func (m *HmpMonitor) ObjectDel(idstr string, callback StringCallback) {
  247. m.Query(fmt.Sprintf("object_del %s", idstr), callback)
  248. }
  249. func (m *HmpMonitor) XBlockdevChange(parent, node, child string, callback StringCallback) {
  250. go callback("hmp not support command x-blockdev-change")
  251. }
  252. func (m *HmpMonitor) DriveAdd(bus, node string, params map[string]string, callback StringCallback) {
  253. var paramsKvs = []string{}
  254. for k, v := range params {
  255. paramsKvs = append(paramsKvs, fmt.Sprintf("%s=%s", k, v))
  256. }
  257. cmd := "drive_add"
  258. if len(node) > 0 {
  259. cmd = fmt.Sprintf("drive_add -n %s", node)
  260. }
  261. m.Query(fmt.Sprintf("%s %s %s", cmd, bus, strings.Join(paramsKvs, ",")), callback)
  262. }
  263. func (m *HmpMonitor) DeviceAdd(dev string, params map[string]string, callback StringCallback) {
  264. var paramsKvs = []string{}
  265. for k, v := range params {
  266. paramsKvs = append(paramsKvs, fmt.Sprintf("%s=%s", k, v))
  267. }
  268. m.Query(fmt.Sprintf("device_add %s,%s", dev, strings.Join(paramsKvs, ",")), callback)
  269. }
  270. func (m *HmpMonitor) MigrateSetDowntime(dtSec float64, callback StringCallback) {
  271. m.Query(fmt.Sprintf("migrate_set_downtime %f", dtSec), callback)
  272. }
  273. func (m *HmpMonitor) MigrateSetCapability(capability, state string, callback StringCallback) {
  274. m.Query(fmt.Sprintf("migrate_set_capability %s %s", capability, state), callback)
  275. }
  276. func (m *HmpMonitor) MigrateSetParameter(key string, val interface{}, callback StringCallback) {
  277. cmd := fmt.Sprintf("migrate_set_parameter %s %s", key, val)
  278. m.Query(cmd, callback)
  279. }
  280. func (m *HmpMonitor) MigrateIncoming(address string, callback StringCallback) {
  281. cmd := fmt.Sprintf("migrate_incoming %s", address)
  282. m.Query(cmd, callback)
  283. }
  284. func (m *HmpMonitor) MigrateContinue(state string, callback StringCallback) {
  285. cmd := fmt.Sprintf("migrate_continue %s", state)
  286. m.Query(cmd, callback)
  287. }
  288. func (m *HmpMonitor) Migrate(
  289. destStr string, copyIncremental, copyFull bool, callback StringCallback,
  290. ) {
  291. cmd := "migrate -d"
  292. if copyIncremental {
  293. cmd += " -i"
  294. } else if copyFull {
  295. cmd += " -b"
  296. }
  297. cmd += " " + destStr
  298. m.Query(cmd, callback)
  299. }
  300. func (m *HmpMonitor) GetMigrateStatus(callback StringCallback) {
  301. cb := func(output string) {
  302. log.Infof("Query migrate status %s: %s", m.server, output)
  303. var status string
  304. for _, line := range strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n") {
  305. if strings.HasPrefix(line, "Migration status") {
  306. status = line[strings.LastIndex(line, " ")+1:]
  307. break
  308. }
  309. }
  310. callback(status)
  311. }
  312. m.Query("info migrate", cb)
  313. }
  314. func (m *HmpMonitor) GetMigrateStats(callback MigrateStatsCallback) {
  315. go callback(nil, errors.Errorf("unsupport get migrate stats"))
  316. }
  317. func (m *HmpMonitor) MigrateCancel(cb StringCallback) {
  318. m.Query("migrate_cancel", cb)
  319. }
  320. func (m *HmpMonitor) MigrateStartPostcopy(callback StringCallback) {
  321. cb := func(output string) {
  322. log.Infof("MigrateStartPostcopy %s: %s", m.server, output)
  323. callback(output)
  324. }
  325. m.Query("migrate_start_postcopy", cb)
  326. }
  327. func (m *HmpMonitor) GetBlockJobCounts(callback func(jobs int)) {
  328. cb := func(output string) {
  329. lines := strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
  330. if lines[0] == "No active jobs" {
  331. callback(0)
  332. } else {
  333. callback(len(lines))
  334. }
  335. }
  336. m.Query("info block-jobs", cb)
  337. }
  338. func (m *HmpMonitor) GetBlockJobs(callback func([]BlockJob)) {
  339. cb := func(output string) {
  340. lines := strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
  341. if lines[0] == "No active jobs" {
  342. callback(nil)
  343. return
  344. }
  345. jobs := []BlockJob{}
  346. re := regexp.MustCompile(`Type (?P<type>\w+), device (?P<device>\w+)`)
  347. for i := 0; i < len(lines); i++ {
  348. m := regutils2.GetParams(re, lines[i])
  349. if len(m) > 0 {
  350. job := BlockJob{}
  351. job.Type, _ = m["type"]
  352. job.Device, _ = m["device"]
  353. jobs = append(jobs, job)
  354. }
  355. }
  356. callback(jobs)
  357. }
  358. m.Query("info block-jobs", cb)
  359. }
  360. func (m *HmpMonitor) ReloadDiskBlkdev(device, path string, callback StringCallback) {
  361. m.Query(fmt.Sprintf("reload_disk_snapshot_blkdev -n %s %s", device, path), callback)
  362. }
  363. func (m *HmpMonitor) DriveMirror(callback StringCallback, drive, target, syncMode, format string, unmap, blockReplication bool, speed int64) {
  364. cmd := "drive_mirror -n"
  365. if blockReplication {
  366. cmd += " -c"
  367. }
  368. if syncMode == "full" {
  369. cmd += " -f"
  370. }
  371. cmd += fmt.Sprintf(" %s %s %s", drive, target, format)
  372. m.Query(cmd, callback)
  373. }
  374. func (m *HmpMonitor) DriveBackup(callback StringCallback, drive, target, syncMode, format string) {
  375. cmd := "drive_backup -n"
  376. if syncMode == "full" {
  377. cmd += " -f"
  378. }
  379. cmd += fmt.Sprintf(" %s %s %s", drive, target, format)
  380. m.Query(cmd, callback)
  381. }
  382. func (m *HmpMonitor) BlockStream(drive string, callback StringCallback) {
  383. var (
  384. speed = 500 // limit 500 MB/s
  385. cmd = fmt.Sprintf("block_stream %s %d", drive, speed)
  386. )
  387. m.Query(cmd, callback)
  388. }
  389. func (m *HmpMonitor) BlockJobComplete(drive string, callback StringCallback) {
  390. m.Query("block_job_complete", callback)
  391. }
  392. func (m *HmpMonitor) BlockReopenImage(drive, newImagePath, format string, cb StringCallback) {
  393. m.Query(fmt.Sprintf("block_reopen_image %s %s %s", drive, newImagePath, format), cb)
  394. }
  395. func (m *HmpMonitor) SnapshotBlkdev(drive, newImagePath, format string, reuse bool, cb StringCallback) {
  396. var cmd = "snapshot_blkdev"
  397. if reuse {
  398. cmd += " -n"
  399. }
  400. cmd += fmt.Sprintf(" %s %s %s", drive, newImagePath, format)
  401. m.Query(cmd, cb)
  402. }
  403. func (m *HmpMonitor) SetVncPassword(proto, password string, callback StringCallback) {
  404. if len(password) > 8 {
  405. password = password[:8]
  406. }
  407. m.Query(fmt.Sprintf("set_password %s %s", proto, password), callback)
  408. }
  409. func (m *HmpMonitor) StartNbdServer(port int, exportAllDevice, writable bool, callback StringCallback) {
  410. var cmd = "nbd_server_start"
  411. if exportAllDevice {
  412. cmd += " -a"
  413. }
  414. if writable {
  415. cmd += " -w"
  416. }
  417. cmd += fmt.Sprintf(" 0.0.0.0:%d", port)
  418. m.Query(cmd, callback)
  419. }
  420. func (m *HmpMonitor) StopNbdServer(callback StringCallback) {
  421. m.Query("nbd_server_stop", callback)
  422. }
  423. func (m *HmpMonitor) ResizeDisk(driveName string, sizeMB int64, callback StringCallback) {
  424. cmd := fmt.Sprintf("block_resize %s %d", driveName, sizeMB)
  425. m.Query(cmd, callback)
  426. }
  427. func (m *HmpMonitor) GetCpuCount(callback func(count int)) {
  428. var cb = func(output string) {
  429. cpus := strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
  430. callback(len(cpus))
  431. }
  432. m.Query("info cpus", cb)
  433. }
  434. func (m *HmpMonitor) AddCpu(cpuIndex int, callback StringCallback) {
  435. m.Query(fmt.Sprintf("cpu-add %d", cpuIndex), callback)
  436. }
  437. func (m *HmpMonitor) GeMemtSlotIndex(callback func(index int)) {
  438. var cb = func(output string) {
  439. memInfos := strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
  440. var count int
  441. for _, line := range memInfos {
  442. if strings.HasPrefix(line, "slot:") {
  443. count += 1
  444. }
  445. }
  446. callback(count)
  447. }
  448. m.Query("info memory-devices", cb)
  449. }
  450. func (m *HmpMonitor) GetMemoryDevicesInfo(cb QueryMemoryDevicesCallback) {
  451. go cb(nil, "hmp unsupport get memory devices info")
  452. }
  453. func (m *HmpMonitor) GetMemdevList(callback MemdevListCallback) {
  454. go callback(nil, "hmp unsupport get memdev list")
  455. }
  456. func (m *HmpMonitor) ObjectAdd(objectType string, params map[string]string, callback StringCallback) {
  457. var paramsKvs = []string{}
  458. for k, v := range params {
  459. paramsKvs = append(paramsKvs, fmt.Sprintf("%s=%s", k, v))
  460. }
  461. cmd := fmt.Sprintf("object_add %s,%s", objectType, strings.Join(paramsKvs, ","))
  462. m.Query(cmd, callback)
  463. }
  464. func (m *HmpMonitor) BlockIoThrottle(driveName string, bps, iops int64, callback StringCallback) {
  465. cmd := fmt.Sprintf("block_set_io_throttle %s %d 0 0 %d 0 0", driveName, bps, iops)
  466. m.Query(cmd, callback)
  467. }
  468. func (m *HmpMonitor) CancelBlockJob(driveName string, force bool, callback StringCallback) {
  469. cmd := "block_job_cancel "
  470. if force {
  471. cmd += "-f "
  472. }
  473. cmd += driveName
  474. m.Query(cmd, callback)
  475. }
  476. func (m *HmpMonitor) NetdevAdd(id, netType string, params map[string]string, callback StringCallback) {
  477. cmd := fmt.Sprintf("netdev_add %s,id=%s", netType, id)
  478. for k, v := range params {
  479. cmd += fmt.Sprintf(",%s=%s", k, v)
  480. }
  481. m.Query(cmd, callback)
  482. }
  483. func (m *HmpMonitor) NetdevDel(id string, callback StringCallback) {
  484. cmd := fmt.Sprintf("netdev_del %s", id)
  485. m.Query(cmd, callback)
  486. }
  487. func (m *HmpMonitor) SaveState(stateFilePath string, callback StringCallback) {
  488. cmd := fmt.Sprintf(`migrate -d "%s"`, getSaveStatefileUri(stateFilePath))
  489. m.Query(cmd, callback)
  490. }
  491. func (m *HmpMonitor) QueryPci(callback QueryPciCallback) {
  492. go callback(nil, "unsupported query pci for hmp")
  493. }
  494. func (m *HmpMonitor) QueryMachines(callback QueryMachinesCallback) {
  495. go callback(nil, "unsupported query machines for hmp")
  496. }
  497. func (m *HmpMonitor) ScreenDump(savePath string, callback StringCallback) {
  498. m.HumanMonitorCommand(fmt.Sprintf("screendump %s", savePath), callback)
  499. }
  500. func (m *HmpMonitor) Quit(cb StringCallback) {
  501. m.Query("quit", cb)
  502. }
  503. func (m *HmpMonitor) InfoQtree(cb StringCallback) {
  504. m.Query("info qtree", cb)
  505. }
  506. func (m *HmpMonitor) GetHotPluggableCpus(callback HotpluggableCPUListCallback) {
  507. go callback(nil, "unsupported get hotpluggable cpu list for hmp")
  508. }