From 4bb885d5260aa67fbe10f18140c242efec1282fe Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 19 Aug 2021 09:55:22 +0300 Subject: [PATCH] [#122] Refactor Signed-off-by: Denis Kirillov --- api/cache/{bucket.go => buckets.go} | 21 +- api/cache/{head_cache.go => names.go} | 22 +- api/cache/object_cache_test.go | 1 - api/cache/system.go | 19 +- api/errors/errors.go | 28 +- api/handler/acl.go | 11 +- api/handler/copy.go | 40 ++- api/handler/delete.go | 32 ++- api/handler/get.go | 26 +- api/handler/head.go | 22 +- api/handler/object_list.go | 10 + api/handler/put.go | 6 + api/handler/response.go | 2 +- api/handler/versioning.go | 31 ++- api/headers.go | 5 + api/layer/container.go | 33 ++- api/layer/layer.go | 358 ++------------------------ api/layer/object.go | 18 +- api/layer/object_list_cache.go | 2 + api/layer/versioning.go | 325 +++++++++++++++++++++++ docs/aws_s3_compat.md | 4 +- 21 files changed, 550 insertions(+), 466 deletions(-) rename api/cache/{bucket.go => buckets.go} (74%) rename api/cache/{head_cache.go => names.go} (51%) create mode 100644 api/layer/versioning.go diff --git a/api/cache/bucket.go b/api/cache/buckets.go similarity index 74% rename from api/cache/bucket.go rename to api/cache/buckets.go index c6896bdb..667e3653 100644 --- a/api/cache/bucket.go +++ b/api/cache/buckets.go @@ -18,10 +18,11 @@ type ( // BucketInfo stores basic bucket data. BucketInfo struct { - Name string - CID *cid.ID - Owner *owner.ID - Created time.Time + Name string + CID *cid.ID + Owner *owner.ID + Created time.Time + BasicACL uint32 } // GetBucketCache contains cache with objects and lifetime of cache entries. @@ -62,3 +63,15 @@ func (o *GetBucketCache) Put(bkt *BucketInfo) error { func (o *GetBucketCache) Delete(key string) bool { return o.cache.Remove(key) } + +const bktVersionSettingsObject = ".s3-versioning-settings" + +// SettingsObjectName is system name for bucket settings file. +func (b *BucketInfo) SettingsObjectName() string { + return bktVersionSettingsObject +} + +// SettingsObjectKey is key to use in SystemCache. +func (b *BucketInfo) SettingsObjectKey() string { + return b.Name + bktVersionSettingsObject +} diff --git a/api/cache/head_cache.go b/api/cache/names.go similarity index 51% rename from api/cache/head_cache.go rename to api/cache/names.go index 08ac3d13..afb4b577 100644 --- a/api/cache/head_cache.go +++ b/api/cache/names.go @@ -7,30 +7,32 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" ) -// HeadObjectsCache provides interface for lru cache for objects. -type HeadObjectsCache interface { +// ObjectsNameCache provides interface for lru cache for objects. +// This cache contains mapping nice name to object addresses. +// Key is bucketName+objectName. +type ObjectsNameCache interface { Get(key string) *object.Address Put(key string, address *object.Address) error Delete(key string) bool } type ( - // HeadObjectCache contains cache with objects and lifetime of cache entries. - HeadObjectCache struct { + // NameCache contains cache with objects and lifetime of cache entries. + NameCache struct { cache gcache.Cache lifetime time.Duration } ) -// NewHeadObject creates an object of ObjectHeadersCache. -func NewHeadObject(cacheSize int, lifetime time.Duration) *HeadObjectCache { +// NewObjectsNameCache creates an object of ObjectsNameCache. +func NewObjectsNameCache(cacheSize int, lifetime time.Duration) *NameCache { gc := gcache.New(cacheSize).LRU().Build() - return &HeadObjectCache{cache: gc, lifetime: lifetime} + return &NameCache{cache: gc, lifetime: lifetime} } // Get returns cached object. -func (o *HeadObjectCache) Get(key string) *object.Address { +func (o *NameCache) Get(key string) *object.Address { entry, err := o.cache.Get(key) if err != nil { return nil @@ -45,11 +47,11 @@ func (o *HeadObjectCache) Get(key string) *object.Address { } // Put puts an object to cache. -func (o *HeadObjectCache) Put(key string, address *object.Address) error { +func (o *NameCache) Put(key string, address *object.Address) error { return o.cache.SetWithExpire(key, address, o.lifetime) } // Delete deletes an object from cache. -func (o *HeadObjectCache) Delete(key string) bool { +func (o *NameCache) Delete(key string) bool { return o.cache.Remove(key) } diff --git a/api/cache/object_cache_test.go b/api/cache/object_cache_test.go index e307004a..40dfd87e 100644 --- a/api/cache/object_cache_test.go +++ b/api/cache/object_cache_test.go @@ -5,7 +5,6 @@ import ( "time" "github.com/nspcc-dev/neofs-api-go/pkg/object" - objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test" "github.com/stretchr/testify/require" ) diff --git a/api/cache/system.go b/api/cache/system.go index d8f18b00..7ceec010 100644 --- a/api/cache/system.go +++ b/api/cache/system.go @@ -3,35 +3,36 @@ package cache import ( "time" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/bluele/gcache" + "github.com/nspcc-dev/neofs-api-go/pkg/object" ) type ( // SystemCache provides interface for lru cache for objects. + // This cache contains "system" objects (bucket versioning settings, tagging object etc.). + // Key is bucketName+systemFileName. SystemCache interface { Get(key string) *object.Object Put(key string, obj *object.Object) error Delete(key string) bool } - // systemCache contains cache with objects and lifetime of cache entries. - systemCache struct { + // SysCache contains cache with objects and lifetime of cache entries. + SysCache struct { cache gcache.Cache lifetime time.Duration } ) // NewSystemCache creates an object of SystemCache. -func NewSystemCache(cacheSize int, lifetime time.Duration) SystemCache { +func NewSystemCache(cacheSize int, lifetime time.Duration) *SysCache { gc := gcache.New(cacheSize).LRU().Build() - return &systemCache{cache: gc, lifetime: lifetime} + return &SysCache{cache: gc, lifetime: lifetime} } // Get returns cached object. -func (o *systemCache) Get(key string) *object.Object { +func (o *SysCache) Get(key string) *object.Object { entry, err := o.cache.Get(key) if err != nil { return nil @@ -46,11 +47,11 @@ func (o *systemCache) Get(key string) *object.Object { } // Put puts an object to cache. -func (o *systemCache) Put(key string, obj *object.Object) error { +func (o *SysCache) Put(key string, obj *object.Object) error { return o.cache.SetWithExpire(key, obj, o.lifetime) } // Delete deletes an object from cache. -func (o *systemCache) Delete(key string) bool { +func (o *SysCache) Delete(key string) bool { return o.cache.Remove(key) } diff --git a/api/errors/errors.go b/api/errors/errors.go index d1dd1126..f9c8aadb 100644 --- a/api/errors/errors.go +++ b/api/errors/errors.go @@ -1924,26 +1924,18 @@ func GetAPIError(code ErrorCode) Error { return errorCodes.toAPIErr(ErrInternalError) } -// GenericError - generic object layer error. -type GenericError struct { - Bucket string - Object string +// ObjectError - error that linked to specific object. +type ObjectError struct { + Err error + Object string + Version string } -// ObjectAlreadyExists object already exists. -// This type should be removed when s3-gw will support versioning. -type ObjectAlreadyExists GenericError - -func (e ObjectAlreadyExists) Error() string { - return "Object: " + e.Bucket + "#" + e.Object + " already exists" +func (e ObjectError) Error() string { + return fmt.Sprintf("%s (%s:%s)", e.Err, e.Object, e.Version) } -// DeleteError - returns when cant remove object. -type DeleteError struct { - Err error - Object string -} - -func (e DeleteError) Error() string { - return fmt.Sprintf("%s (%s)", e.Err, e.Object) +// ObjectVersion get "object:version" string. +func (e ObjectError) ObjectVersion() string { + return e.Object + ":" + e.Version } diff --git a/api/handler/acl.go b/api/handler/acl.go index 2131e0ca..b5b244b8 100644 --- a/api/handler/acl.go +++ b/api/handler/acl.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/layer" ) @@ -239,7 +240,13 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) { return } - if _, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil { + p := &layer.HeadObjectParams{ + Bucket: reqInfo.BucketName, + Object: reqInfo.ObjectName, + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), + } + + if _, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { h.logAndSendError(w, "could not get object info", reqInfo, err) return } @@ -273,7 +280,7 @@ func (h *handler) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) } } -func checkOwner(info *layer.BucketInfo, owner string) error { +func checkOwner(info *cache.BucketInfo, owner string) error { if owner == "" { return nil } diff --git a/api/handler/copy.go b/api/handler/copy.go index 0503a837..86864fe1 100644 --- a/api/handler/copy.go +++ b/api/handler/copy.go @@ -32,10 +32,11 @@ func path2BucketObject(path string) (bucket, prefix string) { func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { var ( err error - inf *layer.ObjectInfo + info *layer.ObjectInfo metadata map[string]string - reqInfo = api.GetReqInfo(r.Context()) + reqInfo = api.GetReqInfo(r.Context()) + versionID string ) src := r.Header.Get("X-Amz-Copy-Source") @@ -45,14 +46,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { // of the version ID to null. If you have enabled versioning, Amazon S3 assigns a // unique version ID value for the object. if u, err := url.Parse(src); err == nil { - // Check if versionId query param was added, if yes then check if - // its non "null" value, we should error out since we do not support - // any versions other than "null". - if vid := u.Query().Get("versionId"); vid != "" && vid != "null" { - h.logAndSendError(w, "no such version", reqInfo, errors.GetAPIError(errors.ErrNoSuchVersion)) - return - } - + versionID = u.Query().Get(api.QueryVersionID) src = u.Path } @@ -66,7 +60,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { p := &layer.HeadObjectParams{ Bucket: srcBucket, Object: srcObject, - VersionID: reqInfo.URL.Query().Get("versionId"), + VersionID: versionID, } if args.MetadataDirective == replaceMetadataDirective { @@ -85,46 +79,46 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { return } - if inf, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { + if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { h.logAndSendError(w, "could not find object", reqInfo, err) return } - if err = checkPreconditions(inf, args.Conditional); err != nil { + if err = checkPreconditions(info, args.Conditional); err != nil { h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed)) return } if metadata == nil { - if len(inf.ContentType) > 0 { - inf.Headers[api.ContentType] = inf.ContentType + if len(info.ContentType) > 0 { + info.Headers[api.ContentType] = info.ContentType } - metadata = inf.Headers + metadata = info.Headers } else if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 { metadata[api.ContentType] = contentType } params := &layer.CopyObjectParams{ - SrcObject: inf, + SrcObject: info, DstBucket: reqInfo.BucketName, DstObject: reqInfo.ObjectName, - SrcSize: inf.Size, + SrcSize: info.Size, Header: metadata, } additional := []zap.Field{zap.String("src_bucket_name", srcBucket), zap.String("src_object_name", srcObject)} - if inf, err = h.obj.CopyObject(r.Context(), params); err != nil { + if info, err = h.obj.CopyObject(r.Context(), params); err != nil { h.logAndSendError(w, "couldn't copy object", reqInfo, err, additional...) return - } else if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: inf.Created.Format(time.RFC3339), ETag: inf.HashSum}); err != nil { + } else if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: info.Created.Format(time.RFC3339), ETag: info.HashSum}); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err, additional...) return } h.log.Info("object is copied", - zap.String("bucket", inf.Bucket), - zap.String("object", inf.Name), - zap.Stringer("object_id", inf.ID())) + zap.String("bucket", info.Bucket), + zap.String("object", info.Name), + zap.Stringer("object_id", info.ID())) } func parseCopyObjectArgs(headers http.Header) (*copyObjectArgs, error) { diff --git a/api/handler/delete.go b/api/handler/delete.go index a8b56bc6..15a4b225 100644 --- a/api/handler/delete.go +++ b/api/handler/delete.go @@ -27,9 +27,10 @@ type ObjectIdentifier struct { // DeleteError structure. type DeleteError struct { - Code string - Message string - Key string + Code string + Message string + Key string + VersionID string `xml:"versionId,omitempty"` } // DeleteObjectsResponse container for multiple object deletes. @@ -47,7 +48,7 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { reqInfo := api.GetReqInfo(r.Context()) versionedObject := []*layer.VersionedObject{{ Name: reqInfo.ObjectName, - VersionID: reqInfo.URL.Query().Get("versionId"), + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), }} if err := h.checkBucketOwner(r, reqInfo.BucketName); err != nil { @@ -99,7 +100,7 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re return } - removed := make(map[string]struct{}) + removed := make(map[string]*layer.VersionedObject) toRemove := make([]*layer.VersionedObject, 0, len(requested.Objects)) for _, obj := range requested.Objects { versionedObj := &layer.VersionedObject{ @@ -107,7 +108,7 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re VersionID: obj.VersionID, } toRemove = append(toRemove, versionedObj) - removed[versionedObj.String()] = struct{}{} + removed[versionedObj.String()] = versionedObj } response := &DeleteObjectsResponse{ @@ -135,23 +136,26 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re h.logAndSendError(w, "could not delete objects", reqInfo, nil, additional...) for _, e := range errs { - if err, ok := e.(*errors.DeleteError); ok { + if err, ok := e.(*errors.ObjectError); ok { code := "BadRequest" - desc := err.Error() + if s3err, ok := err.Err.(errors.Error); ok { + code = s3err.Code + } response.Errors = append(response.Errors, DeleteError{ - Code: code, - Message: desc, - Key: err.Object, + Code: code, + Message: err.Error(), + Key: err.Object, + VersionID: err.Version, }) - delete(removed, err.Object) + delete(removed, err.ObjectVersion()) } } } - for key := range removed { - response.DeletedObjects = append(response.DeletedObjects, ObjectIdentifier{ObjectName: key}) + for _, val := range removed { + response.DeletedObjects = append(response.DeletedObjects, ObjectIdentifier{ObjectName: val.Name, VersionID: val.VersionID}) } if err := api.EncodeToResponse(w, response); err != nil { diff --git a/api/handler/get.go b/api/handler/get.go index 9a1c62f0..7572423e 100644 --- a/api/handler/get.go +++ b/api/handler/get.go @@ -82,7 +82,7 @@ func writeHeaders(h http.Header, info *layer.ObjectInfo) { func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { var ( err error - inf *layer.ObjectInfo + info *layer.ObjectInfo params *layer.RangeParams reqInfo = api.GetReqInfo(r.Context()) @@ -102,30 +102,30 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { p := &layer.HeadObjectParams{ Bucket: reqInfo.BucketName, Object: reqInfo.ObjectName, - VersionID: reqInfo.URL.Query().Get("versionId"), + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), } - if inf, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { + if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { h.logAndSendError(w, "could not find object", reqInfo, err) return } - if err = checkPreconditions(inf, args.Conditional); err != nil { + if err = checkPreconditions(info, args.Conditional); err != nil { h.logAndSendError(w, "precondition failed", reqInfo, err) return } - if params, err = fetchRangeHeader(r.Header, uint64(inf.Size)); err != nil { + if params, err = fetchRangeHeader(r.Header, uint64(info.Size)); err != nil { h.logAndSendError(w, "could not parse range header", reqInfo, err) return } - writeHeaders(w.Header(), inf) + writeHeaders(w.Header(), info) if params != nil { - writeRangeHeaders(w, params, inf.Size) + writeRangeHeaders(w, params, info.Size) } getParams := &layer.GetObjectParams{ - ObjectInfo: inf, + ObjectInfo: info, Writer: w, Range: params, VersionID: p.VersionID, @@ -135,17 +135,17 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { } } -func checkPreconditions(inf *layer.ObjectInfo, args *conditionalArgs) error { - if len(args.IfMatch) > 0 && args.IfMatch != inf.HashSum { +func checkPreconditions(info *layer.ObjectInfo, args *conditionalArgs) error { + if len(args.IfMatch) > 0 && args.IfMatch != info.HashSum { return errors.GetAPIError(errors.ErrPreconditionFailed) } - if len(args.IfNoneMatch) > 0 && args.IfNoneMatch == inf.HashSum { + if len(args.IfNoneMatch) > 0 && args.IfNoneMatch == info.HashSum { return errors.GetAPIError(errors.ErrNotModified) } - if args.IfModifiedSince != nil && inf.Created.Before(*args.IfModifiedSince) { + if args.IfModifiedSince != nil && info.Created.Before(*args.IfModifiedSince) { return errors.GetAPIError(errors.ErrNotModified) } - if args.IfUnmodifiedSince != nil && inf.Created.After(*args.IfUnmodifiedSince) { + if args.IfUnmodifiedSince != nil && info.Created.After(*args.IfUnmodifiedSince) { if len(args.IfMatch) == 0 { return errors.GetAPIError(errors.ErrPreconditionFailed) } diff --git a/api/handler/head.go b/api/handler/head.go index 1f0a3031..a03c33ca 100644 --- a/api/handler/head.go +++ b/api/handler/head.go @@ -25,8 +25,8 @@ func getRangeToDetectContentType(maxSize int64) *layer.RangeParams { func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { var ( - err error - inf *layer.ObjectInfo + err error + info *layer.ObjectInfo reqInfo = api.GetReqInfo(r.Context()) ) @@ -39,30 +39,30 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { p := &layer.HeadObjectParams{ Bucket: reqInfo.BucketName, Object: reqInfo.ObjectName, - VersionID: reqInfo.URL.Query().Get("versionId"), + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), } - if inf, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { + if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { h.logAndSendError(w, "could not fetch object info", reqInfo, err) return } - if len(inf.ContentType) == 0 { + if len(info.ContentType) == 0 { buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType)) getParams := &layer.GetObjectParams{ - ObjectInfo: inf, + ObjectInfo: info, Writer: buffer, - Range: getRangeToDetectContentType(inf.Size), - VersionID: reqInfo.URL.Query().Get("versionId"), + Range: getRangeToDetectContentType(info.Size), + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), } if err = h.obj.GetObject(r.Context(), getParams); err != nil { - h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", inf.ID())) + h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID())) return } - inf.ContentType = http.DetectContentType(buffer.Bytes()) + info.ContentType = http.DetectContentType(buffer.Bytes()) } - writeHeaders(w.Header(), inf) + writeHeaders(w.Header(), info) w.WriteHeader(http.StatusOK) } diff --git a/api/handler/object_list.go b/api/handler/object_list.go index 293e6bb2..bafc62db 100644 --- a/api/handler/object_list.go +++ b/api/handler/object_list.go @@ -217,6 +217,16 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http return } + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil { + h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) + return + } + info, err := h.obj.ListObjectVersions(r.Context(), p) if err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err) diff --git a/api/handler/put.go b/api/handler/put.go index e4a89a51..288db285 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -116,6 +116,12 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) { } } + if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil { + h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err)) + } else if versioning.VersioningEnabled { + w.Header().Set(api.AmzVersionID, info.Version()) + } + w.Header().Set(api.ETag, info.HashSum) api.WriteSuccessResponseHeadersOnly(w) } diff --git a/api/handler/response.go b/api/handler/response.go index 7d344891..a8884063 100644 --- a/api/handler/response.go +++ b/api/handler/response.go @@ -167,7 +167,7 @@ type ListObjectsVersionsResponse struct { // VersioningConfiguration contains VersioningConfiguration XML representation. type VersioningConfiguration struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ VersioningConfiguration"` - Status string `xml:"Status"` + Status string `xml:"Status,omitempty"` MfaDelete string `xml:"MfaDelete,omitempty"` } diff --git a/api/handler/versioning.go b/api/handler/versioning.go index 19351a10..dbce63e9 100644 --- a/api/handler/versioning.go +++ b/api/handler/versioning.go @@ -24,6 +24,16 @@ func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Requ Settings: &layer.BucketSettings{VersioningEnabled: configuration.Status == "Enabled"}, } + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil { + h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) + return + } + if _, err := h.obj.PutBucketVersioning(r.Context(), p); err != nil { h.logAndSendError(w, "couldn't put update versioning settings", reqInfo, err) } @@ -33,14 +43,27 @@ func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Requ func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { reqInfo := api.GetReqInfo(r.Context()) + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil { + h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) + return + } + settings, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName) if err != nil { + if errors.IsS3Error(err, errors.ErrNoSuchBucket) { + h.logAndSendError(w, "couldn't get versioning settings", reqInfo, err) + return + } h.log.Warn("couldn't get version settings object: default version settings will be used", zap.String("request_id", reqInfo.RequestID), zap.String("method", reqInfo.API), - zap.String("object_name", reqInfo.ObjectName), + zap.String("bucket_name", reqInfo.BucketName), zap.Error(err)) - return } if err = api.EncodeToResponse(w, formVersioningConfiguration(settings)); err != nil { @@ -49,12 +72,14 @@ func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Requ } func formVersioningConfiguration(settings *layer.BucketSettings) *VersioningConfiguration { - res := &VersioningConfiguration{Status: "Suspended"} + res := &VersioningConfiguration{} if settings == nil { return res } if settings.VersioningEnabled { res.Status = "Enabled" + } else { + res.Status = "Suspended" } return res } diff --git a/api/headers.go b/api/headers.go index 767eca43..dfdefbdc 100644 --- a/api/headers.go +++ b/api/headers.go @@ -44,3 +44,8 @@ const ( ContainerID = "X-Container-Id" ) + +// S3 request query params. +const ( + QueryVersionID = "versionId" +) diff --git a/api/layer/container.go b/api/layer/container.go index 1e7b3b56..5f892217 100644 --- a/api/layer/container.go +++ b/api/layer/container.go @@ -117,29 +117,42 @@ func (n *layer) containerList(ctx context.Context) ([]*cache.BucketInfo, error) } func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*cid.ID, error) { + var err error + bktInfo := &cache.BucketInfo{ + Name: p.Name, + Owner: n.Owner(ctx), + Created: time.Now(), + BasicACL: p.ACL, + } cnr := container.New( container.WithPolicy(p.Policy), container.WithCustomBasicACL(p.ACL), container.WithAttribute(container.AttributeName, p.Name), - container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10))) + container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(bktInfo.Created.Unix(), 10))) cnr.SetSessionToken(p.BoxData.Gate.SessionToken) - cnr.SetOwnerID(n.Owner(ctx)) + cnr.SetOwnerID(bktInfo.Owner) - cid, err := n.pool.PutContainer(ctx, cnr) - if err != nil { - return nil, fmt.Errorf("failed to create a bucket: %w", err) - } - - if err = n.pool.WaitForContainerPresence(ctx, cid, pool.DefaultPollingParams()); err != nil { + if bktInfo.CID, err = n.pool.PutContainer(ctx, cnr); err != nil { return nil, err } - if err := n.setContainerEACLTable(ctx, cid, p.EACL); err != nil { + if err = n.pool.WaitForContainerPresence(ctx, bktInfo.CID, pool.DefaultPollingParams()); err != nil { return nil, err } - return cid, nil + if err = n.setContainerEACLTable(ctx, bktInfo.CID, p.EACL); err != nil { + return nil, err + } + + if err = n.bucketCache.Put(bktInfo); err != nil { + n.log.Warn("couldn't put bucket info into cache", + zap.String("bucket name", bktInfo.Name), + zap.Stringer("bucket cid", bktInfo.CID), + zap.Error(err)) + } + + return bktInfo.CID, nil } func (n *layer) setContainerEACLTable(ctx context.Context, cid *cid.ID, table *eacl.Table) error { diff --git a/api/layer/layer.go b/api/layer/layer.go index 051629ea..b49691bd 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -7,9 +7,6 @@ import ( "fmt" "io" "net/url" - "sort" - "strconv" - "strings" "time" "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" @@ -32,7 +29,7 @@ type ( log *zap.Logger listsCache ObjectsListCache objCache cache.ObjectsCache - headCache cache.HeadObjectsCache + namesCache cache.ObjectsNameCache bucketCache cache.BucketCache systemCache cache.SystemCache } @@ -56,12 +53,10 @@ type ( GetObjectParams struct { Range *RangeParams ObjectInfo *ObjectInfo - //Bucket string - //Object string - Offset int64 - Length int64 - Writer io.Writer - VersionID string + Offset int64 + Length int64 + Writer io.Writer + VersionID string } // HeadObjectParams stores object head request parameters. @@ -139,14 +134,6 @@ type ( VersionID string } - objectVersions struct { - name string - objects []*ObjectInfo - addList []string - delList []string - isSorted bool - } - // NeoFS provides basic NeoFS interface. NeoFS interface { Get(ctx context.Context, address *object.Address) (*object.Object, error) @@ -181,95 +168,6 @@ type ( } ) -func newObjectVersions(name string) *objectVersions { - return &objectVersions{name: name} -} - -func (v *objectVersions) appendVersion(oi *ObjectInfo) { - addVers := append(splitVersions(oi.Headers[versionsAddAttr]), oi.Version()) - delVers := splitVersions(oi.Headers[versionsDelAttr]) - v.objects = append(v.objects, oi) - for _, add := range addVers { - if !contains(v.addList, add) { - v.addList = append(v.addList, add) - } - } - for _, del := range delVers { - if !contains(v.delList, del) { - v.delList = append(v.delList, del) - } - } - v.isSorted = false -} - -func (v *objectVersions) sort() { - if !v.isSorted { - sortVersions(v.objects) - v.isSorted = true - } -} - -func (v *objectVersions) getLast() *ObjectInfo { - if len(v.objects) == 0 { - return nil - } - - v.sort() - existedVersions := getExistedVersions(v) - for i := len(v.objects) - 1; i >= 0; i-- { - if contains(existedVersions, v.objects[i].Version()) { - delMarkHeader := v.objects[i].Headers[versionsDeleteMarkAttr] - if delMarkHeader == "" { - return v.objects[i] - } - if delMarkHeader == delMarkFullObject { - return nil - } - } - } - - return nil -} - -func (v *objectVersions) getFiltered() []*ObjectInfo { - if len(v.objects) == 0 { - return nil - } - - v.sort() - existedVersions := getExistedVersions(v) - res := make([]*ObjectInfo, 0, len(v.objects)) - - for _, version := range v.objects { - delMark := version.Headers[versionsDeleteMarkAttr] - if contains(existedVersions, version.Version()) && (delMark == delMarkFullObject || delMark == "") { - res = append(res, version) - } - } - - return res -} - -func (v *objectVersions) getAddHeader() string { - return strings.Join(v.addList, ",") -} - -func (v *objectVersions) getDelHeader() string { - return strings.Join(v.delList, ",") -} - -const ( - unversionedObjectVersionID = "null" - bktVersionSettingsObject = ".s3-versioning-settings" - objectSystemAttributeName = "S3-System-name" - attrVersionsIgnore = "S3-Versions-ignore" - attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled" - versionsDelAttr = "S3-Versions-del" - versionsAddAttr = "S3-Versions-add" - versionsDeleteMarkAttr = "S3-Versions-delete-mark" - delMarkFullObject = "*" -) - func (t *VersionedObject) String() string { return t.Name + ":" + t.VersionID } @@ -283,7 +181,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool, config *CacheConfig) Client { listsCache: newListObjectsCache(config.ListObjectsLifetime), objCache: cache.New(config.Size, config.Lifetime), //todo reconsider cache params - headCache: cache.NewHeadObject(1000, time.Minute), + namesCache: cache.NewObjectsNameCache(1000, time.Minute), bucketCache: cache.NewBucketCache(150, time.Minute), systemCache: cache.NewSystemCache(1000, 5*time.Minute), } @@ -432,11 +330,11 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*Object } func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *cache.BucketInfo) (*ObjectInfo, error) { - if meta := n.systemCache.Get(bktVersionSettingsObject); meta != nil { + if meta := n.systemCache.Get(bkt.SettingsObjectKey()); meta != nil { return objInfoFromMeta(bkt, meta), nil } - oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: bktVersionSettingsObject}) + oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: bkt.SettingsObjectName()}) if err != nil { return nil, err } @@ -446,7 +344,7 @@ func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *cache.BucketInfo n.log.Error("could not fetch object head", zap.Error(err)) return nil, err } - if err = n.systemCache.Put(bktVersionSettingsObject, meta); err != nil { + if err = n.systemCache.Put(bkt.SettingsObjectKey(), meta); err != nil { n.log.Error("couldn't cache system object", zap.Error(err)) } @@ -506,7 +404,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *cache.BucketInfo, obj *Ve Header: map[string]string{versionsDeleteMarkAttr: obj.VersionID}, } if len(obj.VersionID) != 0 { - id, err := n.checkVersionsExists(ctx, bkt, obj) + id, err := n.checkVersionsExist(ctx, bkt, obj) if err != nil { return err } @@ -517,41 +415,24 @@ func (n *layer) deleteObject(ctx context.Context, bkt *cache.BucketInfo, obj *Ve p.Header[versionsDeleteMarkAttr] = delMarkFullObject } if _, err = n.objectPut(ctx, bkt, p); err != nil { - return &errors.DeleteError{Err: err, Object: obj.String()} + return err } } else { ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: obj.Name}) if err != nil { - return &errors.DeleteError{Err: err, Object: obj.String()} + return err } } for _, id := range ids { if err = n.objectDelete(ctx, bkt.CID, id); err != nil { - return &errors.DeleteError{Err: err, Object: obj.String()} + return err } } return nil } -func (n *layer) checkVersionsExists(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) (*object.ID, error) { - id := object.NewID() - if err := id.Parse(obj.VersionID); err != nil { - return nil, &errors.DeleteError{Err: errors.GetAPIError(errors.ErrInvalidVersion), Object: obj.String()} - } - - versions, err := n.headVersions(ctx, bkt, obj.Name) - if err != nil { - return nil, &errors.DeleteError{Err: err, Object: obj.String()} - } - if !contains(getExistedVersions(versions), obj.VersionID) { - return nil, &errors.DeleteError{Err: errors.GetAPIError(errors.ErrInvalidVersion), Object: obj.String()} - } - - return id, nil -} - // DeleteObjects from the storage. func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error { var errs = make([]error, 0, len(objects)) @@ -561,9 +442,9 @@ func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []*Ver return append(errs, err) } - for i := range objects { - if err := n.deleteObject(ctx, bkt, objects[i]); err != nil { - errs = append(errs, err) + for _, obj := range objects { + if err := n.deleteObject(ctx, bkt, obj); err != nil { + errs = append(errs, &errors.ObjectError{Err: err, Object: obj.Name, Version: obj.VersionID}) } } @@ -588,210 +469,17 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { return err } - ids, err := n.objectSearch(ctx, &findParams{cid: bucketInfo.CID}) + objects, err := n.listSortedObjectsFromNeoFS(ctx, allObjectParams{Bucket: bucketInfo}) if err != nil { return err } - if len(ids) != 0 { + if len(objects) != 0 { return errors.GetAPIError(errors.ErrBucketNotEmpty) } - return n.deleteContainer(ctx, bucketInfo.CID) -} - -func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - var versions map[string]*objectVersions - res := &ListObjectVersionsInfo{} - - bkt, err := n.GetBucketInfo(ctx, p.Bucket) - if err != nil { - return nil, err - } - - cacheKey, err := createKey(ctx, bkt.CID, listVersionsMethod, p.Prefix, p.Delimiter) - if err != nil { - return nil, err - } - - allObjects := n.listsCache.Get(cacheKey) - if allObjects == nil { - versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter) - if err != nil { - return nil, err - } - - sortedNames := make([]string, 0, len(versions)) - for k := range versions { - sortedNames = append(sortedNames, k) - } - sort.Strings(sortedNames) - - allObjects = make([]*ObjectInfo, 0, p.MaxKeys) - for _, name := range sortedNames { - allObjects = append(allObjects, versions[name].getFiltered()...) - } - - // putting to cache a copy of allObjects because allObjects can be modified further - n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) - } - - for i, obj := range allObjects { - if obj.Name >= p.KeyMarker && obj.Version() >= p.VersionIDMarker { - allObjects = allObjects[i:] - break - } - } - - res.CommonPrefixes, allObjects = triageObjects(allObjects) - - if len(allObjects) > p.MaxKeys { - res.IsTruncated = true - res.NextKeyMarker = allObjects[p.MaxKeys].Name - res.NextVersionIDMarker = allObjects[p.MaxKeys].Version() - - allObjects = allObjects[:p.MaxKeys] - res.KeyMarker = allObjects[p.MaxKeys-1].Name - res.VersionIDMarker = allObjects[p.MaxKeys-1].Version() - } - - objects := make([]*ObjectVersionInfo, len(allObjects)) - for i, obj := range allObjects { - objects[i] = &ObjectVersionInfo{Object: obj} - if i == len(allObjects)-1 || allObjects[i+1].Name != obj.Name { - objects[i].IsLatest = true - } - } - - res.Version, res.DeleteMarker = triageVersions(objects) - return res, nil -} - -func sortVersions(versions []*ObjectInfo) { - sort.Slice(versions, func(i, j int) bool { - return less(versions[i], versions[j]) - }) -} - -func triageVersions(objVersions []*ObjectVersionInfo) ([]*ObjectVersionInfo, []*ObjectVersionInfo) { - if len(objVersions) == 0 { - return nil, nil - } - - var resVersion []*ObjectVersionInfo - var resDelMarkVersions []*ObjectVersionInfo - - for _, version := range objVersions { - if version.Object.Headers[versionsDeleteMarkAttr] == delMarkFullObject { - resDelMarkVersions = append(resDelMarkVersions, version) - } else { - resVersion = append(resVersion, version) - } - } - - return resVersion, resDelMarkVersions -} - -func less(ov1, ov2 *ObjectInfo) bool { - if ov1.CreationEpoch == ov2.CreationEpoch { - return ov1.Version() < ov2.Version() - } - return ov1.CreationEpoch < ov2.CreationEpoch -} - -func contains(list []string, elem string) bool { - for _, item := range list { - if elem == item { - return true - } - } - return false -} - -func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error) { - bucketInfo, err := n.GetBucketInfo(ctx, p.Bucket) - if err != nil { - return nil, err - } - - objectInfo, err := n.getSettingsObjectInfo(ctx, bucketInfo) - if err != nil { - n.log.Warn("couldn't get bucket version settings object, new one will be created", - zap.String("bucket_name", bucketInfo.Name), - zap.Stringer("cid", bucketInfo.CID), - zap.Error(err)) - } - - attributes := make([]*object.Attribute, 0, 3) - - filename := object.NewAttribute() - filename.SetKey(objectSystemAttributeName) - filename.SetValue(bktVersionSettingsObject) - - createdAt := object.NewAttribute() - createdAt.SetKey(object.AttributeTimestamp) - createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) - - versioningIgnore := object.NewAttribute() - versioningIgnore.SetKey(attrVersionsIgnore) - versioningIgnore.SetValue(strconv.FormatBool(true)) - - settingsVersioningEnabled := object.NewAttribute() - settingsVersioningEnabled.SetKey(attrSettingsVersioningEnabled) - settingsVersioningEnabled.SetValue(strconv.FormatBool(p.Settings.VersioningEnabled)) - - attributes = append(attributes, filename, createdAt, versioningIgnore, settingsVersioningEnabled) - - raw := object.NewRaw() - raw.SetOwnerID(bucketInfo.Owner) - raw.SetContainerID(bucketInfo.CID) - raw.SetAttributes(attributes...) - - ops := new(client.PutObjectParams).WithObject(raw.Object()) - oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx)) - if err != nil { - return nil, err - } - - meta, err := n.objectHead(ctx, bucketInfo.CID, oid) - if err != nil { - return nil, err - } - - if objectInfo != nil { - if err = n.objectDelete(ctx, bucketInfo.CID, objectInfo.ID()); err != nil { - return nil, err - } - } - - return objectInfoFromMeta(bucketInfo, meta, "", ""), nil -} - -func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) { - bktInfo, err := n.GetBucketInfo(ctx, bucketName) - if err != nil { - return nil, err - } - - return n.getBucketSettings(ctx, bktInfo) -} - -func (n *layer) getBucketSettings(ctx context.Context, bktInfo *cache.BucketInfo) (*BucketSettings, error) { - objInfo, err := n.getSettingsObjectInfo(ctx, bktInfo) - if err != nil { - return nil, err - } - - return objectInfoToBucketSettings(objInfo), nil -} - -func objectInfoToBucketSettings(info *ObjectInfo) *BucketSettings { - res := &BucketSettings{} - - enabled, ok := info.Headers["S3-Settings-Versioning-enabled"] - if ok { - if parsed, err := strconv.ParseBool(enabled); err == nil { - res.VersioningEnabled = parsed - } - } - return res + if err = n.deleteContainer(ctx, bucketInfo.CID); err != nil { + return err + } + n.bucketCache.Delete(bucketInfo.Name) + return nil } diff --git a/api/layer/object.go b/api/layer/object.go index 226dec23..b18c12cd 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -144,7 +144,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *cache.BucketInfo, p *PutObje r := p.Reader if len(p.Header[api.ContentType]) == 0 { - d := newDetector(p.Reader) + d := newDetector(r) if contentType, err := d.Detect(); err == nil { p.Header[api.ContentType] = contentType } @@ -271,7 +271,7 @@ func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versio } func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*ObjectInfo, error) { - if address := n.headCache.Get(bkt.Name + "/" + objectName); address != nil { + if address := n.namesCache.Get(bkt.Name + "/" + objectName); address != nil { if headInfo := n.objCache.Get(address); headInfo != nil { return objInfoFromMeta(bkt, headInfo), nil } @@ -287,7 +287,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *cache.Buck return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey) } - if err = n.headCache.Put(lastVersion.NiceName(), lastVersion.Address()); err != nil { + if err = n.namesCache.Put(lastVersion.NiceName(), lastVersion.Address()); err != nil { n.log.Warn("couldn't put obj address to head cache", zap.String("obj nice name", lastVersion.NiceName()), zap.Error(err)) @@ -487,14 +487,12 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *cache.BucketInfo continue } - if objVersions, ok := versions[oi.Name]; ok { - objVersions.appendVersion(oi) - versions[oi.Name] = objVersions - } else { - objVersion := newObjectVersions(oi.Name) - objVersion.appendVersion(oi) - versions[oi.Name] = objVersion + objVersions, ok := versions[oi.Name] + if !ok { + objVersions = newObjectVersions(oi.Name) } + objVersions.appendVersion(oi) + versions[oi.Name] = objVersions } } diff --git a/api/layer/object_list_cache.go b/api/layer/object_list_cache.go index e67ac1ae..18e562ce 100644 --- a/api/layer/object_list_cache.go +++ b/api/layer/object_list_cache.go @@ -45,6 +45,7 @@ type ( list []*ObjectInfo } cacheOptions struct { + method string key string delimiter string prefix string @@ -89,6 +90,7 @@ func createKey(ctx context.Context, cid *cid.ID, method, prefix, delimiter strin return cacheOptions{}, err } p := cacheOptions{ + method: method, key: box.Gate.AccessKey + cid.String(), delimiter: delimiter, prefix: prefix, diff --git a/api/layer/versioning.go b/api/layer/versioning.go new file mode 100644 index 00000000..20e5219a --- /dev/null +++ b/api/layer/versioning.go @@ -0,0 +1,325 @@ +package layer + +import ( + "context" + "sort" + "strconv" + "strings" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" + "github.com/nspcc-dev/neofs-s3-gw/api/errors" + "go.uber.org/zap" +) + +type objectVersions struct { + name string + objects []*ObjectInfo + addList []string + delList []string + isSorted bool +} + +const ( + unversionedObjectVersionID = "null" + objectSystemAttributeName = "S3-System-name" + attrVersionsIgnore = "S3-Versions-ignore" + attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled" + versionsDelAttr = "S3-Versions-del" + versionsAddAttr = "S3-Versions-add" + versionsDeleteMarkAttr = "S3-Versions-delete-mark" + delMarkFullObject = "*" +) + +func newObjectVersions(name string) *objectVersions { + return &objectVersions{name: name} +} + +func (v *objectVersions) appendVersion(oi *ObjectInfo) { + addVers := append(splitVersions(oi.Headers[versionsAddAttr]), oi.Version()) + delVers := splitVersions(oi.Headers[versionsDelAttr]) + v.objects = append(v.objects, oi) + for _, add := range addVers { + if !contains(v.addList, add) { + v.addList = append(v.addList, add) + } + } + for _, del := range delVers { + if !contains(v.delList, del) { + v.delList = append(v.delList, del) + } + } + v.isSorted = false +} + +func (v *objectVersions) sort() { + if !v.isSorted { + sort.Slice(v.objects, func(i, j int) bool { + return less(v.objects[i], v.objects[j]) + }) + v.isSorted = true + } +} + +func (v *objectVersions) getLast() *ObjectInfo { + if len(v.objects) == 0 { + return nil + } + + v.sort() + existedVersions := getExistedVersions(v) + for i := len(v.objects) - 1; i >= 0; i-- { + if contains(existedVersions, v.objects[i].Version()) { + delMarkHeader := v.objects[i].Headers[versionsDeleteMarkAttr] + if delMarkHeader == "" { + return v.objects[i] + } + if delMarkHeader == delMarkFullObject { + return nil + } + } + } + + return nil +} + +func (v *objectVersions) getFiltered() []*ObjectInfo { + if len(v.objects) == 0 { + return nil + } + + v.sort() + existedVersions := getExistedVersions(v) + res := make([]*ObjectInfo, 0, len(v.objects)) + + for _, version := range v.objects { + delMark := version.Headers[versionsDeleteMarkAttr] + if contains(existedVersions, version.Version()) && (delMark == delMarkFullObject || delMark == "") { + res = append(res, version) + } + } + + return res +} + +func (v *objectVersions) getAddHeader() string { + return strings.Join(v.addList, ",") +} + +func (v *objectVersions) getDelHeader() string { + return strings.Join(v.delList, ",") +} + +func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error) { + bucketInfo, err := n.GetBucketInfo(ctx, p.Bucket) + if err != nil { + return nil, err + } + + objectInfo, err := n.getSettingsObjectInfo(ctx, bucketInfo) + if err != nil { + n.log.Warn("couldn't get bucket version settings object, new one will be created", + zap.String("bucket_name", bucketInfo.Name), + zap.Stringer("cid", bucketInfo.CID), + zap.Error(err)) + } + + attributes := make([]*object.Attribute, 0, 3) + + filename := object.NewAttribute() + filename.SetKey(objectSystemAttributeName) + filename.SetValue(bucketInfo.SettingsObjectName()) + + createdAt := object.NewAttribute() + createdAt.SetKey(object.AttributeTimestamp) + createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) + + versioningIgnore := object.NewAttribute() + versioningIgnore.SetKey(attrVersionsIgnore) + versioningIgnore.SetValue(strconv.FormatBool(true)) + + settingsVersioningEnabled := object.NewAttribute() + settingsVersioningEnabled.SetKey(attrSettingsVersioningEnabled) + settingsVersioningEnabled.SetValue(strconv.FormatBool(p.Settings.VersioningEnabled)) + + attributes = append(attributes, filename, createdAt, versioningIgnore, settingsVersioningEnabled) + + raw := object.NewRaw() + raw.SetOwnerID(bucketInfo.Owner) + raw.SetContainerID(bucketInfo.CID) + raw.SetAttributes(attributes...) + + ops := new(client.PutObjectParams).WithObject(raw.Object()) + oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx)) + if err != nil { + return nil, err + } + + meta, err := n.objectHead(ctx, bucketInfo.CID, oid) + if err != nil { + return nil, err + } + + if err = n.systemCache.Put(bucketInfo.SettingsObjectKey(), meta); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + if objectInfo != nil { + if err = n.objectDelete(ctx, bucketInfo.CID, objectInfo.ID()); err != nil { + return nil, err + } + } + + return objectInfoFromMeta(bucketInfo, meta, "", ""), nil +} + +func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) { + bktInfo, err := n.GetBucketInfo(ctx, bucketName) + if err != nil { + return nil, err + } + + return n.getBucketSettings(ctx, bktInfo) +} + +func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { + var versions map[string]*objectVersions + res := &ListObjectVersionsInfo{} + + bkt, err := n.GetBucketInfo(ctx, p.Bucket) + if err != nil { + return nil, err + } + + cacheKey, err := createKey(ctx, bkt.CID, listVersionsMethod, p.Prefix, p.Delimiter) + if err != nil { + return nil, err + } + + allObjects := n.listsCache.Get(cacheKey) + if allObjects == nil { + versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter) + if err != nil { + return nil, err + } + + sortedNames := make([]string, 0, len(versions)) + for k := range versions { + sortedNames = append(sortedNames, k) + } + sort.Strings(sortedNames) + + allObjects = make([]*ObjectInfo, 0, p.MaxKeys) + for _, name := range sortedNames { + allObjects = append(allObjects, versions[name].getFiltered()...) + } + + // putting to cache a copy of allObjects because allObjects can be modified further + n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) + } + + for i, obj := range allObjects { + if obj.Name >= p.KeyMarker && obj.Version() >= p.VersionIDMarker { + allObjects = allObjects[i:] + break + } + } + + res.CommonPrefixes, allObjects = triageObjects(allObjects) + + if len(allObjects) > p.MaxKeys { + res.IsTruncated = true + res.NextKeyMarker = allObjects[p.MaxKeys].Name + res.NextVersionIDMarker = allObjects[p.MaxKeys].Version() + + allObjects = allObjects[:p.MaxKeys] + res.KeyMarker = allObjects[p.MaxKeys-1].Name + res.VersionIDMarker = allObjects[p.MaxKeys-1].Version() + } + + objects := make([]*ObjectVersionInfo, len(allObjects)) + for i, obj := range allObjects { + objects[i] = &ObjectVersionInfo{Object: obj} + if i == len(allObjects)-1 || allObjects[i+1].Name != obj.Name { + objects[i].IsLatest = true + } + } + + res.Version, res.DeleteMarker = triageVersions(objects) + return res, nil +} + +func triageVersions(objVersions []*ObjectVersionInfo) ([]*ObjectVersionInfo, []*ObjectVersionInfo) { + if len(objVersions) == 0 { + return nil, nil + } + + var resVersion []*ObjectVersionInfo + var resDelMarkVersions []*ObjectVersionInfo + + for _, version := range objVersions { + if version.Object.Headers[versionsDeleteMarkAttr] == delMarkFullObject { + resDelMarkVersions = append(resDelMarkVersions, version) + } else { + resVersion = append(resVersion, version) + } + } + + return resVersion, resDelMarkVersions +} + +func less(ov1, ov2 *ObjectInfo) bool { + if ov1.CreationEpoch == ov2.CreationEpoch { + return ov1.Version() < ov2.Version() + } + return ov1.CreationEpoch < ov2.CreationEpoch +} + +func contains(list []string, elem string) bool { + for _, item := range list { + if elem == item { + return true + } + } + return false +} + +func (n *layer) getBucketSettings(ctx context.Context, bktInfo *cache.BucketInfo) (*BucketSettings, error) { + objInfo, err := n.getSettingsObjectInfo(ctx, bktInfo) + if err != nil { + return nil, err + } + + return objectInfoToBucketSettings(objInfo), nil +} + +func objectInfoToBucketSettings(info *ObjectInfo) *BucketSettings { + res := &BucketSettings{} + + enabled, ok := info.Headers[attrSettingsVersioningEnabled] + if ok { + if parsed, err := strconv.ParseBool(enabled); err == nil { + res.VersioningEnabled = parsed + } + } + return res +} + +func (n *layer) checkVersionsExist(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) (*object.ID, error) { + id := object.NewID() + if err := id.Parse(obj.VersionID); err != nil { + return nil, errors.GetAPIError(errors.ErrInvalidVersion) + } + + versions, err := n.headVersions(ctx, bkt, obj.Name) + if err != nil { + return nil, err + } + if !contains(getExistedVersions(versions), obj.VersionID) { + return nil, errors.GetAPIError(errors.ErrInvalidVersion) + } + + return id, nil +} diff --git a/docs/aws_s3_compat.md b/docs/aws_s3_compat.md index bb1eb533..647f8ce4 100644 --- a/docs/aws_s3_compat.md +++ b/docs/aws_s3_compat.md @@ -226,8 +226,8 @@ See also `GetObject` and other method parameters. | | Method | Comments | |----|---------------------|----------| -| 🔴 | GetBucketVersioning | | -| 🔴 | PutBucketVersioning | | +| 🟢 | GetBucketVersioning | | +| 🟢 | PutBucketVersioning | | ## Website