qmp.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package monitor
  15. import (
  16. "bufio"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "regexp"
  21. "runtime/debug"
  22. "strings"
  23. "time"
  24. "golang.org/x/net/context"
  25. "yunion.io/x/jsonutils"
  26. "yunion.io/x/log"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/pkg/utils"
  29. "yunion.io/x/onecloud/pkg/hostman/hostutils"
  30. )
  31. // https://github.com/qemu/qemu/blob/master/docs/interop/qmp-spec.txt
  32. // https://wiki.qemu.org/QMP
  33. /*
  34. Not support oob yet
  35. 1. response error message
  36. { "error": { "class": json-string, "desc": json-string }, "id": json-value }
  37. 2. response event message
  38. { "event": json-string, "data": json-object,
  39. "timestamp": { "seconds": json-number, "microseconds": json-number } }
  40. 3. response cmd result
  41. { "return": json-value, "id": json-value }
  42. 4. response qmp init information
  43. { "QMP": {"version": {"qemu": {"micro": 0, "minor": 0, "major": 3},
  44. "package": "v3.0.0"}, "capabilities": [] } }
  45. */
// ignoreEvents lists QMP events that are not written to the log. Entries keep
// their surrounding double quotes because they are compared against the raw
// JSON text of the "event" member.
var ignoreEvents = []string{`"RTC_CHANGE"`}

// qmpMonitorCallBack receives the response to a queued QMP command.
type qmpMonitorCallBack func(*Response)

// qmpEventCallback receives an asynchronous QMP event.
type qmpEventCallback func(*Event)
// Response is the outcome of one QMP command as handed to its callback.
type Response struct {
	// Return is the raw JSON of the "return" member; unset for error replies.
	Return []byte
	// ErrorVal is the decoded "error" member; nil for successful replies.
	ErrorVal *Error
	// Id is the raw JSON text of the "id" member when the reply carries one.
	Id string
}
// Event is an asynchronous QMP event message.
type Event struct {
	// Event is the event name.
	Event string `json:"event"`
	// Data is the event specific payload.
	Data map[string]interface{} `json:"data"`
	// Timestamp is the time reported alongside the event.
	Timestamp *Timestamp `json:"timestamp"`
}
  59. func (e *Event) String() string {
  60. return fmt.Sprintf("QMP Event result: %#v", e)
  61. }
  62. type Timestamp struct {
  63. Seconds int64 `json:"seconds"`
  64. Microsenconds int64 `json:"microsenconds"`
  65. }
// Command is a QMP command as serialized onto the monitor connection.
type Command struct {
	// Execute is the name of the QMP command.
	Execute string `json:"execute"`
	// Args holds the command arguments; it is omitted from the JSON when empty.
	Args interface{} `json:"arguments,omitempty"`
}
// NetworkModify is a JSON payload describing network settings for a device.
// NOTE(review): it is not referenced in the visible part of this file; the
// field meanings below are taken from the JSON tags only.
type NetworkModify struct {
	Device   string `json:"device"`
	Ipmask   string `json:"ipmask"`
	Gateway  string `json:"gateway"`
	Ip6mask  string `json:"ip6mask"`
	Gateway6 string `json:"gateway6"`
}
  77. type Version struct {
  78. Package string `json:"package"`
  79. QEMU struct {
  80. Major int `json:"major"`
  81. Micro int `json:"micro"`
  82. Minor int `json:"minor"`
  83. } `json:"qemu"`
  84. }
  85. func (v *Version) String() string {
  86. q := v.QEMU
  87. return fmt.Sprintf("%d.%d.%d", q.Major, q.Minor, q.Micro)
  88. }
  89. type Error struct {
  90. Class string `json:"class"`
  91. Desc string `json:"desc"`
  92. }
  93. func (e *Error) Error() string {
  94. return fmt.Sprintf("%s: %s", e.Class, e.Desc)
  95. }
// QmpMonitor talks to a QEMU instance over the QMP protocol.
type QmpMonitor struct {
	SBaseMonitor

	// qmpEventFunc, when non-nil, is invoked in its own goroutine for every
	// event read from the monitor.
	qmpEventFunc qmpEventCallback
	// commandQueue holds commands that have not been written yet.
	commandQueue []*Command
	// callbackQueue holds one entry per queued command, in submission order;
	// the head is consumed for every "return" or "error" reply that is read.
	callbackQueue []qmpMonitorCallBack

	// jobs is initialised empty by NewQmpMonitor.
	// NOTE(review): no other use is visible in this part of the file.
	jobs map[string]BlockJob
}
  103. func NewQmpMonitor(server, sid string, OnMonitorDisConnect, OnMonitorTimeout MonitorErrorFunc,
  104. OnMonitorConnected MonitorSuccFunc, qmpEventFunc qmpEventCallback) *QmpMonitor {
  105. m := &QmpMonitor{
  106. SBaseMonitor: *NewBaseMonitor(server, sid, OnMonitorConnected, OnMonitorDisConnect, OnMonitorTimeout),
  107. qmpEventFunc: qmpEventFunc,
  108. commandQueue: make([]*Command, 0),
  109. callbackQueue: make([]qmpMonitorCallBack, 0),
  110. jobs: map[string]BlockJob{},
  111. }
  112. // On qmp init must set capabilities
  113. m.commandQueue = append(m.commandQueue, &Command{Execute: "qmp_capabilities"})
  114. m.callbackQueue = append(m.callbackQueue, nil)
  115. return m
  116. }
  117. func (m *QmpMonitor) actionResult(res *Response) string {
  118. if res.ErrorVal != nil {
  119. log.Errorf("Qmp Monitor action result %s: %s", m.server, res.ErrorVal.Error())
  120. return res.ErrorVal.Error()
  121. } else {
  122. return ""
  123. }
  124. }
  125. func (m *QmpMonitor) callBack(res *Response) {
  126. m.mutex.Lock()
  127. if len(m.callbackQueue) == 0 {
  128. m.mutex.Unlock()
  129. return
  130. }
  131. cb := m.callbackQueue[0]
  132. m.callbackQueue = m.callbackQueue[1:]
  133. m.mutex.Unlock()
  134. if cb != nil {
  135. go func() {
  136. defer func() {
  137. if r := recover(); r != nil {
  138. log.Errorf("PANIC %s %s:\n%s", m.server, debug.Stack(), r)
  139. }
  140. }()
  141. cb(res)
  142. }()
  143. }
  144. }
  145. func (m *QmpMonitor) read(r io.Reader) {
  146. if !m.checkReading() {
  147. return
  148. }
  149. scanner := bufio.NewScanner(r)
  150. for scanner.Scan() {
  151. var objmap map[string]*json.RawMessage
  152. b := scanner.Bytes()
  153. if err := json.Unmarshal(b, &objmap); err != nil {
  154. log.Errorf("Unmarshal %s error: %s", m.server, err.Error())
  155. continue
  156. }
  157. if val, ok := objmap["event"]; !ok || !utils.IsInStringArray(string(*val), ignoreEvents) {
  158. log.Infof("QMP Read %s: %s", m.server, string(b))
  159. }
  160. if val, ok := objmap["error"]; ok {
  161. var res = &Response{}
  162. res.ErrorVal = &Error{}
  163. json.Unmarshal(*val, res.ErrorVal)
  164. if id, ok := objmap["id"]; ok {
  165. res.Id = string(*id)
  166. }
  167. m.callBack(res)
  168. } else if val, ok := objmap["return"]; ok {
  169. var res = &Response{}
  170. res.Return = []byte(*val)
  171. if id, ok := objmap["id"]; ok {
  172. res.Id = string(*id)
  173. }
  174. m.callBack(res)
  175. } else if val, ok := objmap["event"]; ok {
  176. var event = &Event{
  177. Event: string(*val),
  178. Data: make(map[string]interface{}, 0),
  179. Timestamp: new(Timestamp),
  180. }
  181. if data, ok := objmap["data"]; ok {
  182. json.Unmarshal(*data, &event.Data)
  183. }
  184. if timestamp, ok := objmap["timestamp"]; ok {
  185. json.Unmarshal(*timestamp, event.Timestamp)
  186. }
  187. m.watchEvent(event)
  188. } else if val, ok := objmap["QMP"]; ok {
  189. // On qmp connected
  190. json.Unmarshal(*val, &objmap)
  191. if val, ok = objmap["version"]; ok {
  192. var version Version
  193. json.Unmarshal(*val, &version)
  194. m.QemuVersion = version.String()
  195. }
  196. // remove reader timeout
  197. m.rwc.SetReadDeadline(time.Time{})
  198. m.connected = true
  199. m.timeout = false
  200. go m.query()
  201. if m.OnMonitorConnected != nil {
  202. go m.OnMonitorConnected()
  203. }
  204. }
  205. }
  206. log.Infof("Scan over %s ...", m.server)
  207. err := scanner.Err()
  208. if err != nil {
  209. log.Infof("QMP Disconnected %s: %s", m.server, err)
  210. }
  211. if m.timeout {
  212. if m.OnMonitorTimeout != nil {
  213. m.OnMonitorTimeout(err)
  214. }
  215. } else if m.connected {
  216. m.connected = false
  217. if m.OnMonitorDisConnect != nil {
  218. m.OnMonitorDisConnect(err)
  219. }
  220. }
  221. m.reading = false
  222. }
  223. func (m *QmpMonitor) watchEvent(event *Event) {
  224. if !utils.IsInStringArray(event.Event, ignoreEvents) {
  225. log.Infof("QMP event %s: %s", m.server, event.String())
  226. }
  227. if m.qmpEventFunc != nil {
  228. go m.qmpEventFunc(event)
  229. }
  230. }
  231. func (m *QmpMonitor) write(cmd []byte) error {
  232. log.Infof("QMP Write %s: %s", m.server, string(cmd))
  233. length, index := len(cmd), 0
  234. for index < length {
  235. i, err := m.rwc.Write(cmd)
  236. if err != nil {
  237. return err
  238. }
  239. index += i
  240. }
  241. return nil
  242. }
  243. func (m *QmpMonitor) query() {
  244. if !m.checkWriting() {
  245. return
  246. }
  247. for {
  248. if len(m.commandQueue) == 0 {
  249. break
  250. }
  251. // pop cmd
  252. m.mutex.Lock()
  253. cmd := m.commandQueue[0]
  254. m.commandQueue = m.commandQueue[1:]
  255. c, _ := json.Marshal(cmd)
  256. err := m.write(c)
  257. m.mutex.Unlock()
  258. if err != nil {
  259. log.Errorf("Write %s to monitor %s error: %s", c, m.server, err)
  260. break
  261. }
  262. }
  263. m.writing = false
  264. }
  265. func (m *QmpMonitor) Query(cmd *Command, cb qmpMonitorCallBack) {
  266. // push cmd
  267. m.mutex.Lock()
  268. m.commandQueue = append(m.commandQueue, cmd)
  269. m.callbackQueue = append(m.callbackQueue, cb)
  270. m.mutex.Unlock()
  271. if m.connected {
  272. if !m.writing {
  273. go m.query()
  274. }
  275. if !m.reading {
  276. go m.read(m.rwc)
  277. }
  278. }
  279. }
  280. func (m *QmpMonitor) ConnectWithSocket(address string, timeout time.Duration) error {
  281. err := m.SBaseMonitor.connect("unix", address)
  282. if err != nil {
  283. return err
  284. }
  285. go m.read(m.rwc)
  286. return nil
  287. }
  288. func (m *QmpMonitor) Connect(host string, port int) error {
  289. err := m.SBaseMonitor.connect("tcp", fmt.Sprintf("%s:%d", host, port))
  290. if err != nil {
  291. return err
  292. }
  293. go m.read(m.rwc)
  294. return nil
  295. }
  296. func (m *QmpMonitor) parseCmd(cmd string) string {
  297. re := regexp.MustCompile(`\s+`)
  298. parts := re.Split(strings.TrimSpace(cmd), -1)
  299. if parts[0] == "info" && len(parts) > 1 {
  300. return "query-" + parts[1]
  301. } else {
  302. return parts[0]
  303. }
  304. }
  305. func (m *QmpMonitor) SimpleCommand(cmd string, callback StringCallback) {
  306. cmd = m.parseCmd(cmd)
  307. var cb func(res *Response)
  308. if callback != nil {
  309. cb = func(res *Response) {
  310. if res.ErrorVal != nil {
  311. callback(res.ErrorVal.Error())
  312. } else {
  313. callback(string(res.Return))
  314. }
  315. }
  316. }
  317. c := &Command{Execute: cmd}
  318. m.Query(c, cb)
  319. }
  320. func (m *QmpMonitor) QemuMonitorCommand(rawCmd string, callback StringCallback) error {
  321. c := Command{}
  322. if err := json.Unmarshal([]byte(rawCmd), &c); err != nil {
  323. return errors.Errorf("unsupport command format: %s", err)
  324. }
  325. cb := func(res *Response) {
  326. log.Debugf("Monitor %s ret: %s", m.server, res.Return)
  327. if res.ErrorVal != nil {
  328. callback(res.ErrorVal.Error())
  329. } else {
  330. callback(strings.Trim(string(res.Return), `""`))
  331. }
  332. }
  333. m.Query(&c, cb)
  334. return nil
  335. }
  336. func (m *QmpMonitor) HumanMonitorCommand(cmd string, callback StringCallback) {
  337. var (
  338. c = &Command{
  339. Execute: "human-monitor-command",
  340. Args: map[string]string{"command-line": cmd},
  341. }
  342. cb = func(res *Response) {
  343. log.Debugf("Monitor %s ret: %s", m.server, res.Return)
  344. if res.ErrorVal != nil {
  345. callback(res.ErrorVal.Error())
  346. } else {
  347. callback(strings.Trim(string(res.Return), `""`))
  348. }
  349. }
  350. )
  351. m.Query(c, cb)
  352. }
  353. func (m *QmpMonitor) QueryStatus(callback StringCallback) {
  354. m.HumanMonitorCommand("info status", m.parseStatus(callback))
  355. }
  356. // func (m *QmpMonitor) parseStatus(callback StringCallback) qmpMonitorCallBack {
  357. // return func(res *Response) {
  358. // if res.ErrorVal != nil {
  359. // callback("unknown")
  360. // return
  361. // }
  362. // var val map[string]interface{}
  363. // err := json.Unmarshal(res.Return, &val)
  364. // if err != nil {
  365. // callback("unknown")
  366. // return
  367. // }
  368. // if status, ok := val["status"]; !ok {
  369. // callback("unknown")
  370. // } else {
  371. // str, _ := status.(string)
  372. // callback(str)
  373. // }
  374. // }
  375. // }
  376. // If get version error, callback with empty string
  377. func (m *QmpMonitor) GetVersion(callback StringCallback) {
  378. cmd := &Command{Execute: "query-version"}
  379. m.Query(cmd, m.parseVersion(callback))
  380. }
  381. func (m *QmpMonitor) parseVersion(callback StringCallback) qmpMonitorCallBack {
  382. return func(res *Response) {
  383. if res.ErrorVal != nil {
  384. callback("")
  385. return
  386. }
  387. var version Version
  388. err := json.Unmarshal(res.Return, &version)
  389. if err != nil {
  390. callback("")
  391. } else {
  392. callback(version.String())
  393. }
  394. }
  395. }
  396. func (m *QmpMonitor) GetBlocks(callback func([]QemuBlock)) {
  397. var cb = func(res *Response) {
  398. if res.ErrorVal != nil {
  399. log.Errorf("GetBlocks error %s", res.ErrorVal)
  400. callback(nil)
  401. return
  402. }
  403. jr, err := jsonutils.Parse(res.Return)
  404. if err != nil {
  405. log.Errorf("Get %s block error %s", m.server, err)
  406. callback(nil)
  407. return
  408. }
  409. blocks := []QemuBlock{}
  410. jr.Unmarshal(&blocks)
  411. callback(blocks)
  412. }
  413. cmd := &Command{Execute: "query-block"}
  414. m.Query(cmd, cb)
  415. }
  416. func (m *QmpMonitor) ChangeCdrom(dev string, path string, callback StringCallback) {
  417. m.HumanMonitorCommand(fmt.Sprintf("change %s %s", dev, path), callback)
  418. }
  419. func (m *QmpMonitor) EjectCdrom(dev string, callback StringCallback) {
  420. m.HumanMonitorCommand(fmt.Sprintf("eject -f %s", dev), callback)
  421. }
  422. func (m *QmpMonitor) DriveDel(idstr string, callback StringCallback) {
  423. m.HumanMonitorCommand(fmt.Sprintf("drive_del %s", idstr), callback)
  424. }
  425. func (m *QmpMonitor) DeviceDel(idstr string, callback StringCallback) {
  426. //m.HumanMonitorCommand(fmt.Sprintf("device_del %s", idstr), callback)
  427. var (
  428. args = map[string]interface{}{
  429. "id": idstr,
  430. }
  431. cmd = &Command{
  432. Execute: "device_del",
  433. Args: args,
  434. }
  435. cb = func(res *Response) {
  436. callback(m.actionResult(res))
  437. }
  438. )
  439. m.Query(cmd, cb)
  440. }
  441. func (m *QmpMonitor) ObjectDel(idstr string, callback StringCallback) {
  442. m.HumanMonitorCommand(fmt.Sprintf("object_del %s", idstr), callback)
  443. }
  444. func (m *QmpMonitor) XBlockdevChange(parent, node, child string, callback StringCallback) {
  445. cb := func(res *Response) {
  446. callback(m.actionResult(res))
  447. }
  448. cmd := &Command{
  449. Execute: "x-blockdev-change",
  450. }
  451. args := map[string]interface{}{
  452. "parent": parent,
  453. }
  454. if len(node) > 0 {
  455. args["node"] = node
  456. }
  457. if len(child) > 0 {
  458. args["child"] = child
  459. }
  460. cmd.Args = args
  461. m.Query(cmd, cb)
  462. }
  463. func (m *QmpMonitor) DriveAdd(bus, node string, params map[string]string, callback StringCallback) {
  464. var paramsKvs = []string{}
  465. for k, v := range params {
  466. paramsKvs = append(paramsKvs, fmt.Sprintf("%s=%s", k, v))
  467. }
  468. cmd := "drive_add"
  469. if len(node) > 0 {
  470. cmd = fmt.Sprintf("drive_add -n %s", node)
  471. }
  472. cmd = fmt.Sprintf("%s %s %s", cmd, bus, strings.Join(paramsKvs, ","))
  473. m.HumanMonitorCommand(cmd, callback)
  474. }
  475. func (m *QmpMonitor) DeviceAdd(dev string, params map[string]string, callback StringCallback) {
  476. args := map[string]interface{}{
  477. "driver": dev,
  478. }
  479. for k, v := range params {
  480. args[k] = v
  481. }
  482. cmd := &Command{
  483. Execute: "device_add",
  484. Args: args,
  485. }
  486. cb := func(res *Response) {
  487. callback(m.actionResult(res))
  488. }
  489. m.Query(cmd, cb)
  490. }
  491. func (m *QmpMonitor) MigrateSetDowntime(dtSec float64, callback StringCallback) {
  492. m.HumanMonitorCommand(fmt.Sprintf("migrate_set_downtime %f", dtSec), callback)
  493. }
  494. func (m *QmpMonitor) MigrateSetCapability(capability, state string, callback StringCallback) {
  495. var (
  496. cb = func(res *Response) {
  497. callback(m.actionResult(res))
  498. }
  499. st = false
  500. )
  501. if state == "on" {
  502. st = true
  503. }
  504. cmd := &Command{
  505. Execute: "migrate-set-capabilities",
  506. Args: map[string]interface{}{
  507. "capabilities": []interface{}{
  508. map[string]interface{}{
  509. "capability": capability,
  510. "state": st,
  511. },
  512. },
  513. },
  514. }
  515. m.Query(cmd, cb)
  516. }
  517. func (m *QmpMonitor) MigrateSetParameter(key string, val interface{}, callback StringCallback) {
  518. var (
  519. cb = func(res *Response) {
  520. callback(m.actionResult(res))
  521. }
  522. cmd = &Command{
  523. Execute: "migrate-set-parameters",
  524. Args: map[string]interface{}{
  525. key: val,
  526. },
  527. }
  528. )
  529. m.Query(cmd, cb)
  530. }
  531. func (m *QmpMonitor) MigrateIncoming(address string, callback StringCallback) {
  532. cmd := fmt.Sprintf("migrate_incoming %s", address)
  533. m.HumanMonitorCommand(cmd, callback)
  534. }
  535. func (m *QmpMonitor) MigrateContinue(state string, callback StringCallback) {
  536. cmd := fmt.Sprintf("migrate_continue %s", state)
  537. m.HumanMonitorCommand(cmd, callback)
  538. }
  539. func (m *QmpMonitor) Migrate(
  540. destStr string, copyIncremental, copyFull bool, callback StringCallback,
  541. ) {
  542. var (
  543. cb = func(res *Response) {
  544. callback(m.actionResult(res))
  545. }
  546. cmd = &Command{
  547. Execute: "migrate",
  548. Args: map[string]interface{}{
  549. "uri": destStr,
  550. "blk": copyFull,
  551. "inc": copyIncremental,
  552. },
  553. }
  554. )
  555. m.Query(cmd, cb)
  556. }
  557. func (m *QmpMonitor) GetMigrateStatus(callback StringCallback) {
  558. var (
  559. cmd = &Command{Execute: "query-migrate"}
  560. cb = func(res *Response) {
  561. if res.ErrorVal != nil {
  562. callback(res.ErrorVal.Error())
  563. } else {
  564. /*
  565. {"expected-downtime":300,"ram":{"dirty-pages-rate":0,"dirty-sync-count":1,"duplicate":2966538,"mbps":268.5672,"normal":148629,"normal-bytes":608784384,"page-size":4096,"postcopy-requests":0,"remaining":142815232,"skipped":0,"total":12902539264,"transferred":636674057},"setup-time":65,"status":"active","total-time":20002}
  566. {"disk":{"dirty-pages-rate":0,"dirty-sync-count":0,"duplicate":0,"mbps":0,"normal":0,"normal-bytes":0,"page-size":0,"postcopy-requests":0,"remaining":0,"skipped":0,"total":139586437120,"transferred":139586437120},"expected-downtime":300,"ram":{"dirty-pages-rate":0,"dirty-sync-count":1,"duplicate":193281,"mbps":268.44264,"normal":62311,"normal-bytes":255225856,"page-size":4096,"postcopy-requests":0,"remaining":44474368,"skipped":0,"total":1091379200,"transferred":257555032},"setup-time":15,"status":"active","total-time":10002}
  567. */
  568. ret, err := jsonutils.Parse(res.Return)
  569. if err != nil {
  570. log.Errorf("Parse qmp res error %s: %s", m.server, err)
  571. callback("")
  572. } else {
  573. log.Infof("Query migrate status %s: %s", m.server, ret.String())
  574. status, _ := ret.GetString("status")
  575. if status == "active" {
  576. ramTotal, _ := ret.Int("ram", "total")
  577. ramRemain, _ := ret.Int("ram", "remaining")
  578. ramMbps, _ := ret.Float("ram", "mbps")
  579. diskTotal, _ := ret.Int("disk", "total")
  580. diskRemain, _ := ret.Int("disk", "remaining")
  581. diskMbps, _ := ret.Float("disk", "mbps")
  582. if diskRemain > 0 {
  583. status = "migrate_disk_copy"
  584. } else if ramRemain > 0 {
  585. status = "migrate_ram_copy"
  586. }
  587. mbps := ramMbps + diskMbps
  588. progress := (1 - float64(diskRemain+ramRemain)/float64(diskTotal+ramTotal)) * 100.0
  589. log.Debugf("progress: %f mbps: %f", progress, mbps)
  590. hostutils.UpdateServerProgress(context.Background(), m.sid, progress, mbps)
  591. }
  592. callback(status)
  593. }
  594. }
  595. }
  596. )
  597. m.Query(cmd, cb)
  598. }
  599. func (m *QmpMonitor) GetMigrateStats(callback MigrateStatsCallback) {
  600. var (
  601. cmd = &Command{Execute: "query-migrate"}
  602. cb = func(res *Response) {
  603. if res.ErrorVal != nil {
  604. callback(nil, errors.Errorf("%s", res.ErrorVal.Error()))
  605. } else {
  606. migStats := new(MigrationInfo)
  607. err := json.Unmarshal(res.Return, migStats)
  608. if err != nil {
  609. callback(nil, err)
  610. return
  611. }
  612. callback(migStats, nil)
  613. }
  614. }
  615. )
  616. m.Query(cmd, cb)
  617. }
  618. func (m *QmpMonitor) MigrateCancel(cb StringCallback) {
  619. m.HumanMonitorCommand("migrate_cancel", cb)
  620. }
  621. func (m *QmpMonitor) MigrateStartPostcopy(callback StringCallback) {
  622. var (
  623. cmd = &Command{Execute: "migrate-start-postcopy"}
  624. cb = func(res *Response) {
  625. if res.ErrorVal != nil {
  626. callback(res.ErrorVal.Error())
  627. } else {
  628. ret, err := jsonutils.Parse(res.Return)
  629. if err != nil {
  630. log.Errorf("Parse qmp res error %s: %s", m.server, err)
  631. callback("MigrateStartPostcopy error")
  632. } else {
  633. log.Infof("MigrateStartPostcopy %s: %s", m.server, ret.String())
  634. callback("")
  635. }
  636. }
  637. }
  638. )
  639. m.Query(cmd, cb)
  640. }
  641. func (m *QmpMonitor) blockJobs(res *Response) ([]BlockJob, error) {
  642. if res.ErrorVal != nil {
  643. return nil, errors.Errorf("GetBlockJobs for %s %s", m.server, jsonutils.Marshal(res.ErrorVal).String())
  644. }
  645. ret, err := jsonutils.Parse(res.Return)
  646. if err != nil {
  647. return nil, errors.Wrapf(err, "GetBlockJobs for %s parse %s", m.server, res.Return)
  648. }
  649. log.Debugf("blockJobs response %s", ret)
  650. jobs := []BlockJob{}
  651. if err = ret.Unmarshal(&jobs); err != nil {
  652. return nil, err
  653. }
  654. return jobs, nil
  655. }
  656. func (m *QmpMonitor) GetBlockJobCounts(callback func(jobs int)) {
  657. var cb = func(res *Response) {
  658. jobs, err := m.blockJobs(res)
  659. if err != nil {
  660. callback(-1)
  661. return
  662. }
  663. callback(len(jobs))
  664. }
  665. m.Query(&Command{Execute: "query-block-jobs"}, cb)
  666. }
  667. func (m *QmpMonitor) GetBlockJobs(callback func([]BlockJob)) {
  668. var cb = func(res *Response) {
  669. jobs, _ := m.blockJobs(res)
  670. callback(jobs)
  671. }
  672. m.Query(&Command{Execute: "query-block-jobs"}, cb)
  673. }
  674. func (m *QmpMonitor) ReloadDiskBlkdev(device, path string, callback StringCallback) {
  675. var (
  676. cb = func(res *Response) {
  677. callback(m.actionResult(res))
  678. }
  679. cmd = &Command{
  680. Execute: "reload-disk-snapshot-blkdev-sync",
  681. Args: map[string]string{
  682. "device": device,
  683. "snapshot-file": path,
  684. "mode": "existing",
  685. "format": "qcow2",
  686. },
  687. }
  688. )
  689. m.Query(cmd, cb)
  690. }
  691. func (m *QmpMonitor) DriveMirror(callback StringCallback, drive, target, syncMode, format string, unmap, blockReplication bool, speed int64) {
  692. var (
  693. cb = func(res *Response) {
  694. callback(m.actionResult(res))
  695. }
  696. args = map[string]interface{}{
  697. "device": drive,
  698. "target": target,
  699. "mode": "existing",
  700. "sync": syncMode,
  701. "unmap": unmap,
  702. }
  703. )
  704. if speed > 0 {
  705. args["speed"] = speed
  706. }
  707. if blockReplication {
  708. args["block-replication"] = true
  709. }
  710. cmd := &Command{
  711. Execute: "drive-mirror",
  712. Args: args,
  713. }
  714. m.Query(cmd, cb)
  715. }
  716. func (m *QmpMonitor) DriveBackup(callback StringCallback, drive, target, syncMode, format string) {
  717. var (
  718. cb = func(res *Response) {
  719. callback(m.actionResult(res))
  720. }
  721. args = map[string]interface{}{
  722. "device": drive,
  723. "target": target,
  724. "mode": "existing",
  725. "sync": syncMode,
  726. "format": format,
  727. }
  728. )
  729. cmd := &Command{
  730. Execute: "drive-backup",
  731. Args: args,
  732. }
  733. m.Query(cmd, cb)
  734. }
  735. func (m *QmpMonitor) BlockStream(drive string, callback StringCallback) {
  736. var (
  737. speed = 5 * 100 * 1024 * 1024 // limit 500 MB/s
  738. cb = func(res *Response) {
  739. callback(m.actionResult(res))
  740. }
  741. cmd = &Command{
  742. Execute: "block-stream",
  743. Args: map[string]interface{}{
  744. "device": drive,
  745. "speed": speed,
  746. },
  747. }
  748. )
  749. m.Query(cmd, cb)
  750. }
  751. func (m *QmpMonitor) SetVncPassword(proto, password string, callback StringCallback) {
  752. if len(password) > 8 {
  753. password = password[:8]
  754. }
  755. var (
  756. cb = func(res *Response) {
  757. callback(m.actionResult(res))
  758. }
  759. cmd = &Command{
  760. Execute: "set_password",
  761. Args: map[string]interface{}{
  762. "protocol": proto,
  763. "password": password,
  764. },
  765. }
  766. )
  767. m.Query(cmd, cb)
  768. }
  769. func (m *QmpMonitor) StartNbdServer(port int, exportAllDevice, writable bool, callback StringCallback) {
  770. var cmd = "nbd_server_start"
  771. if exportAllDevice {
  772. cmd += " -a"
  773. }
  774. if writable {
  775. cmd += " -w"
  776. }
  777. cmd += fmt.Sprintf(" 0.0.0.0:%d", port)
  778. m.HumanMonitorCommand(cmd, callback)
  779. }
  780. func (m *QmpMonitor) StopNbdServer(callback StringCallback) {
  781. m.HumanMonitorCommand("nbd_server_stop", callback)
  782. }
  783. func (m *QmpMonitor) ResizeDisk(driveName string, sizeMB int64, callback StringCallback) {
  784. cmd := fmt.Sprintf("block_resize %s %d", driveName, sizeMB)
  785. m.HumanMonitorCommand(cmd, callback)
  786. }
  787. func (m *QmpMonitor) GetCpuCount(callback func(count int)) {
  788. var cb = func(res string) {
  789. cpus := strings.Split(res, "\\n")
  790. count := 0
  791. for _, cpuInfo := range cpus {
  792. if len(strings.TrimSpace(cpuInfo)) > 0 {
  793. count += 1
  794. }
  795. }
  796. callback(count)
  797. }
  798. m.HumanMonitorCommand("info cpus", cb)
  799. }
  800. func (m *QmpMonitor) AddCpu(cpuIndex int, callback StringCallback) {
  801. var (
  802. cb = func(res *Response) {
  803. callback(m.actionResult(res))
  804. }
  805. cmd = &Command{
  806. Execute: "cpu-add",
  807. Args: map[string]interface{}{"id": cpuIndex},
  808. }
  809. )
  810. m.Query(cmd, cb)
  811. }
  812. func (m *QmpMonitor) ObjectAdd(objectType string, params map[string]string, callback StringCallback) {
  813. var paramsKvs = []string{}
  814. for k, v := range params {
  815. paramsKvs = append(paramsKvs, fmt.Sprintf("%s=%s", k, v))
  816. }
  817. cmd := fmt.Sprintf("object_add %s,%s", objectType, strings.Join(paramsKvs, ","))
  818. m.HumanMonitorCommand(cmd, callback)
  819. }
  820. func (m *QmpMonitor) GeMemtSlotIndex(callback func(index int)) {
  821. var cb = func(res string) {
  822. memInfos := strings.Split(res, "\\n")
  823. var count int
  824. for _, line := range memInfos {
  825. if strings.HasPrefix(strings.TrimSpace(line), "slot:") {
  826. count += 1
  827. }
  828. }
  829. callback(count)
  830. }
  831. m.HumanMonitorCommand("info memory-devices", cb)
  832. }
  833. func (m *QmpMonitor) GetMemoryDevicesInfo(callback QueryMemoryDevicesCallback) {
  834. var (
  835. cb = func(res *Response) {
  836. if res.ErrorVal != nil {
  837. callback(nil, res.ErrorVal.Error())
  838. } else {
  839. memDevices := make([]MemoryDeviceInfo, 0)
  840. err := json.Unmarshal(res.Return, &memDevices)
  841. if err != nil {
  842. callback(nil, err.Error())
  843. } else {
  844. callback(memDevices, "")
  845. }
  846. }
  847. }
  848. cmd = &Command{
  849. Execute: "query-memory-devices",
  850. }
  851. )
  852. m.Query(cmd, cb)
  853. }
  854. func (m *QmpMonitor) GetMemdevList(callback MemdevListCallback) {
  855. var (
  856. cb = func(res *Response) {
  857. if res.ErrorVal != nil {
  858. callback(nil, res.ErrorVal.Error())
  859. } else {
  860. memdevList := make([]Memdev, 0)
  861. err := json.Unmarshal(res.Return, &memdevList)
  862. if err != nil {
  863. callback(nil, err.Error())
  864. } else {
  865. callback(memdevList, "")
  866. }
  867. }
  868. }
  869. cmd = &Command{
  870. Execute: "query-memdev",
  871. }
  872. )
  873. m.Query(cmd, cb)
  874. }
  875. func (m *QmpMonitor) BlockIoThrottle(driveName string, bps, iops int64, callback StringCallback) {
  876. cmd := fmt.Sprintf("block_set_io_throttle %s %d 0 0 %d 0 0", driveName, bps, iops)
  877. m.HumanMonitorCommand(cmd, callback)
  878. }
  879. func (m *QmpMonitor) CancelBlockJob(driveName string, force bool, callback StringCallback) {
  880. cmd := "block_job_cancel "
  881. if force {
  882. cmd += "-f "
  883. }
  884. cmd += driveName
  885. m.HumanMonitorCommand(cmd, callback)
  886. }
  887. func (m *QmpMonitor) ScreenDump(savePath string, callback StringCallback) {
  888. m.HumanMonitorCommand(fmt.Sprintf("screendump %s", savePath), callback)
  889. }
  890. func (m *QmpMonitor) BlockJobComplete(drive string, callback StringCallback) {
  891. m.HumanMonitorCommand(fmt.Sprintf("block_job_complete %s", drive), callback)
  892. }
  893. func (m *QmpMonitor) BlockReopenImage(drive, newImagePath, format string, callback StringCallback) {
  894. var cb = func(res *Response) {
  895. callback(m.actionResult(res))
  896. }
  897. var cmd = &Command{
  898. Execute: "block_reopen_image",
  899. Args: map[string]interface{}{
  900. "device": drive,
  901. "new_image": newImagePath,
  902. "format": format,
  903. },
  904. }
  905. m.Query(cmd, cb)
  906. }
  907. func (m *QmpMonitor) SnapshotBlkdev(drive, newImagePath, format string, reuse bool, callback StringCallback) {
  908. var cmd = "snapshot_blkdev"
  909. if reuse {
  910. cmd += " -n"
  911. }
  912. cmd += fmt.Sprintf(" %s %s %s", drive, newImagePath, format)
  913. m.HumanMonitorCommand(cmd, callback)
  914. }
  915. func (m *QmpMonitor) NetdevAdd(id, netType string, params map[string]string, callback StringCallback) {
  916. cmd := fmt.Sprintf("netdev_add %s,id=%s", netType, id)
  917. for k, v := range params {
  918. cmd += fmt.Sprintf(",%s=%s", k, v)
  919. }
  920. m.HumanMonitorCommand(cmd, callback)
  921. }
  922. func (m *QmpMonitor) NetdevDel(id string, callback StringCallback) {
  923. cmd := fmt.Sprintf("netdev_del %s", id)
  924. m.HumanMonitorCommand(cmd, callback)
  925. }
  926. func (m *QmpMonitor) SaveState(stateFilePath string, callback StringCallback) {
  927. var (
  928. cb = func(res *Response) {
  929. callback(m.actionResult(res))
  930. }
  931. cmd = &Command{
  932. Execute: "migrate",
  933. Args: map[string]interface{}{
  934. "uri": getSaveStatefileUri(stateFilePath),
  935. },
  936. }
  937. )
  938. m.Query(cmd, cb)
  939. }
  940. func (m *QmpMonitor) QueryPci(callback QueryPciCallback) {
  941. var (
  942. cb = func(res *Response) {
  943. if res.ErrorVal != nil {
  944. callback(nil, res.ErrorVal.Error())
  945. } else {
  946. pciInfoList := make([]PCIInfo, 0)
  947. err := json.Unmarshal(res.Return, &pciInfoList)
  948. if err != nil {
  949. callback(nil, err.Error())
  950. } else {
  951. callback(pciInfoList, "")
  952. }
  953. }
  954. }
  955. cmd = &Command{
  956. Execute: "query-pci",
  957. }
  958. )
  959. m.Query(cmd, cb)
  960. }
  961. func (m *QmpMonitor) QueryMachines(callback QueryMachinesCallback) {
  962. var (
  963. cb = func(res *Response) {
  964. if res.ErrorVal != nil {
  965. callback(nil, res.ErrorVal.Error())
  966. } else {
  967. machineInfoList := make([]MachineInfo, 0)
  968. err := json.Unmarshal(res.Return, &machineInfoList)
  969. if err != nil {
  970. callback(nil, err.Error())
  971. } else {
  972. callback(machineInfoList, "")
  973. }
  974. }
  975. }
  976. cmd = &Command{
  977. Execute: "query-machines",
  978. }
  979. )
  980. m.Query(cmd, cb)
  981. }
  982. func (m *QmpMonitor) Quit(callback StringCallback) {
  983. var (
  984. cb = func(res *Response) {
  985. if res.ErrorVal != nil {
  986. callback(res.ErrorVal.Error())
  987. } else {
  988. callback(string(res.Return))
  989. }
  990. }
  991. cmd = &Command{
  992. Execute: "quit",
  993. }
  994. )
  995. m.Query(cmd, cb)
  996. }
  997. func (m *QmpMonitor) InfoQtree(cb StringCallback) {
  998. m.HumanMonitorCommand("info qtree", cb)
  999. }
  1000. func (m *QmpMonitor) GetHotPluggableCpus(callback HotpluggableCPUListCallback) {
  1001. var (
  1002. cb = func(res *Response) {
  1003. if res.ErrorVal != nil {
  1004. callback(nil, res.ErrorVal.Error())
  1005. } else {
  1006. cpuList := make([]HotpluggableCPU, 0)
  1007. err := json.Unmarshal(res.Return, &cpuList)
  1008. if err != nil {
  1009. callback(nil, err.Error())
  1010. } else {
  1011. callback(cpuList, "")
  1012. }
  1013. }
  1014. }
  1015. cmd = &Command{
  1016. Execute: "query-hotpluggable-cpus",
  1017. }
  1018. )
  1019. m.Query(cmd, cb)
  1020. }