| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017 |
- package cos
- import (
- "context"
- "crypto/md5"
- "encoding/xml"
- "errors"
- "fmt"
- "hash/crc64"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "sort"
- "strconv"
- "strings"
- "time"
- )
- // ObjectService 相关 API
- type ObjectService service
- // ObjectGetOptions is the option of GetObject
- type ObjectGetOptions struct {
- ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
- ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
- ResponseExpires string `url:"response-expires,omitempty" header:"-"`
- ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
- ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
- ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
- Range string `url:"-" header:"Range,omitempty"`
- IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
- // SSE-C
- XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
- //兼容其他自定义头部
- XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
- XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
- // 下载进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
- Listener ProgressListener `header:"-" url:"-" xml:"-"`
- }
- // presignedURLTestingOptions is the opt of presigned url
- type presignedURLTestingOptions struct {
- authTime *AuthTime
- }
- // Get Object 请求可以将一个文件(Object)下载至本地。
- // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
- //
- // https://www.qcloud.com/document/product/436/7753
- func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
- var u string
- if len(id) == 1 {
- u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
- } else if len(id) == 0 {
- u = "/" + encodeURIComponent(name)
- } else {
- return nil, errors.New("wrong params")
- }
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: u,
- method: http.MethodGet,
- optQuery: opt,
- optHeader: opt,
- disableCloseBody: true,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- if opt != nil && opt.Listener != nil {
- if err == nil && resp != nil {
- if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil {
- resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
- }
- }
- }
- return resp, err
- }
- // GetToFile download the object to local file
- func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
- resp, err := s.Get(ctx, name, opt, id...)
- if err != nil {
- return resp, err
- }
- defer resp.Body.Close()
- // If file exist, overwrite it
- fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
- if err != nil {
- return resp, err
- }
- _, err = io.Copy(fd, resp.Body)
- fd.Close()
- if err != nil {
- return resp, err
- }
- return resp, nil
- }
- // GetPresignedURL get the object presigned to down or upload file by url
- func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: "/" + encodeURIComponent(name),
- method: httpMethod,
- optQuery: opt,
- optHeader: opt,
- }
- req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
- if err != nil {
- return nil, err
- }
- var authTime *AuthTime
- if opt != nil {
- if opt, ok := opt.(*presignedURLTestingOptions); ok {
- authTime = opt.authTime
- }
- }
- if authTime == nil {
- authTime = NewAuthTime(expired)
- }
- authorization := newAuthorization(ak, sk, req, authTime)
- sign := encodeURIComponent(authorization, []byte{'&', '='})
- if req.URL.RawQuery == "" {
- req.URL.RawQuery = fmt.Sprintf("%s", sign)
- } else {
- req.URL.RawQuery = fmt.Sprintf("%s&%s", req.URL.RawQuery, sign)
- }
- return req.URL, nil
- }
- // ObjectPutHeaderOptions the options of header of the put object
- type ObjectPutHeaderOptions struct {
- CacheControl string `header:"Cache-Control,omitempty" url:"-"`
- ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
- ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
- ContentType string `header:"Content-Type,omitempty" url:"-"`
- ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
- ContentLength int64 `header:"Content-Length,omitempty" url:"-"`
- ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
- Expect string `header:"Expect,omitempty" url:"-"`
- Expires string `header:"Expires,omitempty" url:"-"`
- XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
- // 自定义的 x-cos-meta-* header
- XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
- XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
- // 可选值: Normal, Appendable
- //XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
- // Enable Server Side Encryption, Only supported: AES256
- XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
- // SSE-C
- XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
- //兼容其他自定义头部
- XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
- XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
- // 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
- Listener ProgressListener `header:"-" url:"-" xml:"-"`
- }
- // ObjectPutOptions the options of put object
- type ObjectPutOptions struct {
- *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
- *ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
- }
- // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
- //
- // https://www.qcloud.com/document/product/436/7749
- func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, uopt *ObjectPutOptions) (*Response, error) {
- if err := CheckReaderLen(r); err != nil {
- return nil, err
- }
- opt := cloneObjectPutOptions(uopt)
- totalBytes, err := GetReaderLen(r)
- if err != nil && opt != nil && opt.Listener != nil {
- return nil, err
- }
- if err == nil {
- // 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader由用户指定ContentLength, 或使用 Chunk 上传
- if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
- opt.ContentLength = totalBytes
- }
- }
- reader := TeeReader(r, nil, totalBytes, nil)
- if s.client.Conf.EnableCRC {
- reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
- }
- if opt != nil && opt.Listener != nil {
- reader.listener = opt.Listener
- }
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: "/" + encodeURIComponent(name),
- method: http.MethodPut,
- body: reader,
- optHeader: opt,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- return resp, err
- }
- // PutFromFile put object from local file
- // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
- func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (resp *Response, err error) {
- nr := 0
- for nr < 3 {
- fd, e := os.Open(filePath)
- if e != nil {
- err = e
- return
- }
- resp, err = s.Put(ctx, name, fd, opt)
- if err != nil {
- nr++
- fd.Close()
- continue
- }
- fd.Close()
- break
- }
- return
- }
- // ObjectCopyHeaderOptions is the head option of the Copy
- type ObjectCopyHeaderOptions struct {
- // When use replace directive to update meta infos
- CacheControl string `header:"Cache-Control,omitempty" url:"-"`
- ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
- ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
- ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
- ContentType string `header:"Content-Type,omitempty" url:"-"`
- Expires string `header:"Expires,omitempty" url:"-"`
- Expect string `header:"Expect,omitempty" url:"-"`
- XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
- XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
- XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
- XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
- XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
- XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
- // 自定义的 x-cos-meta-* header
- XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
- XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
- XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
- // SSE-C
- XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
- XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
- XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
- XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
- //兼容其他自定义头部
- XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
- }
- // ObjectCopyOptions is the option of Copy, choose header or body
- type ObjectCopyOptions struct {
- *ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
- *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
- }
- // ObjectCopyResult is the result of Copy
- type ObjectCopyResult struct {
- XMLName xml.Name `xml:"CopyObjectResult"`
- ETag string `xml:"ETag,omitempty"`
- LastModified string `xml:"LastModified,omitempty"`
- CRC64 string `xml:"CRC64,omitempty"`
- VersionId string `xml:"VersionId,omitempty"`
- }
- // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
- // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
- //
- // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
- //
- // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
- //
- // https://cloud.tencent.com/document/product/436/10881
- func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
- surl := strings.SplitN(sourceURL, "/", 2)
- if len(surl) < 2 {
- return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
- }
- var u string
- if len(id) == 1 {
- u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
- } else if len(id) == 0 {
- u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
- } else {
- return nil, nil, errors.New("wrong params")
- }
- var res ObjectCopyResult
- copyOpt := &ObjectCopyOptions{
- &ObjectCopyHeaderOptions{},
- &ACLHeaderOptions{},
- }
- if opt != nil {
- if opt.ObjectCopyHeaderOptions != nil {
- *copyOpt.ObjectCopyHeaderOptions = *opt.ObjectCopyHeaderOptions
- }
- if opt.ACLHeaderOptions != nil {
- *copyOpt.ACLHeaderOptions = *opt.ACLHeaderOptions
- }
- }
- copyOpt.XCosCopySource = u
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: "/" + encodeURIComponent(name),
- method: http.MethodPut,
- body: nil,
- optHeader: copyOpt,
- result: &res,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
- if err == nil && resp.StatusCode == 200 {
- if res.ETag == "" {
- return &res, resp, errors.New("response 200 OK, but body contains an error")
- }
- }
- return &res, resp, err
- }
- type ObjectDeleteOptions struct {
- // SSE-C
- XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
- //兼容其他自定义头部
- XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
- VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
- }
- // Delete Object请求可以将一个文件(Object)删除。
- //
- // https://www.qcloud.com/document/product/436/7743
- func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
- var optHeader *ObjectDeleteOptions
- // When use "" string might call the delete bucket interface
- if len(name) == 0 {
- return nil, errors.New("empty object name")
- }
- if len(opt) > 0 {
- optHeader = opt[0]
- }
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: "/" + encodeURIComponent(name),
- method: http.MethodDelete,
- optHeader: optHeader,
- optQuery: optHeader,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- return resp, err
- }
- // ObjectHeadOptions is the option of HeadObject
- type ObjectHeadOptions struct {
- IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
- // SSE-C
- XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
- XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
- XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
- }
- // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
- //
- // https://www.qcloud.com/document/product/436/7745
- func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
- var u string
- if len(id) == 1 {
- u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
- } else if len(id) == 0 {
- u = "/" + encodeURIComponent(name)
- } else {
- return nil, errors.New("wrong params")
- }
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: u,
- method: http.MethodHead,
- optHeader: opt,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
- resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
- }
- return resp, err
- }
- // ObjectOptionsOptions is the option of object options
- type ObjectOptionsOptions struct {
- Origin string `url:"-" header:"Origin"`
- AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
- AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
- }
- // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
- //
- // 当CORS配置不存在时,请求返回403 Forbidden。
- //
- // https://www.qcloud.com/document/product/436/8288
- func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: "/" + encodeURIComponent(name),
- method: http.MethodOptions,
- optHeader: opt,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- return resp, err
- }
- // CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours_
- type CASJobParameters struct {
- Tier string `xml:"Tier"`
- }
- // ObjectRestoreOptions is the option of object restore
- type ObjectRestoreOptions struct {
- XMLName xml.Name `xml:"RestoreRequest"`
- Days int `xml:"Days"`
- Tier *CASJobParameters `xml:"CASJobParameters"`
- }
- // PutRestore API can recover an object of type archived by COS archive.
- //
- // https://cloud.tencent.com/document/product/436/12633
- func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
- u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: u,
- method: http.MethodPost,
- body: opt,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- return resp, err
- }
- // TODO Append 接口在优化未开放使用
- //
- // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
- // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
- //
- // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
- //
- // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
- // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
- //
- // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
- //
- // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
- //
- // https://www.qcloud.com/document/product/436/7741
- // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
- // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
- // if position != 0{
- // opt = nil
- // }
- // sendOpt := sendOptions{
- // baseURL: s.client.BaseURL.BucketURL,
- // uri: u,
- // method: http.MethodPost,
- // optHeader: opt,
- // body: r,
- // }
- // resp, err := s.client.send(ctx, &sendOpt)
- // return resp, err
- // }
- // ObjectDeleteMultiOptions is the option of DeleteMulti
- type ObjectDeleteMultiOptions struct {
- XMLName xml.Name `xml:"Delete" header:"-"`
- Quiet bool `xml:"Quiet" header:"-"`
- Objects []Object `xml:"Object" header:"-"`
- //XCosSha1 string `xml:"-" header:"x-cos-sha1"`
- }
- // ObjectDeleteMultiResult is the result of DeleteMulti
- type ObjectDeleteMultiResult struct {
- XMLName xml.Name `xml:"DeleteResult"`
- DeletedObjects []Object `xml:"Deleted,omitempty"`
- Errors []struct {
- Key string `xml:",omitempty"`
- Code string `xml:",omitempty"`
- Message string `xml:",omitempty"`
- VersionId string `xml:",omitempty"`
- } `xml:"Error,omitempty"`
- }
- // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
- // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
- // Quiet模式只返回报错的Object信息。
- // https://www.qcloud.com/document/product/436/8289
- func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
- var res ObjectDeleteMultiResult
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: "/?delete",
- method: http.MethodPost,
- body: opt,
- result: &res,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- return &res, resp, err
- }
- // Object is the meta info of the object
- type Object struct {
- Key string `xml:",omitempty"`
- ETag string `xml:",omitempty"`
- Size int64 `xml:",omitempty"`
- PartNumber int `xml:",omitempty"`
- LastModified string `xml:",omitempty"`
- StorageClass string `xml:",omitempty"`
- Owner *Owner `xml:",omitempty"`
- VersionId string `xml:",omitempty"`
- }
- // MultiUploadOptions is the option of the multiupload,
- // ThreadPoolSize default is one
- type MultiUploadOptions struct {
- OptIni *InitiateMultipartUploadOptions
- PartSize int64
- ThreadPoolSize int
- CheckPoint bool
- EnableVerification bool
- }
- type Chunk struct {
- Number int
- OffSet int64
- Size int64
- Done bool
- ETag string
- }
- // jobs
- type Jobs struct {
- Name string
- UploadId string
- FilePath string
- RetryTimes int
- Chunk Chunk
- Data io.Reader
- Opt *ObjectUploadPartOptions
- }
- type Results struct {
- PartNumber int
- Resp *Response
- err error
- }
- func LimitReadCloser(r io.Reader, n int64) io.Reader {
- var lc LimitedReadCloser
- lc.R = r
- lc.N = n
- return &lc
- }
- type LimitedReadCloser struct {
- io.LimitedReader
- }
- func (lc *LimitedReadCloser) Close() error {
- if r, ok := lc.R.(io.ReadCloser); ok {
- return r.Close()
- }
- return nil
- }
- func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
- for j := range jobs {
- j.Opt.ContentLength = j.Chunk.Size
- rt := j.RetryTimes
- for {
- // http.Request.Body can be Closed in request
- fd, err := os.Open(j.FilePath)
- var res Results
- if err != nil {
- res.err = err
- res.PartNumber = j.Chunk.Number
- res.Resp = nil
- results <- &res
- break
- }
- fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
- resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
- LimitReadCloser(fd, j.Chunk.Size), j.Opt)
- res.PartNumber = j.Chunk.Number
- res.Resp = resp
- res.err = err
- if err != nil {
- rt--
- if rt == 0 {
- results <- &res
- break
- }
- continue
- }
- results <- &res
- break
- }
- }
- }
- func DividePart(fileSize int64, last int) (int64, int64) {
- partSize := int64(last * 1024 * 1024)
- partNum := fileSize / partSize
- for partNum >= 10000 {
- partSize = partSize * 2
- partNum = fileSize / partSize
- }
- return partNum, partSize
- }
- func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) {
- if filePath == "" {
- return 0, nil, 0, errors.New("filePath invalid")
- }
- file, err := os.Open(filePath)
- if err != nil {
- return 0, nil, 0, err
- }
- defer file.Close()
- stat, err := file.Stat()
- if err != nil {
- return 0, nil, 0, err
- }
- var partNum int64
- if partSize > 0 {
- partSize = partSize * 1024 * 1024
- partNum = stat.Size() / partSize
- if partNum >= 10000 {
- return 0, nil, 0, errors.New("Too many parts, out of 10000")
- }
- } else {
- partNum, partSize = DividePart(stat.Size(), 64)
- }
- var chunks []Chunk
- var chunk = Chunk{}
- for i := int64(0); i < partNum; i++ {
- chunk.Number = int(i + 1)
- chunk.OffSet = i * partSize
- chunk.Size = partSize
- chunks = append(chunks, chunk)
- }
- if stat.Size()%partSize > 0 {
- chunk.Number = len(chunks) + 1
- chunk.OffSet = int64(len(chunks)) * partSize
- chunk.Size = stat.Size() % partSize
- chunks = append(chunks, chunk)
- partNum++
- }
- return int64(stat.Size()), chunks, int(partNum), nil
- }
- func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
- opt := &ObjectListUploadsOptions{
- Prefix: name,
- EncodingType: "url",
- }
- res, _, err := s.ListUploads(ctx, opt)
- if err != nil {
- return "", err
- }
- if len(res.Upload) == 0 {
- return "", nil
- }
- last := len(res.Upload) - 1
- for last >= 0 {
- decodeKey, _ := decodeURIComponent(res.Upload[last].Key)
- if decodeKey == name {
- return decodeURIComponent(res.Upload[last].UploadID)
- }
- last = last - 1
- }
- return "", nil
- }
- func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
- var uploadedParts []Object
- isTruncated := true
- opt := &ObjectListPartsOptions{
- EncodingType: "url",
- }
- for isTruncated {
- res, _, err := s.ListParts(ctx, name, UploadID, opt)
- if err != nil {
- return err
- }
- if len(res.Parts) > 0 {
- uploadedParts = append(uploadedParts, res.Parts...)
- }
- isTruncated = res.IsTruncated
- opt.PartNumberMarker = res.NextPartNumberMarker
- }
- fd, err := os.Open(filepath)
- if err != nil {
- return err
- }
- defer fd.Close()
- // 某个分块出错, 重置chunks
- ret := func(e error) error {
- for i, _ := range chunks {
- chunks[i].Done = false
- chunks[i].ETag = ""
- }
- return e
- }
- for _, part := range uploadedParts {
- partNumber := part.PartNumber
- if partNumber > partNum {
- return ret(errors.New("Part Number is not consistent"))
- }
- partNumber = partNumber - 1
- fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET)
- bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
- if err != nil {
- return ret(err)
- }
- localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
- if localMD5 != part.ETag {
- return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)))
- }
- chunks[partNumber].Done = true
- chunks[partNumber].ETag = part.ETag
- }
- return nil
- }
- // MultiUpload/Upload 为高级upload接口,并发分块上传
- // 注意该接口目前只供参考
- //
- // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
- // 由调用者指定分块大小时,请确认分块数量不超过10000
- //
- func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
- return s.Upload(ctx, name, filepath, opt)
- }
- func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
- if opt == nil {
- opt = &MultiUploadOptions{}
- }
- var localcrc uint64
- // 1.Get the file chunk
- totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
- if err != nil {
- return nil, nil, err
- }
- // 校验
- if s.client.Conf.EnableCRC {
- fd, err := os.Open(filepath)
- if err != nil {
- return nil, nil, err
- }
- defer fd.Close()
- localcrc, err = calCRC64(fd)
- if err != nil {
- return nil, nil, err
- }
- }
- // filesize=0 , use simple upload
- if partNum == 0 || partNum == 1 {
- var opt0 *ObjectPutOptions
- if opt.OptIni != nil {
- opt0 = &ObjectPutOptions{
- opt.OptIni.ACLHeaderOptions,
- opt.OptIni.ObjectPutHeaderOptions,
- }
- }
- rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
- if err != nil {
- return nil, rsp, err
- }
- result := &CompleteMultipartUploadResult{
- Key: name,
- ETag: rsp.Header.Get("ETag"),
- }
- if rsp != nil && s.client.Conf.EnableCRC {
- scoscrc := rsp.Header.Get("x-cos-hash-crc64ecma")
- icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
- if icoscrc != localcrc {
- return result, rsp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
- }
- }
- return result, rsp, nil
- }
- var uploadID string
- resumableFlag := false
- if opt.CheckPoint {
- var err error
- uploadID, err = s.getResumableUploadID(ctx, name)
- if err == nil && uploadID != "" {
- err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
- resumableFlag = (err == nil)
- }
- }
- // 2.Init
- optini := opt.OptIni
- if !resumableFlag {
- res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
- if err != nil {
- return nil, nil, err
- }
- uploadID = res.UploadID
- }
- var poolSize int
- if opt.ThreadPoolSize > 0 {
- poolSize = opt.ThreadPoolSize
- } else {
- // Default is one
- poolSize = 1
- }
- chjobs := make(chan *Jobs, 100)
- chresults := make(chan *Results, 10000)
- optcom := &CompleteMultipartUploadOptions{}
- // 3.Start worker
- for w := 1; w <= poolSize; w++ {
- go worker(s, chjobs, chresults)
- }
- // progress started event
- var listener ProgressListener
- var consumedBytes int64
- if opt.OptIni != nil {
- listener = opt.OptIni.Listener
- }
- event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes)
- progressCallback(listener, event)
- // 4.Push jobs
- go func() {
- for _, chunk := range chunks {
- if chunk.Done {
- continue
- }
- partOpt := &ObjectUploadPartOptions{}
- if optini != nil && optini.ObjectPutHeaderOptions != nil {
- partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
- partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
- partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
- partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
- }
- job := &Jobs{
- Name: name,
- RetryTimes: 3,
- FilePath: filepath,
- UploadId: uploadID,
- Chunk: chunk,
- Opt: partOpt,
- }
- chjobs <- job
- }
- close(chjobs)
- }()
- // 5.Recv the resp etag to complete
- err = nil
- for i := 0; i < partNum; i++ {
- if chunks[i].Done {
- optcom.Parts = append(optcom.Parts, Object{
- PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
- )
- if err == nil {
- consumedBytes += chunks[i].Size
- event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
- progressCallback(listener, event)
- }
- continue
- }
- res := <-chresults
- // Notice one part fail can not get the etag according.
- if res.Resp == nil || res.err != nil {
- // Some part already fail, can not to get the header inside.
- err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
- continue
- }
- // Notice one part fail can not get the etag according.
- etag := res.Resp.Header.Get("ETag")
- optcom.Parts = append(optcom.Parts, Object{
- PartNumber: res.PartNumber, ETag: etag},
- )
- if err == nil {
- consumedBytes += chunks[res.PartNumber-1].Size
- event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
- progressCallback(listener, event)
- }
- }
- close(chresults)
- if err != nil {
- event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
- progressCallback(listener, event)
- return nil, nil, err
- }
- sort.Sort(ObjectList(optcom.Parts))
- event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
- progressCallback(listener, event)
- v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
- if err != nil {
- s.AbortMultipartUpload(ctx, name, uploadID)
- return v, resp, err
- }
- if resp != nil && s.client.Conf.EnableCRC {
- scoscrc := resp.Header.Get("x-cos-hash-crc64ecma")
- icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
- if icoscrc != localcrc {
- return v, resp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
- }
- }
- return v, resp, err
- }
- type ObjectPutTaggingOptions struct {
- XMLName xml.Name `xml:"Tagging"`
- TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
- }
- type ObjectTaggingTag BucketTaggingTag
- type ObjectGetTaggingResult ObjectPutTaggingOptions
- func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
- var u string
- if len(id) == 1 {
- u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
- } else if len(id) == 0 {
- u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
- } else {
- return nil, errors.New("wrong params")
- }
- sendOpt := &sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: u,
- method: http.MethodPut,
- body: opt,
- }
- resp, err := s.client.send(ctx, sendOpt)
- return resp, err
- }
- func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
- var u string
- if len(id) == 1 {
- u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
- } else if len(id) == 0 {
- u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
- } else {
- return nil, nil, errors.New("wrong params")
- }
- var res ObjectGetTaggingResult
- sendOpt := &sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: u,
- method: http.MethodGet,
- result: &res,
- }
- resp, err := s.client.send(ctx, sendOpt)
- return &res, resp, err
- }
- func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
- var u string
- if len(id) == 1 {
- u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
- } else if len(id) == 0 {
- u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
- } else {
- return nil, errors.New("wrong params")
- }
- sendOpt := &sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: u,
- method: http.MethodDelete,
- }
- resp, err := s.client.send(ctx, sendOpt)
- return resp, err
- }
|