fs.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "net/http"
  21. "os"
  22. "path"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "syscall"
  27. "bazil.org/fuse/fs"
  28. "github.com/pierrec/lz4/v4"
  29. "github.com/pkg/errors"
  30. "yunion.io/x/log"
  31. "yunion.io/x/pkg/util/httputils"
  32. "yunion.io/x/onecloud/pkg/util/bitmap"
  33. )
  34. // FetcherFs implements the feteher file system.
  35. type FetcherFs struct {
  36. blocksize int64
  37. url string
  38. size int64
  39. receivedSize int64
  40. blockCount int64
  41. fetchedCount int64
  42. blockBitMap *bitmap.BitMap
  43. readLock sync.Mutex
  44. localFile *os.File
  45. }
  46. func (FetcherFs) Root() (fs.Node, error) {
  47. return Dir{}, nil
  48. }
  49. var fetcherFs *FetcherFs
  50. func initFetcherFs() (*FetcherFs, error) {
  51. fetcherFs = &FetcherFs{
  52. blocksize: int64(opt.Blocksize) * 1024 * 1024,
  53. url: opt.Url,
  54. }
  55. if err := fetcherFs.fetchMetaInfo(); err != nil {
  56. return nil, errors.Wrapf(err, "fetch meta info")
  57. }
  58. segs := strings.Split(opt.Url, "/")
  59. if len(segs[len(segs)-1]) == 0 {
  60. return nil, errors.Errorf("bad url: %s", opt.Url)
  61. }
  62. fd, err := ioutil.TempFile(opt.Tmpdir, fmt.Sprintf("%s.*", segs[len(segs)-1]))
  63. if err != nil {
  64. return nil, errors.Wrap(err, "create tempfile")
  65. }
  66. err = syscall.Fallocate(int(fd.Fd()), 0, 0, fetcherFs.size)
  67. if err != nil {
  68. os.Remove(path.Join(opt.Tmpdir, fd.Name()))
  69. return nil, errors.Wrap(err, "fallocate temp file")
  70. }
  71. fetcherFs.localFile = fd
  72. return fetcherFs, nil
  73. }
  74. func destoryInitFetcherFs() error {
  75. if fetcherFs == nil {
  76. return nil
  77. }
  78. log.Errorf("Remove path %s", fetcherFs.localFile.Name())
  79. err := os.Remove(fetcherFs.localFile.Name())
  80. if err != nil {
  81. return err
  82. }
  83. return nil
  84. }
  85. func (fs *FetcherFs) fetchMetaInfo() error {
  86. header := NewRequestHeader()
  87. res, _, err := httputils.JSONRequest(
  88. httputils.GetDefaultClient(),
  89. context.Background(),
  90. http.MethodHead,
  91. opt.Url,
  92. header,
  93. nil,
  94. false,
  95. )
  96. if err != nil {
  97. return err
  98. }
  99. if lengthStr := res.Get("Content-Length"); lengthStr != "" {
  100. fs.size, err = strconv.ParseInt(lengthStr, 10, 0)
  101. if err != nil {
  102. return errors.Wrap(err, "parse Content-Length")
  103. }
  104. fs.blockCount = fs.size / fs.blocksize
  105. if fs.size%fs.blocksize > 0 {
  106. fs.blockCount += 1
  107. }
  108. fs.blockBitMap = bitmap.NewBitMap(fs.blockCount)
  109. } else {
  110. return errors.Errorf("not found Content-Length")
  111. }
  112. return nil
  113. }
  114. func (fs *FetcherFs) GetSize() int64 {
  115. return fs.size
  116. }
  117. func (fs *FetcherFs) GetBlockCount() int64 {
  118. return fs.blockCount
  119. }
  120. func (fs *FetcherFs) GetFetchedCount() int64 {
  121. return fs.fetchedCount
  122. }
  123. func (fs *FetcherFs) GetReceivedSize() int64 {
  124. return fs.receivedSize
  125. }
  126. func (fs *FetcherFs) GetMeta() string {
  127. return fmt.Sprintf(
  128. "pid=%d\nblock_count=%d\nfetch_count=%d\nrecvsize=%d\ntoken=%s\n",
  129. os.Getpid(), fs.GetBlockCount(), fs.GetFetchedCount(),
  130. fs.GetReceivedSize(), opt.Token,
  131. )
  132. }
  133. func (fs *FetcherFs) doRead(size int, offset int64) ([]byte, error) {
  134. fs.readLock.Lock()
  135. defer fs.readLock.Unlock()
  136. var (
  137. start, end = fs.offsetToBlockIndexRange(size, offset)
  138. err error
  139. )
  140. for idx := start; idx <= end; idx++ {
  141. err = fs.fetchData(idx)
  142. if err != nil {
  143. return nil, errors.Wrap(err, "fetch data")
  144. }
  145. }
  146. _, err = fs.localFile.Seek(offset, io.SeekStart)
  147. if err != nil {
  148. return nil, errors.Wrap(err, "seek local file")
  149. }
  150. var buf = make([]byte, size)
  151. n, err := fs.localFile.Read(buf)
  152. if err != nil {
  153. return nil, errors.Wrap(err, "do read")
  154. }
  155. return buf[:n], nil
  156. }
  157. func (fs *FetcherFs) fetchData(idx int64) (err error) {
  158. for i := 0; i < 3; i++ {
  159. if err = fs.doFetchData(idx); err != nil {
  160. log.Errorf("fetch data %d failed: %s", idx, err)
  161. } else {
  162. break
  163. }
  164. }
  165. return
  166. }
  167. func (fs *FetcherFs) blockReady(idx int64) bool {
  168. return fs.blockBitMap.Has(idx)
  169. }
  170. func (fs *FetcherFs) setBlockReady(idx int64) {
  171. fs.blockBitMap.Set(idx)
  172. }
  173. func (fs *FetcherFs) doFetchData(idx int64) error {
  174. if fs.blockReady(idx) {
  175. return nil
  176. }
  177. log.Infof("start do fetch idx %d ", idx)
  178. var start = idx * fs.blocksize
  179. var end = start + fs.blocksize - 1
  180. if end >= fs.size {
  181. end = fs.size - 1
  182. }
  183. var header = NewRequestHeader()
  184. header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
  185. log.Errorf("Range %s", header.Get("Range"))
  186. var (
  187. res, err = httputils.Request(
  188. httputils.GetDefaultClient(),
  189. context.Background(),
  190. http.MethodGet,
  191. fs.url, header,
  192. nil, false,
  193. )
  194. )
  195. if err != nil {
  196. return errors.Wrap(err, "http fetch data")
  197. }
  198. defer res.Body.Close()
  199. if res.StatusCode >= 300 {
  200. body, _ := ioutil.ReadAll(res.Body)
  201. return errors.Errorf("failed fetch data: %d %s", res.StatusCode, body)
  202. }
  203. var lz4Reader = lz4.NewReader(res.Body)
  204. if err := lz4Reader.Apply(lz4.ConcurrencyOption(-1)); err != nil {
  205. return errors.Errorf("Apply lz4 option: %v", err)
  206. }
  207. buf, err := ioutil.ReadAll(lz4Reader)
  208. if len(buf) != int(end-start+1) {
  209. return errors.Wrap(err, "written local file")
  210. }
  211. written, err := fs.localFile.WriteAt(buf, start)
  212. if err != nil {
  213. return errors.Wrap(err, "write local file")
  214. }
  215. if err := fs.localFile.Sync(); err != nil {
  216. log.Errorf("sync failed %s", err)
  217. }
  218. // on write success
  219. fs.setBlockReady(idx)
  220. fs.receivedSize += int64(written)
  221. fs.fetchedCount += 1
  222. return nil
  223. }
  224. // fetcherfs byte range to block range
  225. func (fs *FetcherFs) offsetToBlockIndexRange(size int, offset int64) (int64, int64) {
  226. return offset / int64(fs.blocksize), (offset + int64(size) - 1) / fs.blocksize
  227. }
  228. func (fs *FetcherFs) destory() error {
  229. var header = NewRequestHeader()
  230. _, err := httputils.Request(
  231. httputils.GetDefaultClient(),
  232. context.Background(),
  233. http.MethodPost,
  234. fs.url, header,
  235. nil, false,
  236. )
  237. return err
  238. }
  239. func NewRequestHeader() http.Header {
  240. header := http.Header{}
  241. header.Set("X-Auth-Token", opt.Token)
  242. if len(opt.EncryptKey) > 0 {
  243. header.Set("X-Encrypt-Key", opt.EncryptKey)
  244. }
  245. if len(opt.EncryptAlg) > 0 {
  246. header.Set("X-Encrypt-Alg", opt.EncryptAlg)
  247. }
  248. return header
  249. }