| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package monitor
- import (
- "bufio"
- "bytes"
- "fmt"
- "io"
- "regexp"
- "strings"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/onecloud/pkg/util/regutils2"
- )
- type HmpMonitor struct {
- SBaseMonitor
- commandQueue []string
- callbackQueue []StringCallback
- }
- func NewHmpMonitor(server, sid string, OnMonitorDisConnect, OnMonitorTimeout MonitorErrorFunc, OnMonitorConnected MonitorSuccFunc) *HmpMonitor {
- return &HmpMonitor{
- SBaseMonitor: *NewBaseMonitor(server, sid, OnMonitorConnected, OnMonitorDisConnect, OnMonitorTimeout),
- commandQueue: make([]string, 0),
- callbackQueue: make([]StringCallback, 0),
- }
- }
- var hmpMark = []byte("(qemu) ")
- func (m *HmpMonitor) hmpSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
- if atEOF && len(data) == 0 {
- return 0, nil, nil
- }
- index := bytes.Index(data, hmpMark)
- if index >= 0 {
- return index + len(hmpMark), data[0:index], nil
- }
- if atEOF {
- return len(data), data, nil
- }
- // Request more data.
- return 0, nil, nil
- }
- func (m *HmpMonitor) actionResult(res string) string {
- return res
- }
- func (m *HmpMonitor) read(r io.Reader) {
- if !m.checkReading() {
- return
- }
- scanner := bufio.NewScanner(r)
- scanner.Split(m.hmpSplitFunc)
- for scanner.Scan() {
- res := scanner.Text()
- if len(res) == 0 {
- continue
- }
- log.Infof("HMP Read %s: %s", m.server, res)
- if m.connected {
- go m.callBack(res)
- } else {
- // remove reader timeout
- m.connected = true
- m.timeout = false
- m.rwc.SetReadDeadline(time.Time{})
- go m.query()
- go m.OnMonitorConnected()
- }
- }
- log.Infof("Scan over %s ...", m.server)
- err := scanner.Err()
- if err != nil {
- log.Infof("HMP Disconnected %s: %s", m.server, err)
- }
- if m.timeout {
- m.OnMonitorTimeout(err)
- } else if m.connected {
- m.connected = false
- m.OnMonitorDisConnect(err)
- }
- m.reading = false
- }
- func (m *HmpMonitor) callBack(res string) {
- m.mutex.Lock()
- if len(m.callbackQueue) == 0 {
- m.mutex.Unlock()
- return
- }
- cb := m.callbackQueue[0]
- m.callbackQueue = m.callbackQueue[1:]
- m.mutex.Unlock()
- if cb != nil {
- pos := strings.Index(res, "\r\n")
- if pos > 0 {
- res = res[pos+2:]
- }
- go cb(res)
- }
- }
- func (m *HmpMonitor) write(cmd []byte) error {
- cmd = append(cmd, '\n')
- log.Infof("HMP Write %s: %s", m.server, string(cmd))
- length, index := len(cmd), 0
- for index < length {
- i, err := m.rwc.Write(cmd)
- if err != nil {
- return err
- }
- index += i
- }
- return nil
- }
- func (m *HmpMonitor) query() {
- if !m.checkWriting() {
- return
- }
- for {
- if len(m.commandQueue) == 0 {
- break
- }
- //pop
- m.mutex.Lock()
- cmd := m.commandQueue[0]
- m.commandQueue = m.commandQueue[1:]
- err := m.write([]byte(cmd))
- m.mutex.Unlock()
- if err != nil {
- log.Errorf("Write %s to monitor error %s: %s", cmd, m.server, err)
- break
- }
- }
- m.writing = false
- }
- func (m *HmpMonitor) Query(cmd string, cb StringCallback) {
- // push
- m.mutex.Lock()
- m.commandQueue = append(m.commandQueue, cmd)
- m.callbackQueue = append(m.callbackQueue, cb)
- m.mutex.Unlock()
- if m.connected {
- if !m.writing {
- go m.query()
- }
- if !m.reading {
- go m.read(m.rwc)
- }
- }
- }
- func (m *HmpMonitor) ConnectWithSocket(address string) error {
- err := m.SBaseMonitor.connect("unix", address)
- if err != nil {
- return err
- }
- go m.read(m.rwc)
- return nil
- }
- func (m *HmpMonitor) Connect(host string, port int) error {
- err := m.SBaseMonitor.connect("tcp", fmt.Sprintf("%s:%d", host, port))
- if err != nil {
- return err
- }
- go m.read(m.rwc)
- return nil
- }
- func (m *HmpMonitor) QueryStatus(callback StringCallback) {
- m.Query("info status", m.parseStatus(callback))
- }
- func (m *HmpMonitor) SimpleCommand(cmd string, callback StringCallback) {
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) HumanMonitorCommand(cmd string, callback StringCallback) {
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) GetVersion(callback StringCallback) {
- _cb := func(versionStr string) {
- versionStr = strings.TrimSpace(versionStr)
- callback(versionStr)
- }
- m.Query("info version", _cb)
- }
- func (m *HmpMonitor) GetBlocks(callback func([]QemuBlock)) {
- var cb = func(output string) {
- var lines = strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
- var mergedOutput = []string{}
- // merge output
- for _, line := range lines {
- parts := regexp.MustCompile(`\s+`).Split(line, -1)
- if strings.HasSuffix(parts[0], ":") {
- mergedOutput = append(mergedOutput, "")
- } else if regexp.MustCompile(`\(#block\d+\):`).MatchString(line) {
- mergedOutput = append(mergedOutput, "")
- }
- mergedOutput[len(mergedOutput)-1] = mergedOutput[len(mergedOutput)-1] + " " + line
- mergedOutput[len(mergedOutput)-1] = strings.TrimSpace(mergedOutput[len(mergedOutput)-1])
- }
- // parse to json
- ret := []QemuBlock{}
- for _, line := range mergedOutput {
- parts := regexp.MustCompile(`\s+`).Split(line, -1)
- if strings.HasSuffix(parts[0], ":") ||
- regexp.MustCompile(`\(#block\d+\):`).MatchString(parts[1]) {
- block := QemuBlock{}
- if strings.HasSuffix(parts[0], ":") {
- block.Device = parts[0][:len(parts[0])-1]
- } else {
- block.Device = parts[0]
- }
- if regexp.MustCompile(`\(#block\d+\):`).MatchString(parts[1]) {
- block.Inserted.File = parts[2]
- for i := 0; i < len(parts)-2; i++ {
- if parts[i] == "Backing" && parts[i+1] == "file:" {
- block.Inserted.BackingFile = parts[i+2]
- break
- }
- }
- }
- ret = append(ret, block)
- }
- }
- callback(ret)
- }
- m.Query("info block", cb)
- }
- func (m *HmpMonitor) EjectCdrom(dev string, callback StringCallback) {
- m.Query(fmt.Sprintf("eject -f %s", dev), callback)
- }
- func (m *HmpMonitor) ChangeCdrom(dev string, path string, callback StringCallback) {
- m.Query(fmt.Sprintf("change %s %s", dev, path), callback)
- }
- func (m *HmpMonitor) DriveDel(idstr string, callback StringCallback) {
- m.Query(fmt.Sprintf("drive_del %s", idstr), callback)
- }
- func (m *HmpMonitor) DeviceDel(idstr string, callback StringCallback) {
- m.Query(fmt.Sprintf("device_del %s", idstr), callback)
- }
- func (m *HmpMonitor) ObjectDel(idstr string, callback StringCallback) {
- m.Query(fmt.Sprintf("object_del %s", idstr), callback)
- }
- func (m *HmpMonitor) XBlockdevChange(parent, node, child string, callback StringCallback) {
- go callback("hmp not support command x-blockdev-change")
- }
- func (m *HmpMonitor) DriveAdd(bus, node string, params map[string]string, callback StringCallback) {
- var paramsKvs = []string{}
- for k, v := range params {
- paramsKvs = append(paramsKvs, fmt.Sprintf("%s=%s", k, v))
- }
- cmd := "drive_add"
- if len(node) > 0 {
- cmd = fmt.Sprintf("drive_add -n %s", node)
- }
- m.Query(fmt.Sprintf("%s %s %s", cmd, bus, strings.Join(paramsKvs, ",")), callback)
- }
- func (m *HmpMonitor) DeviceAdd(dev string, params map[string]string, callback StringCallback) {
- var paramsKvs = []string{}
- for k, v := range params {
- paramsKvs = append(paramsKvs, fmt.Sprintf("%s=%s", k, v))
- }
- m.Query(fmt.Sprintf("device_add %s,%s", dev, strings.Join(paramsKvs, ",")), callback)
- }
- func (m *HmpMonitor) MigrateSetDowntime(dtSec float64, callback StringCallback) {
- m.Query(fmt.Sprintf("migrate_set_downtime %f", dtSec), callback)
- }
- func (m *HmpMonitor) MigrateSetCapability(capability, state string, callback StringCallback) {
- m.Query(fmt.Sprintf("migrate_set_capability %s %s", capability, state), callback)
- }
- func (m *HmpMonitor) MigrateSetParameter(key string, val interface{}, callback StringCallback) {
- cmd := fmt.Sprintf("migrate_set_parameter %s %s", key, val)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) MigrateIncoming(address string, callback StringCallback) {
- cmd := fmt.Sprintf("migrate_incoming %s", address)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) MigrateContinue(state string, callback StringCallback) {
- cmd := fmt.Sprintf("migrate_continue %s", state)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) Migrate(
- destStr string, copyIncremental, copyFull bool, callback StringCallback,
- ) {
- cmd := "migrate -d"
- if copyIncremental {
- cmd += " -i"
- } else if copyFull {
- cmd += " -b"
- }
- cmd += " " + destStr
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) GetMigrateStatus(callback StringCallback) {
- cb := func(output string) {
- log.Infof("Query migrate status %s: %s", m.server, output)
- var status string
- for _, line := range strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n") {
- if strings.HasPrefix(line, "Migration status") {
- status = line[strings.LastIndex(line, " ")+1:]
- break
- }
- }
- callback(status)
- }
- m.Query("info migrate", cb)
- }
- func (m *HmpMonitor) GetMigrateStats(callback MigrateStatsCallback) {
- go callback(nil, errors.Errorf("unsupport get migrate stats"))
- }
- func (m *HmpMonitor) MigrateCancel(cb StringCallback) {
- m.Query("migrate_cancel", cb)
- }
- func (m *HmpMonitor) MigrateStartPostcopy(callback StringCallback) {
- cb := func(output string) {
- log.Infof("MigrateStartPostcopy %s: %s", m.server, output)
- callback(output)
- }
- m.Query("migrate_start_postcopy", cb)
- }
- func (m *HmpMonitor) GetBlockJobCounts(callback func(jobs int)) {
- cb := func(output string) {
- lines := strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
- if lines[0] == "No active jobs" {
- callback(0)
- } else {
- callback(len(lines))
- }
- }
- m.Query("info block-jobs", cb)
- }
- func (m *HmpMonitor) GetBlockJobs(callback func([]BlockJob)) {
- cb := func(output string) {
- lines := strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
- if lines[0] == "No active jobs" {
- callback(nil)
- return
- }
- jobs := []BlockJob{}
- re := regexp.MustCompile(`Type (?P<type>\w+), device (?P<device>\w+)`)
- for i := 0; i < len(lines); i++ {
- m := regutils2.GetParams(re, lines[i])
- if len(m) > 0 {
- job := BlockJob{}
- job.Type, _ = m["type"]
- job.Device, _ = m["device"]
- jobs = append(jobs, job)
- }
- }
- callback(jobs)
- }
- m.Query("info block-jobs", cb)
- }
- func (m *HmpMonitor) ReloadDiskBlkdev(device, path string, callback StringCallback) {
- m.Query(fmt.Sprintf("reload_disk_snapshot_blkdev -n %s %s", device, path), callback)
- }
- func (m *HmpMonitor) DriveMirror(callback StringCallback, drive, target, syncMode, format string, unmap, blockReplication bool, speed int64) {
- cmd := "drive_mirror -n"
- if blockReplication {
- cmd += " -c"
- }
- if syncMode == "full" {
- cmd += " -f"
- }
- cmd += fmt.Sprintf(" %s %s %s", drive, target, format)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) DriveBackup(callback StringCallback, drive, target, syncMode, format string) {
- cmd := "drive_backup -n"
- if syncMode == "full" {
- cmd += " -f"
- }
- cmd += fmt.Sprintf(" %s %s %s", drive, target, format)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) BlockStream(drive string, callback StringCallback) {
- var (
- speed = 500 // limit 500 MB/s
- cmd = fmt.Sprintf("block_stream %s %d", drive, speed)
- )
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) BlockJobComplete(drive string, callback StringCallback) {
- m.Query("block_job_complete", callback)
- }
- func (m *HmpMonitor) BlockReopenImage(drive, newImagePath, format string, cb StringCallback) {
- m.Query(fmt.Sprintf("block_reopen_image %s %s %s", drive, newImagePath, format), cb)
- }
- func (m *HmpMonitor) SnapshotBlkdev(drive, newImagePath, format string, reuse bool, cb StringCallback) {
- var cmd = "snapshot_blkdev"
- if reuse {
- cmd += " -n"
- }
- cmd += fmt.Sprintf(" %s %s %s", drive, newImagePath, format)
- m.Query(cmd, cb)
- }
- func (m *HmpMonitor) SetVncPassword(proto, password string, callback StringCallback) {
- if len(password) > 8 {
- password = password[:8]
- }
- m.Query(fmt.Sprintf("set_password %s %s", proto, password), callback)
- }
- func (m *HmpMonitor) StartNbdServer(port int, exportAllDevice, writable bool, callback StringCallback) {
- var cmd = "nbd_server_start"
- if exportAllDevice {
- cmd += " -a"
- }
- if writable {
- cmd += " -w"
- }
- cmd += fmt.Sprintf(" 0.0.0.0:%d", port)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) StopNbdServer(callback StringCallback) {
- m.Query("nbd_server_stop", callback)
- }
- func (m *HmpMonitor) ResizeDisk(driveName string, sizeMB int64, callback StringCallback) {
- cmd := fmt.Sprintf("block_resize %s %d", driveName, sizeMB)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) GetCpuCount(callback func(count int)) {
- var cb = func(output string) {
- cpus := strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
- callback(len(cpus))
- }
- m.Query("info cpus", cb)
- }
- func (m *HmpMonitor) AddCpu(cpuIndex int, callback StringCallback) {
- m.Query(fmt.Sprintf("cpu-add %d", cpuIndex), callback)
- }
- func (m *HmpMonitor) GeMemtSlotIndex(callback func(index int)) {
- var cb = func(output string) {
- memInfos := strings.Split(strings.TrimSuffix(output, "\r\n"), "\r\n")
- var count int
- for _, line := range memInfos {
- if strings.HasPrefix(line, "slot:") {
- count += 1
- }
- }
- callback(count)
- }
- m.Query("info memory-devices", cb)
- }
- func (m *HmpMonitor) GetMemoryDevicesInfo(cb QueryMemoryDevicesCallback) {
- go cb(nil, "hmp unsupport get memory devices info")
- }
- func (m *HmpMonitor) GetMemdevList(callback MemdevListCallback) {
- go callback(nil, "hmp unsupport get memdev list")
- }
- func (m *HmpMonitor) ObjectAdd(objectType string, params map[string]string, callback StringCallback) {
- var paramsKvs = []string{}
- for k, v := range params {
- paramsKvs = append(paramsKvs, fmt.Sprintf("%s=%s", k, v))
- }
- cmd := fmt.Sprintf("object_add %s,%s", objectType, strings.Join(paramsKvs, ","))
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) BlockIoThrottle(driveName string, bps, iops int64, callback StringCallback) {
- cmd := fmt.Sprintf("block_set_io_throttle %s %d 0 0 %d 0 0", driveName, bps, iops)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) CancelBlockJob(driveName string, force bool, callback StringCallback) {
- cmd := "block_job_cancel "
- if force {
- cmd += "-f "
- }
- cmd += driveName
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) NetdevAdd(id, netType string, params map[string]string, callback StringCallback) {
- cmd := fmt.Sprintf("netdev_add %s,id=%s", netType, id)
- for k, v := range params {
- cmd += fmt.Sprintf(",%s=%s", k, v)
- }
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) NetdevDel(id string, callback StringCallback) {
- cmd := fmt.Sprintf("netdev_del %s", id)
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) SaveState(stateFilePath string, callback StringCallback) {
- cmd := fmt.Sprintf(`migrate -d "%s"`, getSaveStatefileUri(stateFilePath))
- m.Query(cmd, callback)
- }
- func (m *HmpMonitor) QueryPci(callback QueryPciCallback) {
- go callback(nil, "unsupported query pci for hmp")
- }
- func (m *HmpMonitor) QueryMachines(callback QueryMachinesCallback) {
- go callback(nil, "unsupported query machines for hmp")
- }
- func (m *HmpMonitor) ScreenDump(savePath string, callback StringCallback) {
- m.HumanMonitorCommand(fmt.Sprintf("screendump %s", savePath), callback)
- }
- func (m *HmpMonitor) Quit(cb StringCallback) {
- m.Query("quit", cb)
- }
- func (m *HmpMonitor) InfoQtree(cb StringCallback) {
- m.Query("info qtree", cb)
- }
- func (m *HmpMonitor) GetHotPluggableCpus(callback HotpluggableCPUListCallback) {
- go callback(nil, "unsupported get hotpluggable cpu list for hmp")
- }
|