| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package handlers
- import (
- "context"
- "net/http"
- "net/url"
- "strings"
- "time"
- "github.com/minio/minio-go/pkg/s3utils"
- "yunion.io/x/cloudmux/pkg/cloudprovider"
- "yunion.io/x/jsonutils"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/regutils"
- "yunion.io/x/pkg/util/timeutils"
- "yunion.io/x/s3cli"
- "yunion.io/x/onecloud/pkg/appsrv"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/s3gateway/models"
- "yunion.io/x/onecloud/pkg/s3gateway/options"
- )
- func InitHandlers(app *appsrv.Application) {
- h := app.AddHandler2("HEAD", "", s3authenticate(headHandler), nil, "head", nil)
- h.SetProcessTimeoutCallback(s3HandlerTimeoutInfo)
- h = app.AddHandler2("GET", "", s3authenticate(readHandler), nil, "get", nil)
- h.SetProcessTimeoutCallback(s3HandlerTimeoutInfo)
- h = app.AddHandler2("PUT", "", s3authenticate(putHandler), nil, "put", nil)
- h.SetProcessTimeoutCallback(s3HandlerTimeoutInfo)
- h = app.AddHandler2("POST", "", s3authenticate(postHandler), nil, "post", nil)
- h.SetProcessTimeoutCallback(s3HandlerTimeoutInfo)
- h = app.AddHandler2("DELETE", "", s3authenticate(deleteHandler), nil, "delete", nil)
- h.SetProcessTimeoutCallback(s3HandlerTimeoutInfo)
- }
- func s3HandlerTimeoutInfo(info *appsrv.SHandlerInfo, r *http.Request) time.Duration {
- o, _ := getObjectRequest(r)
- if len(o.Bucket) > 0 && len(o.Key) > 0 {
- if r.Method == http.MethodGet && len(r.URL.RawQuery) == 0 {
- return 2 * time.Hour
- } else if r.Method == http.MethodPut && (len(r.URL.RawQuery) == 0 || strings.Contains(r.URL.RawQuery, "partNumber=")) {
- return 2 * time.Hour
- }
- }
- return time.Duration(0)
- }
// SObjectRequest is the bucket/key target extracted from an incoming S3
// request by getObjectRequest.
type SObjectRequest struct {
	// VirtualHost is true when the bucket name was taken from the Host
	// header (virtual-host style) and false when it was taken from the
	// first path segment (path style).
	VirtualHost bool
	// Bucket is the bucket name; empty for service-level requests.
	Bucket string
	// Key is the object key; empty for bucket-level requests.
	Key string
}
- func (o SObjectRequest) Validate() error {
- if len(o.Bucket) == 0 {
- return nil
- }
- err := s3utils.CheckValidBucketNameStrict(o.Bucket)
- if err != nil {
- return err
- }
- if len(o.Key) == 0 {
- return nil
- }
- err = s3utils.CheckValidObjectName(o.Key)
- if err != nil {
- return err
- }
- return nil
- }
- func getObjectRequest(r *http.Request) (SObjectRequest, error) {
- o := SObjectRequest{}
- if regutils.MatchIP4Addr(r.Host) || r.Host == options.Options.DomainName {
- o.VirtualHost = false
- segs := appsrv.SplitPath(r.URL.Path)
- if len(segs) > 0 {
- o.Bucket = segs[0]
- if len(segs) > 1 {
- o.Key = strings.Join(segs[1:], "/")
- if strings.HasSuffix(r.URL.Path, "/") {
- o.Key += "/"
- }
- }
- }
- } else if strings.HasSuffix(r.Host, "."+options.Options.DomainName) {
- o.VirtualHost = true
- o.Bucket = r.Host[:len(r.Host)-len(options.Options.DomainName)-1]
- segs := appsrv.SplitPath(r.URL.Path)
- o.Key = strings.Join(segs, "/")
- if strings.HasSuffix(r.URL.Path, "/") {
- o.Key += "/"
- }
- } else {
- return o, errors.Error("invalid S3 request")
- }
- var err error
- o.Key, err = url.PathUnescape(o.Key)
- if err != nil {
- return o, errors.Wrap(err, "url.PathUnescape")
- }
- return o, o.Validate()
- }
- func headHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- o := fetchObjectRequest(ctx)
- userCred := auth.FetchUserCredential(ctx, nil)
- if len(o.Bucket) > 0 && len(o.Key) == 0 {
- // head bucket
- err := headBucket(ctx, userCred, o.Bucket)
- if err != nil {
- SendGeneralError(ctx, w, err)
- } else {
- appsrv.SendHeader(w, nil)
- }
- return
- } else if len(o.Bucket) > 0 && len(o.Key) > 0 {
- // head object
- hdr, err := headObject(ctx, userCred, o.Bucket, o.Key)
- if err != nil {
- SendGeneralError(ctx, w, err)
- } else {
- appsrv.SendHeader(w, hdr)
- }
- return
- } else {
- // do nothing
- }
- SendError(ctx, w, NotSupported(ctx, "method not supported"))
- }
- func readBucket(ctx context.Context, userCred mcclient.TokenCredential, bucketName string, query jsonutils.JSONObject, r *http.Request) (interface{}, http.Header, error) {
- bucket, err := models.BucketManager.GetByName(ctx, userCred, bucketName)
- if err != nil {
- return nil, nil, errors.Wrap(err, "models.BucketManager.GetByName")
- }
- if query.Contains("accelerate") {
- } else if query.Contains("acl") {
- resp, err := bucketAcl(ctx, userCred, bucketName)
- return resp, nil, err
- } else if query.Contains("analytics") {
- } else if query.Contains("cors") {
- } else if query.Contains("encryption") {
- } else if query.Contains("inventory") {
- } else if query.Contains("lifecycle") {
- } else if query.Contains("location") {
- result := s3cli.LocationConstraint(bucket.Location)
- return &result, nil, nil
- } else if query.Contains("publicAccessBlock") {
- } else if query.Contains("logging") {
- } else if query.Contains("metrics") {
- } else if query.Contains("notification") {
- } else if query.Contains("object-lock") {
- } else if query.Contains("policyStatus") {
- } else if query.Contains("versions") {
- } else if query.Contains("policy") {
- } else if query.Contains("replication") {
- } else if query.Contains("requestPayment") {
- } else if query.Contains("tagging") {
- } else if query.Contains("versioning") {
- return &s3cli.VersioningConfiguration{}, nil, nil
- } else if query.Contains("website") {
- } else if query.Contains("uploads") {
- input := s3cli.ListMultipartUploadsInput{}
- err := query.Unmarshal(&input)
- if err != nil {
- return nil, nil, errors.Wrap(err, "query.Unmarshal ListMultipartUploadsInput")
- }
- result, err := listBucketUploads(ctx, userCred, bucketName, &input)
- if err != nil {
- return nil, nil, errors.Wrap(err, "listBucketUploads")
- }
- return result, nil, nil
- } else {
- // list objects in bucket
- input := s3cli.ListObjectInput{}
- err := query.Unmarshal(&input)
- if err != nil {
- return nil, nil, errors.Wrap(err, "query.Unmarshal")
- }
- result, err := bucket.ListObject(ctx, userCred, &input)
- if err != nil {
- return nil, nil, errors.Wrap(err, "bucket.ListObject")
- }
- return result, nil, nil
- }
- return nil, nil, NotImplemented(ctx, "not implemented")
- }
- func readObject(ctx context.Context, userCred mcclient.TokenCredential, bucketName string, objKey string, query jsonutils.JSONObject, r *http.Request) (interface{}, http.Header, error) {
- if query.Contains("acl") {
- resp, err := objectAcl(ctx, userCred, bucketName, objKey)
- return resp, nil, err
- } else if query.Contains("legal-hold") {
- } else if query.Contains("retention") {
- } else if query.Contains("tagging") {
- } else if query.Contains("torrent") {
- } else {
- // download object itself, which has been handled
- }
- return nil, nil, NotImplemented(ctx, "not implemented")
- }
- func getRangeOpt(rangeStr string, sizeBytes int64) (*cloudprovider.SGetObjectRange, error) {
- if len(rangeStr) > 0 {
- rangeOptObj := cloudprovider.ParseRange(rangeStr)
- if rangeOptObj.End == 0 {
- rangeOptObj.End = sizeBytes - 1
- }
- if rangeOptObj.Start >= sizeBytes || rangeOptObj.End >= sizeBytes {
- return nil, httperrors.ErrOutOfRange
- }
- if rangeOptObj.Start > 0 || rangeOptObj.End < sizeBytes-1 {
- return &rangeOptObj, nil
- }
- }
- return nil, nil
- }
- func downloadObject(ctx context.Context, userCred mcclient.TokenCredential, bucketName string, key string, reqHdr http.Header, w http.ResponseWriter) error {
- bucket, err := models.BucketManager.GetByName(ctx, userCred, bucketName)
- if err != nil {
- return errors.Wrap(err, "models.BucketManager.GetByName")
- }
- iBucket, err := bucket.GetIBucket(ctx, userCred)
- if err != nil {
- return errors.Wrap(err, "bucket.GetIBucket")
- }
- obj, err := cloudprovider.GetIObject(iBucket, key)
- if err != nil {
- return errors.Wrap(err, "cloudprovider.GetIObject")
- }
- hdr := cloudprovider.MetaToHttpHeader(cloudprovider.META_HEADER_PREFIX, obj.GetMeta())
- eTag := obj.GetETag()
- if len(eTag) > 0 {
- hdr.Set("ETag", eTag)
- }
- lastModified := obj.GetLastModified()
- if !lastModified.IsZero() {
- hdr.Set("Last-Modified", lastModified.Format(timeutils.RFC2882Format))
- }
- rangeStr := reqHdr.Get(http.CanonicalHeaderKey("range"))
- rangeOpt, err := getRangeOpt(rangeStr, obj.GetSizeBytes())
- if err != nil {
- return errors.Wrap(err, rangeStr)
- }
- stream, err := iBucket.GetObject(ctx, key, rangeOpt)
- if err != nil {
- return errors.Wrap(err, "iBucket.GetObject")
- }
- err = appsrv.SendStream(w, rangeOpt != nil, hdr, stream, obj.GetSizeBytes())
- if err != nil {
- return errors.Wrap(err, "appsrv.SendStream")
- }
- return nil
- }
- func readHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- o := fetchObjectRequest(ctx)
- userCred := auth.FetchUserCredential(ctx, nil)
- if len(o.Bucket) == 0 {
- // service
- query, err := jsonutils.ParseQueryString(r.URL.RawQuery)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- return
- }
- input := s3cli.ListBucketsInput{}
- err = query.Unmarshal(&input)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- } else {
- resp, err := listService(ctx, userCred, input)
- if err != nil {
- SendGeneralError(ctx, w, err)
- } else {
- appsrv.SendXml(w, nil, resp)
- }
- }
- } else if len(o.Bucket) > 0 && len(o.Key) == 0 {
- // bucket get
- query, err := jsonutils.ParseQueryString(r.URL.RawQuery)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- return
- }
- resp, respHdr, err := readBucket(ctx, userCred, o.Bucket, query, r)
- if err != nil {
- SendGeneralError(ctx, w, err)
- return
- }
- appsrv.SendXml(w, respHdr, resp)
- } else {
- // object get
- if len(r.URL.RawQuery) == 0 {
- // download object
- err := downloadObject(ctx, userCred, o.Bucket, o.Key, r.Header, w)
- if err != nil {
- SendGeneralError(ctx, w, err)
- }
- return
- }
- query, err := jsonutils.ParseQueryString(r.URL.RawQuery)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- return
- }
- resp, respHdr, err := readObject(ctx, userCred, o.Bucket, o.Key, query, r)
- if err != nil {
- SendGeneralError(ctx, w, err)
- return
- }
- if resp != nil {
- appsrv.SendXml(w, respHdr, resp)
- }
- }
- }
- func postObject(ctx context.Context, userCred mcclient.TokenCredential, bucket string, key string, query jsonutils.JSONObject, r *http.Request) (interface{}, http.Header, error) {
- if query.Contains("uploads") {
- // initialize multipart upload
- return initMultipartUpload(ctx, userCred, r.Header, bucket, key)
- } else if query.Contains("uploadId") {
- // complete multipart upload
- uploadId, err := query.GetString("uploadId")
- if err != nil || len(uploadId) == 0 {
- return nil, nil, errors.Wrap(httperrors.ErrBadRequest, "uploadId")
- }
- request := s3cli.CompleteMultipartUpload{}
- err = appsrv.FetchXml(r, &request)
- if err != nil {
- return nil, nil, errors.Wrap(httperrors.ErrBadRequest, "FetchXml")
- }
- return completeMultipartUpload(ctx, userCred, r.Header, bucket, key, uploadId, &request)
- } else if query.Contains("select") {
- // select object
- return selectObject(ctx, userCred, r.Header, bucket, key)
- } else {
- // upload object by form POST
- }
- return nil, nil, NotImplemented(ctx, "not implemented")
- }
- func postHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- o := fetchObjectRequest(ctx)
- userCred := auth.FetchUserCredential(ctx, nil)
- if len(o.Bucket) == 0 {
- // no bucket
- // do nothing
- } else if len(o.Bucket) > 0 && len(o.Key) == 0 {
- // bucket post
- // do nothing
- } else {
- // object post
- query, err := jsonutils.ParseQueryString(r.URL.RawQuery)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- return
- }
- resp, respHdr, err := postObject(ctx, userCred, o.Bucket, o.Key, query, r)
- if err != nil {
- SendGeneralError(ctx, w, err)
- return
- }
- appsrv.SendXml(w, respHdr, resp)
- return
- }
- SendError(ctx, w, NotSupported(ctx, "method not supported"))
- }
- func putBucket(ctx context.Context, userCred mcclient.TokenCredential, bucket string, query jsonutils.JSONObject, r *http.Request) (interface{}, http.Header, error) {
- if query.Contains("accelerate") {
- } else if query.Contains("acl") {
- } else if query.Contains("analytics") {
- } else if query.Contains("cors") {
- } else if query.Contains("encryption") {
- } else if query.Contains("inventory") {
- } else if query.Contains("lifecycle") {
- } else if query.Contains("publicAccessBlock") {
- } else if query.Contains("logging") {
- } else if query.Contains("metrics") {
- } else if query.Contains("notification") {
- } else if query.Contains("object-lock") {
- } else if query.Contains("policy") {
- } else if query.Contains("replication") {
- } else if query.Contains("requestPayment") {
- } else if query.Contains("tagging") {
- } else if query.Contains("versioning") {
- } else if query.Contains("website") {
- } else {
- // create bucket
- return nil, nil, NotSupported(ctx, "Not supported")
- }
- return nil, nil, NotImplemented(ctx, "not implemented")
- }
- func putObject(ctx context.Context, userCred mcclient.TokenCredential, bucketName string, key string, query jsonutils.JSONObject, r *http.Request) (interface{}, http.Header, error) {
- if query.Contains("legal-hold") {
- } else if query.Contains("retention") {
- } else if query.Contains("acl") {
- } else if query.Contains("tagging") {
- } else {
- // upload object
- uploadId, _ := query.GetString("uploadId")
- partNumber, _ := query.Int("partNumber")
- copySource := r.Header.Get(http.CanonicalHeaderKey("x-amz-copy-source"))
- if len(copySource) > 0 {
- return copyObject(ctx, userCred, bucketName, key, copySource, r.Header, uploadId, int(partNumber))
- } else {
- hdr, err := uploadObject(ctx, userCred, bucketName, key, r.Header, r.Body, uploadId, int(partNumber))
- defer r.Body.Close()
- if err != nil {
- return nil, nil, err
- }
- return nil, hdr, nil
- }
- }
- return nil, nil, NotImplemented(ctx, "not implemented")
- }
- func putHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- o := fetchObjectRequest(ctx)
- userCred := auth.FetchUserCredential(ctx, nil)
- if len(o.Bucket) == 0 {
- // no bucket
- } else if len(o.Bucket) > 0 && len(o.Key) == 0 {
- // bucket put
- query, err := jsonutils.ParseQueryString(r.URL.RawQuery)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- return
- }
- resp, respHdr, err := putBucket(ctx, userCred, o.Bucket, query, r)
- if err != nil {
- SendGeneralError(ctx, w, err)
- return
- }
- appsrv.SendXml(w, respHdr, resp)
- return
- } else {
- // object put
- query, err := jsonutils.ParseQueryString(r.URL.RawQuery)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- return
- }
- resp, respHdr, err := putObject(ctx, userCred, o.Bucket, o.Key, query, r)
- if err != nil {
- SendGeneralError(ctx, w, err)
- return
- }
- appsrv.SendXml(w, respHdr, resp)
- return
- }
- SendError(ctx, w, NotSupported(ctx, "method not supported"))
- }
- func deleteBucket(ctx context.Context, userCred mcclient.TokenCredential, bucket string, query jsonutils.JSONObject) (interface{}, error) {
- if query.Contains("analytics") {
- } else if query.Contains("cors") {
- } else if query.Contains("encryption") {
- } else if query.Contains("inventory") {
- } else if query.Contains("lifecycle") {
- } else if query.Contains("publicAccessBlock") {
- } else if query.Contains("metrics") {
- } else if query.Contains("policy") {
- } else if query.Contains("replication") {
- } else if query.Contains("tagging") {
- } else if query.Contains("website") {
- } else {
- // delete bucket
- err := removeBucket(ctx, userCred, bucket)
- if err != nil {
- return nil, err
- }
- return nil, nil
- }
- return nil, NotImplemented(ctx, "not implemented")
- }
- func deleteObject(ctx context.Context, userCred mcclient.TokenCredential, bucket string, key string, query jsonutils.JSONObject) (interface{}, error) {
- if query.Contains("tagging") {
- return deleteObjectTags(ctx, userCred, bucket, key)
- } else {
- // delete object
- err := removeObject(ctx, userCred, bucket, key)
- if err != nil {
- return nil, err
- }
- return nil, nil
- }
- }
- func deleteHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- o := fetchObjectRequest(ctx)
- userCred := auth.FetchUserCredential(ctx, nil)
- if len(o.Bucket) == 0 {
- // no bucket
- } else if len(o.Bucket) > 0 && len(o.Key) == 0 {
- // bucket delete
- query, err := jsonutils.ParseQueryString(r.URL.RawQuery)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- return
- }
- resp, err := deleteBucket(ctx, userCred, o.Bucket, query)
- if err != nil {
- SendGeneralError(ctx, w, err)
- } else {
- appsrv.SendXml(w, nil, resp)
- }
- return
- } else {
- // object delete
- query, err := jsonutils.ParseQueryString(r.URL.RawQuery)
- if err != nil {
- SendError(ctx, w, BadRequest(ctx, err.Error()))
- return
- }
- resp, err := deleteObject(ctx, userCred, o.Bucket, o.Key, query)
- if err != nil {
- SendGeneralError(ctx, w, err)
- } else {
- appsrv.SendXml(w, nil, resp)
- }
- return
- }
- SendError(ctx, w, NotSupported(ctx, "method not supported"))
- }
|