upload.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package azure
import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/url"
	"sync/atomic"
	"time"

	"yunion.io/x/cloudmux/pkg/multicloud/azure/concurrent"
	"yunion.io/x/cloudmux/pkg/multicloud/azure/progress"
	"yunion.io/x/cloudmux/pkg/multicloud/azure/vhdcore/block/bitmap"
	"yunion.io/x/cloudmux/pkg/multicloud/azure/vhdcore/common"
	"yunion.io/x/cloudmux/pkg/multicloud/azure/vhdcore/diskstream"
	"yunion.io/x/cloudmux/pkg/multicloud/azure/vhdcore/footer"
	"yunion.io/x/cloudmux/pkg/multicloud/azure/vhdcore/validator"
)
  32. // DiskUploadContext type describes VHD upload context, this includes the disk stream to read from, the ranges of
  33. // the stream to read, the destination blob and it's container, the client to communicate with Azure storage and
  34. // the number of parallel go-routines to use for upload.
  35. //
// DataWithRange pairs one disk range with the bytes read from that range,
// ready to be uploaded as a single page-blob write.
type DataWithRange struct {
	Range *common.IndexRange // byte range within the disk stream
	Data  []byte             // data read from that range; len(Data) == Range.Length()
}
// DiskUploadContext describes a VHD upload: the disk stream to read from,
// the subset of stream ranges to upload, the destination page blob and its
// container, the storage-account credentials, and the number of parallel
// goroutines to use for the upload.
type DiskUploadContext struct {
	StorageAccount        *SStorageAccount       // The storage account to use for authentication
	AccessKey             string                 // The access key to use for authentication
	VhdStream             *diskstream.DiskStream // The stream whose ranges needs to be uploaded
	AlreadyProcessedBytes int64                  // The size in bytes already uploaded
	UploadableRanges      []*common.IndexRange   // The subset of stream ranges to be uploaded
	ContainerName         string                 // The container in which page blob resides
	BlobName              string                 // The destination page blob name
	Parallelism           int                    // The number of concurrent goroutines to be used for upload
	Resume                bool                   // Indicate whether this is a new or resuming upload
	MD5Hash               []byte                 // MD5Hash to be set in the page blob properties once upload finishes
}
// oneMB is the number of bytes in one megabyte, as a float64 for
// size and throughput arithmetic.
const oneMB = float64(1048576)
  54. // Upload uploads the disk ranges described by the parameter cxt, this parameter describes the disk stream to
  55. // read from, the ranges of the stream to read, the destination blob and it's container, the client to communicate
  56. // with Azure storage and the number of parallel go-routines to use for upload.
  57. func Upload(cxt *DiskUploadContext, callback func(float32)) error {
  58. // Get the channel that contains stream of disk data to upload
  59. dataWithRangeChan, streamReadErrChan := GetDataWithRanges(cxt.VhdStream, cxt.UploadableRanges)
  60. // The channel to send upload request to load-balancer
  61. requtestChan := make(chan *concurrent.Request, 0)
  62. // Prepare and start the load-balancer that load request across 'cxt.Parallelism' workers
  63. loadBalancer := concurrent.NewBalancer(cxt.Parallelism)
  64. loadBalancer.Init()
  65. workerErrorChan, allWorkersFinishedChan := loadBalancer.Run(requtestChan)
  66. // Calculate the actual size of the data to upload
  67. uploadSizeInBytes := int64(0)
  68. for _, r := range cxt.UploadableRanges {
  69. uploadSizeInBytes += r.Length()
  70. }
  71. fmt.Printf("\nEffective upload size: %.2f MB (from %.2f MB originally)", float64(uploadSizeInBytes)/oneMB, float64(cxt.VhdStream.GetSize())/oneMB)
  72. // Prepare and start the upload progress tracker
  73. uploadProgress := progress.NewStatus(cxt.Parallelism, cxt.AlreadyProcessedBytes, uploadSizeInBytes, progress.NewComputestateDefaultSize())
  74. progressChan := uploadProgress.Run()
  75. // read progress status from progress tracker and print it
  76. go readAndPrintProgress(progressChan, cxt.Resume, callback)
  77. // listen for errors reported by workers and print it
  78. var allWorkSucceeded = true
  79. go func() {
  80. for {
  81. fmt.Println(<-workerErrorChan)
  82. allWorkSucceeded = false
  83. }
  84. }()
  85. var err error
  86. L:
  87. for {
  88. select {
  89. case dataWithRange, ok := <-dataWithRangeChan:
  90. if !ok {
  91. close(requtestChan)
  92. break L
  93. }
  94. // Create work request
  95. client := cxt.StorageAccount.region.client
  96. req := &concurrent.Request{
  97. Work: func() error {
  98. params := url.Values{}
  99. params.Set("comp", "page")
  100. header := http.Header{}
  101. header.Set("x-ms-blob-type", "PageBlob")
  102. header.Set("x-ms-page-write", "update")
  103. header.Set("x-ms-range", fmt.Sprintf("bytes=%d-%d", dataWithRange.Range.Start, dataWithRange.Range.End))
  104. header.Set("Content-Length", fmt.Sprintf("%v", dataWithRange.Range.Length()))
  105. file := fmt.Sprintf("%s/%s", cxt.ContainerName, cxt.BlobName)
  106. err = client.put_storage_v2(cxt.AccessKey, cxt.StorageAccount.Name, file, header, params, bytes.NewReader(dataWithRange.Data), nil)
  107. if err != nil {
  108. return errors.New(err.Error())
  109. }
  110. uploadProgress.ReportBytesProcessedCount(dataWithRange.Range.Length())
  111. return nil
  112. },
  113. ShouldRetry: func(e error) bool {
  114. return true
  115. },
  116. ID: dataWithRange.Range.String(),
  117. }
  118. // Send work request to load balancer for processing
  119. //
  120. requtestChan <- req
  121. case err = <-streamReadErrChan:
  122. close(requtestChan)
  123. loadBalancer.TearDownWorkers()
  124. break L
  125. }
  126. }
  127. <-allWorkersFinishedChan
  128. uploadProgress.Close()
  129. if !allWorkSucceeded {
  130. err = errors.New("\nUpload Incomplete: Some blocks of the VHD failed to upload, rerun the command to upload those blocks")
  131. }
  132. if err == nil {
  133. fmt.Printf("\r Completed: %3d%% [%10.2f MB] RemainingTime: %02dh:%02dm:%02ds Throughput: %d Mb/sec %2c ",
  134. 100,
  135. float64(uploadSizeInBytes)/oneMB,
  136. 0, 0, 0,
  137. 0, ' ')
  138. }
  139. return err
  140. }
  141. // GetDataWithRanges with start reading and streaming the ranges from the disk identified by the parameter ranges.
  142. // It returns two channels, a data channel to stream the disk ranges and a channel to send any error while reading
  143. // the disk. On successful completion the data channel will be closed. the caller must not expect any more value in
  144. // the data channel if the error channel is signaled.
  145. func GetDataWithRanges(stream *diskstream.DiskStream, ranges []*common.IndexRange) (<-chan *DataWithRange, <-chan error) {
  146. dataWithRangeChan := make(chan *DataWithRange, 0)
  147. errorChan := make(chan error, 0)
  148. go func() {
  149. for _, r := range ranges {
  150. dataWithRange := &DataWithRange{
  151. Range: r,
  152. Data: make([]byte, r.Length()),
  153. }
  154. _, err := stream.Seek(r.Start, 0)
  155. if err != nil {
  156. errorChan <- err
  157. return
  158. }
  159. _, err = io.ReadFull(stream, dataWithRange.Data)
  160. if err != nil {
  161. errorChan <- err
  162. return
  163. }
  164. dataWithRangeChan <- dataWithRange
  165. }
  166. close(dataWithRangeChan)
  167. }()
  168. return dataWithRangeChan, errorChan
  169. }
  170. // readAndPrintProgress reads the progress records from the given progress channel and output it. It reads the
  171. // progress record until the channel is closed.
  172. func readAndPrintProgress(progressChan <-chan *progress.Record, resume bool, callback func(float32)) {
  173. var spinChars = [4]rune{'\\', '|', '/', '-'}
  174. s := time.Time{}
  175. if resume {
  176. fmt.Println("\nResuming VHD upload..")
  177. } else {
  178. fmt.Println("\nUploading the VHD..")
  179. }
  180. i := 0
  181. for progressRecord := range progressChan {
  182. if i == 4 {
  183. i = 0
  184. }
  185. t := s.Add(progressRecord.RemainingDuration)
  186. fmt.Printf("\r Completed: %3d%% [%10.2f MB] RemainingTime: %02dh:%02dm:%02ds Throughput: %d Mb/sec %2c ",
  187. int(progressRecord.PercentComplete),
  188. float64(progressRecord.BytesProcessed)/oneMB,
  189. t.Hour(), t.Minute(), t.Second(),
  190. int(progressRecord.AverageThroughputMbPerSecond),
  191. spinChars[i],
  192. )
  193. if callback != nil {
  194. callback(33.0 + float32(progressRecord.PercentComplete*0.33))
  195. }
  196. i++
  197. }
  198. }
  199. func ensureVHDSanity(localVHDPath string) error {
  200. if err := validator.ValidateVhd(localVHDPath); err != nil {
  201. return err
  202. }
  203. if err := validator.ValidateVhdSize(localVHDPath); err != nil {
  204. return err
  205. }
  206. return nil
  207. }
  208. func LocateUploadableRanges(stream *diskstream.DiskStream, rangesToSkip []*common.IndexRange, pageSizeInBytes int64) ([]*common.IndexRange, error) {
  209. var err error
  210. var diskRanges = make([]*common.IndexRange, 0)
  211. stream.EnumerateExtents(func(ext *diskstream.StreamExtent, extErr error) bool {
  212. if extErr != nil {
  213. err = extErr
  214. return false
  215. }
  216. diskRanges = append(diskRanges, ext.Range)
  217. return true
  218. })
  219. if err != nil {
  220. return nil, err
  221. }
  222. diskRanges = common.SubtractRanges(diskRanges, rangesToSkip)
  223. diskRanges = common.ChunkRangesBySize(diskRanges, pageSizeInBytes)
  224. return diskRanges, nil
  225. }
  226. func DetectEmptyRanges(diskStream *diskstream.DiskStream, uploadableRanges []*common.IndexRange) ([]*common.IndexRange, error) {
  227. if diskStream.GetDiskType() != footer.DiskTypeFixed {
  228. return uploadableRanges, nil
  229. }
  230. fmt.Println("\nDetecting empty ranges..")
  231. totalRangesCount := len(uploadableRanges)
  232. lastIndex := int32(-1)
  233. emptyRangesCount := int32(0)
  234. bits := make([]byte, int32(math.Ceil(float64(totalRangesCount)/float64(8))))
  235. bmap := bitmap.NewBitMapFromByteSliceCopy(bits)
  236. indexChan, errChan := LocateNonEmptyRangeIndices(diskStream, uploadableRanges)
  237. L:
  238. for {
  239. select {
  240. case index, ok := <-indexChan:
  241. if !ok {
  242. break L
  243. }
  244. bmap.Set(index, true)
  245. emptyRangesCount += index - lastIndex - 1
  246. lastIndex = index
  247. fmt.Printf("\r Empty ranges : %d/%d", emptyRangesCount, totalRangesCount)
  248. case err := <-errChan:
  249. return nil, err
  250. }
  251. }
  252. // Remove empty ranges from the uploadable ranges slice.
  253. i := int32(0)
  254. for j := 0; j < totalRangesCount; j++ {
  255. if set, _ := bmap.Get(int32(j)); set {
  256. uploadableRanges[i] = uploadableRanges[j]
  257. i++
  258. }
  259. }
  260. uploadableRanges = uploadableRanges[:i]
  261. return uploadableRanges, nil
  262. }
  263. func LocateNonEmptyRangeIndices(stream *diskstream.DiskStream, ranges []*common.IndexRange) (<-chan int32, <-chan error) {
  264. indexChan := make(chan int32, 0)
  265. errorChan := make(chan error, 0)
  266. go func() {
  267. count := int64(-1)
  268. var buf []byte
  269. for index, r := range ranges {
  270. if count != r.Length() {
  271. count = r.Length()
  272. buf = make([]byte, count)
  273. }
  274. _, err := stream.Seek(r.Start, 0)
  275. if err != nil {
  276. errorChan <- err
  277. return
  278. }
  279. _, err = io.ReadFull(stream, buf)
  280. if err != nil {
  281. errorChan <- err
  282. return
  283. }
  284. if !isAllZero(buf) {
  285. indexChan <- int32(index)
  286. }
  287. }
  288. close(indexChan)
  289. }()
  290. return indexChan, errorChan
  291. }
  292. // isAllZero returns true if the given byte slice contain all zeros
  293. func isAllZero(buf []byte) bool {
  294. l := len(buf)
  295. j := 0
  296. for ; j < l; j++ {
  297. if buf[j] != byte(0) {
  298. break
  299. }
  300. }
  301. return j == l
  302. }