| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package storageman
- import (
- "context"
- "encoding/json"
- "fmt"
- "io/fs"
- "os"
- "path"
- "sync"
- "syscall"
- "time"
- "yunion.io/x/cloudmux/pkg/cloudprovider"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/onecloud/pkg/apis"
- api "yunion.io/x/onecloud/pkg/apis/compute"
- imageapi "yunion.io/x/onecloud/pkg/apis/image"
- "yunion.io/x/onecloud/pkg/hostman/hostutils"
- "yunion.io/x/onecloud/pkg/hostman/storageman/remotefile"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- "yunion.io/x/onecloud/pkg/util/fileutils2"
- "yunion.io/x/onecloud/pkg/util/procutils"
- "yunion.io/x/onecloud/pkg/util/qemuimg"
- )
- const (
- _TMP_SUFFIX_ = ".tmp"
- _INF_SUFFIX_ = ".inf"
- CHECK_TIMEOUT = 3600 * time.Second
- )
- type SLocalImageCache struct {
- imageId string
- Manager IImageCacheManger
- Size int64
- Desc *remotefile.SImageDesc
- consumerCount int
- cond *sync.Cond
- lastCheckTime time.Time
- remoteFile *remotefile.SRemoteFile
- accessDirLock sync.Mutex
- }
- func NewLocalImageCache(imageId string, imagecacheManager IImageCacheManger) *SLocalImageCache {
- imageCache := new(SLocalImageCache)
- imageCache.imageId = imageId
- imageCache.Manager = imagecacheManager
- imageCache.cond = sync.NewCond(new(sync.Mutex))
- imageCache.accessDirLock = sync.Mutex{}
- return imageCache
- }
- func (l *SLocalImageCache) GetDesc() *remotefile.SImageDesc {
- return l.Desc
- }
- func (l *SLocalImageCache) GetImageId() string {
- return l.imageId
- }
- func (l *SLocalImageCache) GetName() string {
- if l.Desc != nil && len(l.Desc.Name) > 0 {
- return l.Desc.Name
- }
- return l.imageId
- }
- func (l *SLocalImageCache) Load() error {
- var (
- imgPath = l.GetPath()
- infPath = l.GetInfPath()
- desc = &remotefile.SImageDesc{}
- )
- if fileutils2.Exists(imgPath) {
- needReload := false
- if fileutils2.Exists(infPath) {
- sdesc, err := fileutils2.FileGetContents(infPath)
- if err != nil {
- return errors.Wrapf(err, "fileutils2.FileGetContents(%s)", infPath)
- }
- err = json.Unmarshal([]byte(sdesc), desc)
- if err != nil {
- return errors.Wrapf(err, "jsonutils.Unmarshal(%s)", infPath)
- }
- fi := l.getFileInfo()
- if fi != nil && fi.Size()/1024/1024 != desc.SizeMb {
- // fix file size
- needReload = true
- }
- } else {
- needReload = true
- }
- if needReload {
- img, err := qemuimg.NewQemuImage(imgPath)
- if err != nil {
- return errors.Wrapf(err, "NewQemuImage(%s)", imgPath)
- }
- if !img.IsValid() {
- return fmt.Errorf("invalid local image %s", img.String())
- }
- chksum, err := fileutils2.MD5(imgPath)
- if err != nil {
- return errors.Wrapf(err, "fileutils2.MD5(%s)", imgPath)
- }
- desc = &remotefile.SImageDesc{
- Format: string(img.Format),
- Id: l.imageId,
- Chksum: chksum,
- Path: imgPath,
- }
- fi := l.getFileInfo()
- if fi != nil {
- desc.SizeMb = fi.Size() / 1024 / 1024
- if fi.Sys() != nil {
- atime := fi.Sys().(*syscall.Stat_t).Atim
- desc.AccessAt = time.Unix(atime.Sec, atime.Nsec)
- }
- }
- err = fileutils2.FilePutContents(infPath, jsonutils.Marshal(desc).PrettyString(), false)
- if err != nil {
- return errors.Wrapf(err, "fileutils2.FilePutContents(%s)", infPath)
- }
- }
- if len(desc.Chksum) > 0 && len(desc.Id) > 0 && desc.Id == l.imageId {
- l.Desc = desc
- return nil
- } else {
- log.Errorf("image %s desc not correct, desc.Chksum %s desc.Id %s l.imageId %s", imgPath, desc.Chksum, desc.Id, l.imageId)
- }
- }
- tmpPath := l.GetTmpPath()
- if fileutils2.Exists(tmpPath) {
- syscall.Unlink(tmpPath)
- }
- return errors.Wrap(cloudprovider.ErrNotFound, imgPath)
- }
- func (l *SLocalImageCache) needCheck() bool {
- if time.Now().Sub(l.lastCheckTime) > CHECK_TIMEOUT {
- return true
- }
- return false
- }
- func (l *SLocalImageCache) Release() {
- l.cond.L.Lock()
- defer l.cond.L.Unlock()
- l.consumerCount -= 1
- }
- func (l *SLocalImageCache) Acquire(ctx context.Context, input api.CacheImageInput, callback func(progress, progressMbps float64, totalSizeMb int64)) error {
- isOk, err := l.prepare(ctx, input)
- if err != nil {
- return errors.Wrapf(err, "prepare")
- }
- if isOk {
- return nil
- }
- return l.fetch(ctx, input, callback)
- }
- func (l *SLocalImageCache) prepare(ctx context.Context, input api.CacheImageInput) (bool, error) {
- l.cond.L.Lock()
- defer l.cond.L.Unlock()
- for l.remoteFile != nil {
- l.cond.Wait()
- }
- if l.remoteFile == nil && l.Desc != nil && (l.consumerCount > 0 || !l.needCheck()) {
- l.consumerCount++
- return true, nil
- }
- url, err := auth.GetServiceURL(apis.SERVICE_TYPE_IMAGE, "", input.Zone, "", httputils.GET)
- if err != nil {
- return false, errors.Wrapf(err, "GetServiceURL(%s)", apis.SERVICE_TYPE_IMAGE)
- }
- url += fmt.Sprintf("/images/%s", l.imageId)
- if len(input.Format) == 0 {
- input.Format = "qcow2"
- }
- url += fmt.Sprintf("?format=%s&scope=system", input.Format)
- l.remoteFile = remotefile.NewRemoteFile(ctx, url,
- l.GetPath(), false, input.Checksum, -1, nil, l.GetTmpPath(), input.SrcUrl)
- return false, nil
- }
- func (l *SLocalImageCache) fetch(ctx context.Context, input api.CacheImageInput, callback func(progress, progressMbps float64, totalSizeMb int64)) error {
- // Whether successful or not, fetch should reset the condition variable and wakes up other waiters
- defer func() {
- l.cond.L.Lock()
- l.remoteFile = nil
- l.cond.Broadcast()
- l.cond.L.Unlock()
- }()
- var _fetch = func() error {
- if len(l.Manager.GetId()) > 0 {
- // TGZ images are unpacked after fetch; do not set status to active here.
- // TODO: status should be updated by the region task; this logic may be removable?
- if l.remoteFile.GetFormat() != imageapi.IMAGE_DISK_FORMAT_TGZ {
- _, err := hostutils.RemoteStoragecacheCacheImage(ctx,
- l.Manager.GetId(), l.imageId, "active", l.GetPath())
- if err != nil {
- log.Errorf("Fail to update host cached image: %s", err)
- }
- }
- }
- l.cond.L.Lock()
- defer l.cond.L.Unlock()
- var err error
- l.Desc, err = l.remoteFile.GetInfo()
- if err != nil {
- return errors.Wrapf(err, "remoteFile.GetInfo")
- }
- fi := l.getFileInfo()
- if fi != nil {
- l.Size = fi.Size() / 1024 / 1024
- }
- l.Desc.Id = l.imageId
- l.lastCheckTime = time.Now()
- l.consumerCount++
- bDesc, err := json.Marshal(l.Desc)
- if err != nil {
- return errors.Wrapf(err, "json.Marshal(%#v)", l.Desc)
- }
- err = fileutils2.FilePutContents(l.GetInfPath(), string(bDesc), false)
- if err != nil {
- return errors.Wrapf(err, "FilePutContents(%s)", string(bDesc))
- }
- return nil
- }
- if fileutils2.Exists(l.GetPath()) {
- if input.SkipChecksumIfExists {
- if err := l.remoteFile.FillAttributes(callback); err != nil {
- return errors.Wrap(err, "fetch remote attribute")
- }
- return _fetch()
- } else {
- if err := l.remoteFile.VerifyIntegrity(callback); err == nil {
- return _fetch()
- } else {
- log.Warningf("Verify remotefile checksum error: %v, starting fetching it", err)
- }
- }
- }
- err := l.remoteFile.Fetch(callback)
- if err != nil {
- return errors.Wrapf(err, "remoteFile.Fetch")
- }
- return _fetch()
- }
- func (l *SLocalImageCache) Remove(ctx context.Context) error {
- if fileutils2.Exists(l.GetPath()) {
- if err := syscall.Unlink(l.GetPath()); err != nil {
- return errors.Wrap(err, l.GetPath())
- }
- }
- if fileutils2.Exists(l.GetInfPath()) {
- if err := syscall.Unlink(l.GetInfPath()); err != nil {
- return errors.Wrap(err, l.GetInfPath())
- }
- }
- if fileutils2.Exists(l.GetTmpPath()) {
- if err := syscall.Unlink(l.GetTmpPath()); err != nil {
- return errors.Wrap(err, l.GetTmpPath())
- }
- }
- if fileutils2.Exists(l.getAccessDirPath()) {
- if err := procutils.NewCommand("/bin/rm", "-fr", l.getAccessDirPath()).Run(); err != nil {
- return errors.Wrapf(err, "remove directory %s", l.getAccessDirPath())
- }
- }
- go func() {
- _, err := modules.Storagecachedimages.Detach(hostutils.GetComputeSession(ctx),
- l.Manager.GetId(), l.imageId, nil)
- if err != nil && httputils.ErrorCode(err) != 404 {
- log.Errorf("Fail to delete host cached image %s at %s: %s", l.imageId, l.Manager.GetId(), err)
- }
- }()
- return nil
- }
- func (l *SLocalImageCache) GetPath() string {
- return path.Join(l.Manager.GetPath(), l.imageId)
- }
- func (l *SLocalImageCache) GetTmpPath() string {
- return l.GetPath() + _TMP_SUFFIX_
- }
- func (l *SLocalImageCache) GetInfPath() string {
- return l.GetPath() + _INF_SUFFIX_
- }
- func (l *SLocalImageCache) getFileInfo() fs.FileInfo {
- if fi, err := os.Stat(l.GetPath()); err != nil {
- log.Errorln(err)
- return nil
- } else {
- return fi
- }
- }
- func (l *SLocalImageCache) getAccessDirPath() string {
- return fmt.Sprintf("%s-dir", l.GetPath())
- }
- func (l *SLocalImageCache) GetAccessDirectory() (string, error) {
- if l.Desc.Format != imageapi.IMAGE_DISK_FORMAT_TGZ {
- return "", errors.Wrapf(errors.ErrNotSupported, "format %s", l.Desc.Format)
- }
- l.accessDirLock.Lock()
- defer l.accessDirLock.Unlock()
- dir := l.getAccessDirPath()
- if fileutils2.Exists(dir) {
- return dir, nil
- }
- tmpDir := fmt.Sprintf("%s-tmp", dir)
- // untar cached image
- out, err := procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", tmpDir).Output()
- if err != nil {
- return "", errors.Wrapf(err, "mkdir %s: %s", tmpDir, out)
- }
- out, err = procutils.NewRemoteCommandAsFarAsPossible("tar", "xf", l.GetPath(), "-C", tmpDir).Output()
- if err != nil {
- return "", errors.Wrapf(err, "untar to %s: %s", tmpDir, out)
- }
- out, err = procutils.NewRemoteCommandAsFarAsPossible("mv", tmpDir, dir).Output()
- if err != nil {
- return "", errors.Wrapf(err, "mv %s %s: %s", tmpDir, dir, out)
- }
- return dir, nil
- }
|