downloader.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package downloader
  15. import (
  16. "compress/zlib"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "os"
  21. "path/filepath"
  22. "time"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/onecloud/pkg/util/pb"
  26. "yunion.io/x/onecloud/pkg/util/sparsefile"
  27. )
  // Tunables used by the download provider.
  const (
  	// CHUNK_SIZE is 8 KiB (8192 bytes). It is not referenced by the code
  	// visible in this file.
  	CHUNK_SIZE = 8 << 10

  	// DEFAULT_RATE_LIMIT is the rate limit NewDownloadProvider falls back to
  	// when the caller supplies a non-positive value. Start logs the limit
  	// with a MiB/s unit.
  	DEFAULT_RATE_LIMIT = 50

  	// COMPRESS_LEVEL is the level handed to zlib.NewWriterLevel.
  	COMPRESS_LEVEL = 1
  )
  // SDownloadProvider holds the settings for one file download served over
  // HTTP: where to write the response and how to transform and throttle it.
  type SDownloadProvider struct {
  	// w receives the response headers and the file body.
  	w http.ResponseWriter
  	// rateLimit is passed to the proxy reader's SetRateLimit in Start.
  	rateLimit int
  	// compress makes Start wrap the response in a zlib writer; sparse makes
  	// Start read the file through sparsefile.NewSparseFileReader.
  	compress, sparse bool
  }
  39. func NewDownloadProvider(w http.ResponseWriter, compress, sparse bool, rateLimit int) *SDownloadProvider {
  40. if rateLimit <= 0 {
  41. rateLimit = DEFAULT_RATE_LIMIT
  42. }
  43. return &SDownloadProvider{w: w, rateLimit: rateLimit, compress: compress, sparse: sparse}
  44. }
  45. func (d *SDownloadProvider) Start(
  46. prepareDownload func() error, onDownloadComplete func(),
  47. downloadFilePath string, headers http.Header,
  48. ) error {
  49. if prepareDownload != nil {
  50. if err := prepareDownload(); err != nil {
  51. log.Errorln(err)
  52. return err
  53. }
  54. }
  55. if headers.Get("Content-Type") == "" {
  56. headers.Set("Content-Type", "application/octet-stream")
  57. }
  58. for k := range headers {
  59. d.w.Header().Add(k, headers.Get(k))
  60. }
  61. log.Infof("Downloader Start Transfer %s, compress %t sparse %t rateLimit: %dMiB/s", downloadFilePath, d.compress, d.sparse, d.rateLimit)
  62. spath, err := filepath.EvalSymlinks(downloadFilePath)
  63. if err == nil {
  64. downloadFilePath = spath
  65. }
  66. fi, err := os.Open(downloadFilePath)
  67. if err != nil {
  68. return errors.Wrapf(err, "os.Open(%s)", downloadFilePath)
  69. }
  70. defer fi.Close()
  71. stat, err := fi.Stat()
  72. if err != nil {
  73. return errors.Wrapf(err, "fi.Stat")
  74. }
  75. var reader io.Reader
  76. reader = fi
  77. size := stat.Size()
  78. d.w.Header().Set("X-File-Size", fmt.Sprintf("%d", size))
  79. if d.sparse {
  80. sparse, err := sparsefile.NewSparseFileReader(fi)
  81. if err != nil {
  82. return errors.Wrapf(err, "NewSparseFileReader")
  83. }
  84. size = sparse.Size()
  85. d.w.Header().Set("X-Sparse-Header", fmt.Sprintf("%d", sparse.HeaderSize()))
  86. reader = sparse
  87. }
  88. d.w.Header().Set("Content-Length", fmt.Sprintf("%d", size))
  89. var writer io.Writer = d.w
  90. if d.compress {
  91. zw, err := zlib.NewWriterLevel(d.w, COMPRESS_LEVEL)
  92. if err != nil {
  93. log.Errorln(err)
  94. return err
  95. }
  96. writer = zw
  97. defer zw.Close()
  98. defer zw.Flush() // it's cool
  99. }
  100. pb := pb.NewProxyReader(reader, size)
  101. pb.SetRateLimit(d.rateLimit)
  102. pb.SetRefreshRate(time.Second * 10)
  103. pb.SetCallback(func() {
  104. log.Infof("transfer %s rate: %.2f MiB p/s percent: %.2f%%", downloadFilePath, pb.Rate(), pb.Percent())
  105. })
  106. _, err = io.Copy(writer, pb)
  107. if err != nil {
  108. return errors.Wrapf(err, "io.Copy")
  109. }
  110. if onDownloadComplete != nil {
  111. onDownloadComplete()
  112. }
  113. return nil
  114. }