monitor.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package monitor
  15. import (
  16. "fmt"
  17. "net"
  18. "strings"
  19. "sync"
  20. "time"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. )
// StringCallback is a function that receives a single string. It is the
// callback type accepted by most methods of the Monitor interface.
type StringCallback func(string)
// BlockJob holds the state of a single block job together with the
// bookkeeping fields that CalcOffset maintains to derive transfer speed.
type BlockJob struct {
	// server is the identifier printed in CalcOffset's progress log line.
	server string

	Busy bool
	// commit|
	Type   string
	Len    int64
	Paused bool
	Ready  bool
	// running|ready
	Status string
	// ok|
	IoStatus string `json:"io-status"`
	Offset   int64
	Device   string
	Speed    int64

	// Start is set by the first CalcOffset call and is the reference point
	// for the average speed.
	Start time.Time
	// PreOffset is the preOffset argument passed to the latest CalcOffset call.
	PreOffset int64
	// Now is the time of the latest CalcOffset call.
	Now time.Time
	// SpeedMbps is the latest speed computed by CalcOffset: the offset delta
	// per second divided by 1024*1024 (no byte-to-bit conversion is applied,
	// despite the name).
	SpeedMbps float64
}
  45. type QemuBlock struct {
  46. IoStatus string `json:"io-status"`
  47. Device string
  48. Locked bool
  49. Removable bool
  50. Qdev string
  51. TrayOpen bool
  52. Type string
  53. Inserted struct {
  54. Ro bool
  55. Drv string
  56. Encrypted bool
  57. File string
  58. BackingFile string
  59. BackingFileDepth int
  60. Bps int64
  61. BpsRd int64
  62. BpsWr int
  63. Iops int64
  64. IopsRd int
  65. IopsWr int
  66. BpsMax int64
  67. BpsRdMax int64
  68. BpsWrMax int64
  69. IopsMax int
  70. IopsRdMax int
  71. IopsWrMax int
  72. IopsSize int64
  73. DetectZeroes string
  74. WriteThreshold int
  75. Image struct {
  76. Filename string
  77. Format string
  78. VirtualSize int64 `json:"virtual-size"`
  79. BackingFile string
  80. FullBackingFilename string `json:"full-backing-filename"`
  81. BackingFilenameFormat string `json:"backing-filename-format"`
  82. Snapshots []struct {
  83. Id string
  84. Name string
  85. VmStateSize int `json:"vm-state-size"`
  86. DateSec int64 `json:"date-sec"`
  87. DateNsec int `json:"date-nsec"`
  88. VmClockSec int `json:"vm-clock-sec"`
  89. VmClockNsec int `json:"vm-clock-nsec"`
  90. }
  91. BackingImage struct {
  92. filename string
  93. format string
  94. VirtualSize int64 `json:"virtual-size"`
  95. } `json:"backing-image"`
  96. }
  97. }
  98. }
// MigrationInfo is the payload handed to a MigrateStatsCallback.
//
// Every field is a pointer tagged omitempty, so a value that is absent from
// the decoded document stays nil instead of becoming a zero value.
// NOTE(review): presumably mirrors the QMP "MigrationInfo" type, like the
// sibling types in this file — confirm against the QMP schema.
type MigrationInfo struct {
	Status                *MigrationStatus  `json:"status,omitempty"`
	RAM                   *MigrationStats   `json:"ram,omitempty"`
	Disk                  *MigrationStats   `json:"disk,omitempty"`
	XbzrleCache           *XbzrleCacheStats `json:"xbzrle-cache,omitempty"`
	TotalTime             *int64            `json:"total-time,omitempty"`
	ExpectedDowntime      *int64            `json:"expected-downtime,omitempty"`
	Downtime              *int64            `json:"downtime,omitempty"`
	SetupTime             *int64            `json:"setup-time,omitempty"`
	CPUThrottlePercentage *int64            `json:"cpu-throttle-percentage,omitempty"`
	ErrorDesc             *string           `json:"error-desc,omitempty"`
}
// MigrationStats carries the transfer counters used for both the RAM and the
// Disk sections of MigrationInfo.
// NOTE(review): units (bytes vs. pages) are not visible in this file — check
// the QMP schema before relying on them.
type MigrationStats struct {
	Transferred      int64   `json:"transferred"`
	Remaining        int64   `json:"remaining"`
	Total            int64   `json:"total"`
	Duplicate        int64   `json:"duplicate"`
	Skipped          int64   `json:"skipped"`
	Normal           int64   `json:"normal"`
	NormalBytes      int64   `json:"normal-bytes"`
	DirtyPagesRate   int64   `json:"dirty-pages-rate"`
	Mbps             float64 `json:"mbps"`
	DirtySyncCount   int64   `json:"dirty-sync-count"`
	PostcopyRequests int64   `json:"postcopy-requests"`
	PageSize         int64   `json:"page-size"`
}
// XbzrleCacheStats implements the "XBZRLECacheStats" QMP API type.
// It is referenced from MigrationInfo.XbzrleCache.
type XbzrleCacheStats struct {
	CacheSize     int64   `json:"cache-size"`
	Bytes         int64   `json:"bytes"`
	Pages         int64   `json:"pages"`
	CacheMiss     int64   `json:"cache-miss"`
	CacheMissRate float64 `json:"cache-miss-rate"`
	Overflow      int64   `json:"overflow"`
}
// MigrationStatus implements the "MigrationStatus" QMP API type.
// It is a plain string; no constants for its values are declared in this
// part of the file.
type MigrationStatus string
// MigrateStatsCallback receives the result of Monitor.GetMigrateStats: a
// MigrationInfo pointer and an error.
// NOTE(review): presumably exactly one of the two is non-nil — confirm with
// the implementations.
type MigrateStatsCallback func(*MigrationInfo, error)
  137. type blockSizeByte int64
  138. func (self blockSizeByte) String() string {
  139. size := map[string]float64{
  140. "Kb": 1024,
  141. "Mb": 1024 * 1024,
  142. "Gb": 1024 * 1024 * 1024,
  143. "TB": 1024 * 1024 * 1024 * 1024,
  144. }
  145. for _, unit := range []string{"TB", "Gb", "Mb", "Kb"} {
  146. if int64(self)/int64(size[unit]) > 0 {
  147. return fmt.Sprintf("%.2f%s", float64(self)/size[unit], unit)
  148. }
  149. }
  150. return fmt.Sprintf("%d", int64(self))
  151. }
  152. func (self *BlockJob) CalcOffset(preOffset int64) {
  153. if self.Start.IsZero() {
  154. self.Start = time.Now()
  155. self.Now = time.Now()
  156. self.PreOffset = preOffset
  157. return
  158. }
  159. second := time.Now().Sub(self.Now).Seconds()
  160. if second > 0 {
  161. speed := float64(self.Offset-preOffset) / second
  162. self.SpeedMbps = speed / 1024 / 1024
  163. avgSpeed := float64(self.Offset) / time.Now().Sub(self.Start).Seconds()
  164. log.Infof(`[%s / %s] server %s block job for %s speed: %s/s(avg: %s/s)`, blockSizeByte(self.Offset).String(), blockSizeByte(self.Len).String(), self.server, self.Device, blockSizeByte(speed).String(), blockSizeByte(avgSpeed).String())
  165. }
  166. self.PreOffset = preOffset
  167. self.Now = time.Now()
  168. return
  169. }
// Monitor is the set of operations a monitor connection exposes.
//
// Most operations do not return their result; they deliver it to the callback
// argument instead. Only Connect, ConnectWithSocket and QemuMonitorCommand
// return an error directly.
//
// Note that SBaseMonitor.ConnectWithSocket takes no timeout argument, so
// SBaseMonitor on its own does not satisfy this interface.
type Monitor interface {
	Connect(host string, port int) error
	ConnectWithSocket(address string, timeout time.Duration) error
	Disconnect()
	IsConnected() bool
	// The callback function will be called in another goroutine
	SimpleCommand(cmd string, callback StringCallback)
	HumanMonitorCommand(cmd string, callback StringCallback)
	QemuMonitorCommand(cmd string, callback StringCallback) error
	QueryStatus(StringCallback)
	GetVersion(StringCallback)
	GetBlockJobCounts(func(jobs int))
	GetBlockJobs(func([]BlockJob))
	QueryPci(callback QueryPciCallback)
	InfoQtree(cb StringCallback)
	GetCpuCount(func(count int))
	AddCpu(cpuIndex int, callback StringCallback)
	GetHotPluggableCpus(HotpluggableCPUListCallback)
	// NOTE(review): the name looks like a misspelling of "GetMemSlotIndex";
	// it is part of the interface, so renaming it would break implementers.
	GeMemtSlotIndex(func(index int))
	GetMemoryDevicesInfo(QueryMemoryDevicesCallback)
	GetMemdevList(MemdevListCallback)
	GetBlocks(callback func([]QemuBlock))
	EjectCdrom(dev string, callback StringCallback)
	ChangeCdrom(dev string, path string, callback StringCallback)
	DriveDel(idstr string, callback StringCallback)
	DeviceDel(idstr string, callback StringCallback)
	ObjectDel(idstr string, callback StringCallback)
	ObjectAdd(objectType string, params map[string]string, callback StringCallback)
	DriveAdd(bus, node string, params map[string]string, callback StringCallback)
	DeviceAdd(dev string, params map[string]string, callback StringCallback)
	XBlockdevChange(parent, node, child string, callback StringCallback)
	BlockStream(drive string, callback StringCallback)
	DriveMirror(callback StringCallback, drive, target, syncMode, format string, unmap, blockReplication bool, speed int64)
	DriveBackup(callback StringCallback, drive, target, syncMode, format string)
	BlockJobComplete(drive string, cb StringCallback)
	BlockReopenImage(drive, newImagePath, format string, cb StringCallback)
	SnapshotBlkdev(drive, newImagePath, format string, reuse bool, cb StringCallback)
	MigrateSetDowntime(dtSec float64, callback StringCallback)
	MigrateSetCapability(capability, state string, callback StringCallback)
	MigrateSetParameter(key string, val interface{}, callback StringCallback)
	MigrateIncoming(address string, callback StringCallback)
	Migrate(destStr string, copyIncremental, copyFull bool, callback StringCallback)
	MigrateContinue(state string, callback StringCallback)
	GetMigrateStatus(callback StringCallback)
	MigrateStartPostcopy(callback StringCallback)
	GetMigrateStats(callback MigrateStatsCallback)
	MigrateCancel(cb StringCallback)
	ReloadDiskBlkdev(device, path string, callback StringCallback)
	SetVncPassword(proto, password string, callback StringCallback)
	StartNbdServer(port int, exportAllDevice, writable bool, callback StringCallback)
	StopNbdServer(callback StringCallback)
	ResizeDisk(driveName string, sizeMB int64, callback StringCallback)
	BlockIoThrottle(driveName string, bps, iops int64, callback StringCallback)
	CancelBlockJob(driveName string, force bool, callback StringCallback)
	NetdevAdd(id, netType string, params map[string]string, callback StringCallback)
	NetdevDel(id string, callback StringCallback)
	ScreenDump(savePath string, callback StringCallback)
	SaveState(statFilePath string, callback StringCallback)
	QueryMachines(callback QueryMachinesCallback)
	Quit(StringCallback)
}
// MonitorErrorFunc is a hook that receives an error. SBaseMonitor stores
// hooks of this type for the disconnect and timeout events.
type MonitorErrorFunc func(error)

// MonitorSuccFunc is a hook without arguments. SBaseMonitor stores a hook of
// this type for the connected event.
type MonitorSuccFunc func()
// SBaseMonitor holds the connection state and helpers shared by monitor
// implementations.
// NOTE(review): nothing in this part of the file sets connected to true or
// invokes the three hooks — presumably the embedding types do; confirm.
type SBaseMonitor struct {
	// Event hooks supplied to NewBaseMonitor.
	OnMonitorDisConnect MonitorErrorFunc
	OnMonitorConnected  MonitorSuccFunc
	OnMonitorTimeout    MonitorErrorFunc

	server string
	sid    string

	QemuVersion string
	// connected is reported by IsConnected and cleared by Disconnect.
	connected bool
	// timeout is initialised to true by NewBaseMonitor.
	timeout bool
	// rwc is the connection stored by onConnectSuccess.
	rwc net.Conn

	// mutex guards the writing and reading flags in checkWriting and
	// checkReading.
	mutex   *sync.Mutex
	writing bool
	reading bool
}
  247. func NewBaseMonitor(server, sid string, OnMonitorConnected MonitorSuccFunc, OnMonitorDisConnect, OnMonitorTimeout MonitorErrorFunc) *SBaseMonitor {
  248. return &SBaseMonitor{
  249. OnMonitorConnected: OnMonitorConnected,
  250. OnMonitorDisConnect: OnMonitorDisConnect,
  251. OnMonitorTimeout: OnMonitorTimeout,
  252. server: server,
  253. sid: sid,
  254. timeout: true,
  255. mutex: &sync.Mutex{},
  256. }
  257. }
  258. func (m *SBaseMonitor) connect(protocol, address string) error {
  259. conn, err := net.Dial(protocol, address)
  260. if err != nil {
  261. return errors.Errorf("Connect to %s %s failed %s", protocol, address, err)
  262. }
  263. log.Infof("Connect %s %s success", protocol, address)
  264. m.onConnectSuccess(conn)
  265. return nil
  266. }
  267. func (m *SBaseMonitor) onConnectSuccess(conn net.Conn) {
  268. // Setup reader timeout
  269. conn.SetReadDeadline(time.Now().Add(90 * time.Second))
  270. // set rwc hand
  271. m.rwc = conn
  272. }
  273. func (m *SBaseMonitor) SetReadDeadlineTimeout(duration time.Duration) {
  274. if m.rwc != nil {
  275. m.rwc.SetReadDeadline(time.Now().Add(duration))
  276. }
  277. }
  278. func (m *SBaseMonitor) Connect(host string, port int) error {
  279. return m.connect("tcp", fmt.Sprintf("%s:%d", host, port))
  280. }
  281. func (m *SBaseMonitor) ConnectWithSocket(address string) error {
  282. return m.connect("unix", address)
  283. }
  284. func (m *SBaseMonitor) Disconnect() {
  285. if m.connected {
  286. m.connected = false
  287. m.rwc.Close()
  288. }
  289. }
// IsConnected reports the current value of the connected flag. The flag is
// read without taking the monitor's mutex.
func (m *SBaseMonitor) IsConnected() bool {
	return m.connected
}
// QemuMonitorCommand is the base implementation: it ignores both arguments,
// never invokes callback, and always returns errors.ErrNotSupported.
// NOTE(review): presumably overridden by monitor types that support it —
// confirm.
func (m *SBaseMonitor) QemuMonitorCommand(cmd string, callback StringCallback) error {
	return errors.ErrNotSupported
}
  296. func (m *SBaseMonitor) checkReading() bool {
  297. m.mutex.Lock()
  298. defer m.mutex.Unlock()
  299. if m.reading {
  300. return false
  301. } else {
  302. m.reading = true
  303. }
  304. return true
  305. }
  306. func (m *SBaseMonitor) checkWriting() bool {
  307. m.mutex.Lock()
  308. defer m.mutex.Unlock()
  309. if m.writing {
  310. return false
  311. } else {
  312. m.writing = true
  313. }
  314. return true
  315. }
  316. func (m *SBaseMonitor) parseStatus(callback StringCallback) StringCallback {
  317. return func(output string) {
  318. strs := strings.Split(output, "\r\n")
  319. for _, str := range strs {
  320. if strings.HasPrefix(str, "VM status:") {
  321. callback(strings.TrimSpace(
  322. strings.Trim(str[len("VM status:"):], "\\r\\n"),
  323. ))
  324. return
  325. }
  326. }
  327. }
  328. }
  329. func getSaveStatefileUri(stateFilePath string) string {
  330. if strings.HasSuffix(stateFilePath, ".gz") {
  331. return fmt.Sprintf("exec:gzip -c > %s", stateFilePath)
  332. }
  333. return fmt.Sprintf("exec:cat > %s", stateFilePath)
  334. }