diff --git a/api/cache/buckets.go b/api/cache/buckets.go new file mode 100644 index 000000000..667e36535 --- /dev/null +++ b/api/cache/buckets.go @@ -0,0 +1,77 @@ +package cache + +import ( + "time" + + "github.com/bluele/gcache" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" +) + +type ( + // BucketCache provides interface for lru cache for objects. + BucketCache interface { + Get(key string) *BucketInfo + Put(bkt *BucketInfo) error + Delete(key string) bool + } + + // BucketInfo stores basic bucket data. + BucketInfo struct { + Name string + CID *cid.ID + Owner *owner.ID + Created time.Time + BasicACL uint32 + } + + // GetBucketCache contains cache with objects and lifetime of cache entries. + GetBucketCache struct { + cache gcache.Cache + lifetime time.Duration + } +) + +// NewBucketCache creates an object of BucketCache. +func NewBucketCache(cacheSize int, lifetime time.Duration) *GetBucketCache { + gc := gcache.New(cacheSize).LRU().Build() + + return &GetBucketCache{cache: gc, lifetime: lifetime} +} + +// Get returns cached object. +func (o *GetBucketCache) Get(key string) *BucketInfo { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*BucketInfo) + if !ok { + return nil + } + + return result +} + +// Put puts an object to cache. +func (o *GetBucketCache) Put(bkt *BucketInfo) error { + return o.cache.SetWithExpire(bkt.Name, bkt, o.lifetime) +} + +// Delete deletes an object from cache. +func (o *GetBucketCache) Delete(key string) bool { + return o.cache.Remove(key) +} + +const bktVersionSettingsObject = ".s3-versioning-settings" + +// SettingsObjectName is system name for bucket settings file. +func (b *BucketInfo) SettingsObjectName() string { + return bktVersionSettingsObject +} + +// SettingsObjectKey is key to use in SystemCache. +func (b *BucketInfo) SettingsObjectKey() string { + return b.Name + bktVersionSettingsObject +} diff --git a/api/cache/names.go b/api/cache/names.go new file mode 100644 index 000000000..afb4b5776 --- /dev/null +++ b/api/cache/names.go @@ -0,0 +1,57 @@ +package cache + +import ( + "time" + + "github.com/bluele/gcache" + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +// ObjectsNameCache provides interface for lru cache for objects. +// This cache contains mapping nice name to object addresses. +// Key is bucketName+objectName. +type ObjectsNameCache interface { + Get(key string) *object.Address + Put(key string, address *object.Address) error + Delete(key string) bool +} + +type ( + // NameCache contains cache with objects and lifetime of cache entries. + NameCache struct { + cache gcache.Cache + lifetime time.Duration + } +) + +// NewObjectsNameCache creates an object of ObjectsNameCache. +func NewObjectsNameCache(cacheSize int, lifetime time.Duration) *NameCache { + gc := gcache.New(cacheSize).LRU().Build() + + return &NameCache{cache: gc, lifetime: lifetime} +} + +// Get returns cached object. +func (o *NameCache) Get(key string) *object.Address { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*object.Address) + if !ok { + return nil + } + + return result +} + +// Put puts an object to cache. +func (o *NameCache) Put(key string, address *object.Address) error { + return o.cache.SetWithExpire(key, address, o.lifetime) +} + +// Delete deletes an object from cache. +func (o *NameCache) Delete(key string) bool { + return o.cache.Remove(key) +} diff --git a/api/cache/object_cache.go b/api/cache/object_cache.go index f7bcc244a..708fb696c 100644 --- a/api/cache/object_cache.go +++ b/api/cache/object_cache.go @@ -10,7 +10,7 @@ import ( // ObjectsCache provides interface for lru cache for objects. type ObjectsCache interface { Get(address *object.Address) *object.Object - Put(address *object.Address, obj object.Object) error + Put(obj object.Object) error Delete(address *object.Address) bool } @@ -52,8 +52,8 @@ func (o *ObjectHeadersCache) Get(address *object.Address) *object.Object { } // Put puts an object to cache. -func (o *ObjectHeadersCache) Put(address *object.Address, obj object.Object) error { - return o.cache.SetWithExpire(address.String(), obj, o.lifetime) +func (o *ObjectHeadersCache) Put(obj object.Object) error { + return o.cache.SetWithExpire(obj.ContainerID().String()+"/"+obj.ID().String(), obj, o.lifetime) } // Delete deletes an object from cache. diff --git a/api/cache/object_cache_test.go b/api/cache/object_cache_test.go index b9ee92946..40dfd87e6 100644 --- a/api/cache/object_cache_test.go +++ b/api/cache/object_cache_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/object" objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test" "github.com/stretchr/testify/require" ) @@ -14,23 +15,23 @@ const ( ) func TestCache(t *testing.T) { - var ( - address = objecttest.Address() - object = objecttest.Object() - ) + obj := objecttest.Object() + address := object.NewAddress() + address.SetContainerID(obj.ContainerID()) + address.SetObjectID(obj.ID()) t.Run("check get", func(t *testing.T) { cache := New(cachesize, lifetime) - err := cache.Put(address, *object) + err := cache.Put(*obj) require.NoError(t, err) actual := cache.Get(address) - require.Equal(t, object, actual) + require.Equal(t, obj, actual) }) t.Run("check delete", func(t *testing.T) { cache := New(cachesize, lifetime) - err := cache.Put(address, *object) + err := cache.Put(*obj) require.NoError(t, err) cache.Delete(address) diff --git a/api/cache/system.go b/api/cache/system.go new file mode 100644 index 000000000..7ceec0105 --- /dev/null +++ b/api/cache/system.go @@ -0,0 +1,57 @@ +package cache + +import ( + "time" + + "github.com/bluele/gcache" + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type ( + // SystemCache provides interface for lru cache for objects. + // This cache contains "system" objects (bucket versioning settings, tagging object etc.). + // Key is bucketName+systemFileName. + SystemCache interface { + Get(key string) *object.Object + Put(key string, obj *object.Object) error + Delete(key string) bool + } + + // SysCache contains cache with objects and lifetime of cache entries. + SysCache struct { + cache gcache.Cache + lifetime time.Duration + } +) + +// NewSystemCache creates an object of SystemCache. +func NewSystemCache(cacheSize int, lifetime time.Duration) *SysCache { + gc := gcache.New(cacheSize).LRU().Build() + + return &SysCache{cache: gc, lifetime: lifetime} +} + +// Get returns cached object. +func (o *SysCache) Get(key string) *object.Object { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*object.Object) + if !ok { + return nil + } + + return result +} + +// Put puts an object to cache. +func (o *SysCache) Put(key string, obj *object.Object) error { + return o.cache.SetWithExpire(key, obj, o.lifetime) +} + +// Delete deletes an object from cache. +func (o *SysCache) Delete(key string) bool { + return o.cache.Remove(key) +} diff --git a/api/errors/errors.go b/api/errors/errors.go index 24424a6bc..f9c8aadb5 100644 --- a/api/errors/errors.go +++ b/api/errors/errors.go @@ -66,6 +66,7 @@ const ( ErrNoSuchKey ErrNoSuchUpload ErrNoSuchVersion + ErrInvalidVersion ErrNotImplemented ErrPreconditionFailed ErrNotModified @@ -529,6 +530,12 @@ var errorCodes = errorCodeMap{ Description: "Indicates that the version ID specified in the request does not match an existing version.", HTTPStatusCode: http.StatusNotFound, }, + ErrInvalidVersion: { + ErrCode: ErrInvalidVersion, + Code: "InvalidArgument", + Description: "Invalid version id specified", + HTTPStatusCode: http.StatusBadRequest, + }, ErrNotImplemented: { ErrCode: ErrNotImplemented, Code: "NotImplemented", @@ -1917,26 +1924,18 @@ func GetAPIError(code ErrorCode) Error { return errorCodes.toAPIErr(ErrInternalError) } -// GenericError - generic object layer error. -type GenericError struct { - Bucket string - Object string +// ObjectError - error that linked to specific object. +type ObjectError struct { + Err error + Object string + Version string } -// ObjectAlreadyExists object already exists. -// This type should be removed when s3-gw will support versioning. -type ObjectAlreadyExists GenericError - -func (e ObjectAlreadyExists) Error() string { - return "Object: " + e.Bucket + "#" + e.Object + " already exists" +func (e ObjectError) Error() string { + return fmt.Sprintf("%s (%s:%s)", e.Err, e.Object, e.Version) } -// DeleteError - returns when cant remove object. -type DeleteError struct { - Err error - Object string -} - -func (e DeleteError) Error() string { - return fmt.Sprintf("%s (%s)", e.Err, e.Object) +// ObjectVersion get "object:version" string. +func (e ObjectError) ObjectVersion() string { + return e.Object + ":" + e.Version } diff --git a/api/handler/acl.go b/api/handler/acl.go index 2131e0ca4..b5b244b80 100644 --- a/api/handler/acl.go +++ b/api/handler/acl.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/layer" ) @@ -239,7 +240,13 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) { return } - if _, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil { + p := &layer.HeadObjectParams{ + Bucket: reqInfo.BucketName, + Object: reqInfo.ObjectName, + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), + } + + if _, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { h.logAndSendError(w, "could not get object info", reqInfo, err) return } @@ -273,7 +280,7 @@ func (h *handler) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) } } -func checkOwner(info *layer.BucketInfo, owner string) error { +func checkOwner(info *cache.BucketInfo, owner string) error { if owner == "" { return nil } diff --git a/api/handler/copy.go b/api/handler/copy.go index 5ebf36eb5..86864fe16 100644 --- a/api/handler/copy.go +++ b/api/handler/copy.go @@ -32,10 +32,11 @@ func path2BucketObject(path string) (bucket, prefix string) { func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { var ( err error - inf *layer.ObjectInfo + info *layer.ObjectInfo metadata map[string]string - reqInfo = api.GetReqInfo(r.Context()) + reqInfo = api.GetReqInfo(r.Context()) + versionID string ) src := r.Header.Get("X-Amz-Copy-Source") @@ -45,14 +46,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { // of the version ID to null. If you have enabled versioning, Amazon S3 assigns a // unique version ID value for the object. if u, err := url.Parse(src); err == nil { - // Check if versionId query param was added, if yes then check if - // its non "null" value, we should error out since we do not support - // any versions other than "null". - if vid := u.Query().Get("versionId"); vid != "" && vid != "null" { - h.logAndSendError(w, "no such version", reqInfo, errors.GetAPIError(errors.ErrNoSuchVersion)) - return - } - + versionID = u.Query().Get(api.QueryVersionID) src = u.Path } @@ -63,6 +57,11 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { h.logAndSendError(w, "could not parse request params", reqInfo, err) return } + p := &layer.HeadObjectParams{ + Bucket: srcBucket, + Object: srcObject, + VersionID: versionID, + } if args.MetadataDirective == replaceMetadataDirective { metadata = parseMetadata(r) @@ -80,47 +79,46 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { return } - if inf, err = h.obj.GetObjectInfo(r.Context(), srcBucket, srcObject); err != nil { + if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { h.logAndSendError(w, "could not find object", reqInfo, err) return } - if err = checkPreconditions(inf, args.Conditional); err != nil { + if err = checkPreconditions(info, args.Conditional); err != nil { h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed)) return } if metadata == nil { - if len(inf.ContentType) > 0 { - inf.Headers[api.ContentType] = inf.ContentType + if len(info.ContentType) > 0 { + info.Headers[api.ContentType] = info.ContentType } - metadata = inf.Headers + metadata = info.Headers } else if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 { metadata[api.ContentType] = contentType } params := &layer.CopyObjectParams{ - SrcBucket: srcBucket, + SrcObject: info, DstBucket: reqInfo.BucketName, - SrcObject: srcObject, DstObject: reqInfo.ObjectName, - SrcSize: inf.Size, + SrcSize: info.Size, Header: metadata, } additional := []zap.Field{zap.String("src_bucket_name", srcBucket), zap.String("src_object_name", srcObject)} - if inf, err = h.obj.CopyObject(r.Context(), params); err != nil { + if info, err = h.obj.CopyObject(r.Context(), params); err != nil { h.logAndSendError(w, "couldn't copy object", reqInfo, err, additional...) return - } else if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: inf.Created.Format(time.RFC3339), ETag: inf.HashSum}); err != nil { + } else if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: info.Created.Format(time.RFC3339), ETag: info.HashSum}); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err, additional...) return } h.log.Info("object is copied", - zap.String("bucket", inf.Bucket), - zap.String("object", inf.Name), - zap.Stringer("object_id", inf.ID())) + zap.String("bucket", info.Bucket), + zap.String("object", info.Name), + zap.Stringer("object_id", info.ID())) } func parseCopyObjectArgs(headers http.Header) (*copyObjectArgs, error) { diff --git a/api/handler/delete.go b/api/handler/delete.go index 742a7bbbd..15a4b225c 100644 --- a/api/handler/delete.go +++ b/api/handler/delete.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/layer" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // DeleteObjectsRequest - xml carrying the object key names which needs to be deleted. @@ -21,13 +22,15 @@ type DeleteObjectsRequest struct { // ObjectIdentifier carries key name for the object to delete. type ObjectIdentifier struct { ObjectName string `xml:"Key"` + VersionID string `xml:"VersionId,omitempty"` } // DeleteError structure. type DeleteError struct { - Code string - Message string - Key string + Code string + Message string + Key string + VersionID string `xml:"versionId,omitempty"` } // DeleteObjectsResponse container for multiple object deletes. @@ -43,18 +46,22 @@ type DeleteObjectsResponse struct { func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { reqInfo := api.GetReqInfo(r.Context()) + versionedObject := []*layer.VersionedObject{{ + Name: reqInfo.ObjectName, + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), + }} if err := h.checkBucketOwner(r, reqInfo.BucketName); err != nil { h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) return } - if err := h.obj.DeleteObject(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil { + if errs := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, versionedObject); len(errs) != 0 && errs[0] != nil { h.log.Error("could not delete object", zap.String("request_id", reqInfo.RequestID), zap.String("bucket_name", reqInfo.BucketName), zap.String("object_name", reqInfo.ObjectName), - zap.Error(err)) + zap.Error(errs[0])) // Ignore delete errors: @@ -93,11 +100,15 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re return } - removed := make(map[string]struct{}) - toRemove := make([]string, 0, len(requested.Objects)) + removed := make(map[string]*layer.VersionedObject) + toRemove := make([]*layer.VersionedObject, 0, len(requested.Objects)) for _, obj := range requested.Objects { - removed[obj.ObjectName] = struct{}{} - toRemove = append(toRemove, obj.ObjectName) + versionedObj := &layer.VersionedObject{ + Name: obj.ObjectName, + VersionID: obj.VersionID, + } + toRemove = append(toRemove, versionedObj) + removed[versionedObj.String()] = versionedObj } response := &DeleteObjectsResponse{ @@ -110,35 +121,45 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re return } + marshaler := zapcore.ArrayMarshalerFunc(func(encoder zapcore.ArrayEncoder) error { + for _, obj := range toRemove { + encoder.AppendString(obj.String()) + } + return nil + }) + if errs := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, toRemove); errs != nil && !requested.Quiet { additional := []zap.Field{ - zap.Strings("objects_name", toRemove), + zap.Array("objects", marshaler), zap.Errors("errors", errs), } h.logAndSendError(w, "could not delete objects", reqInfo, nil, additional...) for _, e := range errs { - if err, ok := e.(*errors.DeleteError); ok { + if err, ok := e.(*errors.ObjectError); ok { code := "BadRequest" - desc := err.Error() + if s3err, ok := err.Err.(errors.Error); ok { + code = s3err.Code + } response.Errors = append(response.Errors, DeleteError{ - Code: code, - Message: desc, - Key: err.Object, + Code: code, + Message: err.Error(), + Key: err.Object, + VersionID: err.Version, }) - delete(removed, err.Object) + delete(removed, err.ObjectVersion()) } } } - for key := range removed { - response.DeletedObjects = append(response.DeletedObjects, ObjectIdentifier{ObjectName: key}) + for _, val := range removed { + response.DeletedObjects = append(response.DeletedObjects, ObjectIdentifier{ObjectName: val.Name, VersionID: val.VersionID}) } if err := api.EncodeToResponse(w, response); err != nil { - h.logAndSendError(w, "could not write response", reqInfo, err, zap.Strings("objects_name", toRemove)) + h.logAndSendError(w, "could not write response", reqInfo, err, zap.Array("objects", marshaler)) return } } diff --git a/api/handler/get.go b/api/handler/get.go index 2233f28f9..7572423e5 100644 --- a/api/handler/get.go +++ b/api/handler/get.go @@ -72,6 +72,7 @@ func writeHeaders(h http.Header, info *layer.ObjectInfo) { h.Set(api.LastModified, info.Created.UTC().Format(http.TimeFormat)) h.Set(api.ContentLength, strconv.FormatInt(info.Size, 10)) h.Set(api.ETag, info.HashSum) + h.Set(api.AmzVersionID, info.ID().String()) for key, val := range info.Headers { h[api.MetadataPrefix+key] = []string{val} @@ -81,7 +82,7 @@ func writeHeaders(h http.Header, info *layer.ObjectInfo) { func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { var ( err error - inf *layer.ObjectInfo + info *layer.ObjectInfo params *layer.RangeParams reqInfo = api.GetReqInfo(r.Context()) @@ -98,47 +99,53 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { return } - if inf, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil { + p := &layer.HeadObjectParams{ + Bucket: reqInfo.BucketName, + Object: reqInfo.ObjectName, + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), + } + + if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { h.logAndSendError(w, "could not find object", reqInfo, err) return } - if err = checkPreconditions(inf, args.Conditional); err != nil { + if err = checkPreconditions(info, args.Conditional); err != nil { h.logAndSendError(w, "precondition failed", reqInfo, err) return } - if params, err = fetchRangeHeader(r.Header, uint64(inf.Size)); err != nil { + if params, err = fetchRangeHeader(r.Header, uint64(info.Size)); err != nil { h.logAndSendError(w, "could not parse range header", reqInfo, err) return } - writeHeaders(w.Header(), inf) + writeHeaders(w.Header(), info) if params != nil { - writeRangeHeaders(w, params, inf.Size) + writeRangeHeaders(w, params, info.Size) } getParams := &layer.GetObjectParams{ - Bucket: inf.Bucket, - Object: inf.Name, - Writer: w, - Range: params, + ObjectInfo: info, + Writer: w, + Range: params, + VersionID: p.VersionID, } if err = h.obj.GetObject(r.Context(), getParams); err != nil { h.logAndSendError(w, "could not get object", reqInfo, err) } } -func checkPreconditions(inf *layer.ObjectInfo, args *conditionalArgs) error { - if len(args.IfMatch) > 0 && args.IfMatch != inf.HashSum { +func checkPreconditions(info *layer.ObjectInfo, args *conditionalArgs) error { + if len(args.IfMatch) > 0 && args.IfMatch != info.HashSum { return errors.GetAPIError(errors.ErrPreconditionFailed) } - if len(args.IfNoneMatch) > 0 && args.IfNoneMatch == inf.HashSum { + if len(args.IfNoneMatch) > 0 && args.IfNoneMatch == info.HashSum { return errors.GetAPIError(errors.ErrNotModified) } - if args.IfModifiedSince != nil && inf.Created.Before(*args.IfModifiedSince) { + if args.IfModifiedSince != nil && info.Created.Before(*args.IfModifiedSince) { return errors.GetAPIError(errors.ErrNotModified) } - if args.IfUnmodifiedSince != nil && inf.Created.After(*args.IfUnmodifiedSince) { + if args.IfUnmodifiedSince != nil && info.Created.After(*args.IfUnmodifiedSince) { if len(args.IfMatch) == 0 { return errors.GetAPIError(errors.ErrPreconditionFailed) } diff --git a/api/handler/head.go b/api/handler/head.go index 128a2836d..a03c33ca8 100644 --- a/api/handler/head.go +++ b/api/handler/head.go @@ -25,8 +25,8 @@ func getRangeToDetectContentType(maxSize int64) *layer.RangeParams { func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { var ( - err error - inf *layer.ObjectInfo + err error + info *layer.ObjectInfo reqInfo = api.GetReqInfo(r.Context()) ) @@ -36,23 +36,33 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { return } - if inf, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil { + p := &layer.HeadObjectParams{ + Bucket: reqInfo.BucketName, + Object: reqInfo.ObjectName, + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), + } + + if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil { h.logAndSendError(w, "could not fetch object info", reqInfo, err) return } - buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType)) - getParams := &layer.GetObjectParams{ - Bucket: inf.Bucket, - Object: inf.Name, - Writer: buffer, - Range: getRangeToDetectContentType(inf.Size), + + if len(info.ContentType) == 0 { + buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType)) + getParams := &layer.GetObjectParams{ + ObjectInfo: info, + Writer: buffer, + Range: getRangeToDetectContentType(info.Size), + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), + } + if err = h.obj.GetObject(r.Context(), getParams); err != nil { + h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID())) + return + } + info.ContentType = http.DetectContentType(buffer.Bytes()) } - if err = h.obj.GetObject(r.Context(), getParams); err != nil { - h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", inf.ID())) - return - } - inf.ContentType = http.DetectContentType(buffer.Bytes()) - writeHeaders(w.Header(), inf) + + writeHeaders(w.Header(), info) w.WriteHeader(http.StatusOK) } diff --git a/api/handler/list.go b/api/handler/list.go index f226d5dd7..3fe83a0ac 100644 --- a/api/handler/list.go +++ b/api/handler/list.go @@ -9,13 +9,6 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api" ) -// VersioningConfiguration contains VersioningConfiguration XML representation. -type VersioningConfiguration struct { - XMLName xml.Name `xml:"VersioningConfiguration"` - Text string `xml:",chardata"` - Xmlns string `xml:"xmlns,attr"` -} - // ListMultipartUploadsResult contains ListMultipartUploadsResult XML representation. type ListMultipartUploadsResult struct { XMLName xml.Name `xml:"ListMultipartUploadsResult"` @@ -62,20 +55,6 @@ func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { } } -// GetBucketVersioningHandler implements bucket versioning getter handler. -func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { - var ( - reqInfo = api.GetReqInfo(r.Context()) - res = new(VersioningConfiguration) - ) - - res.Xmlns = "http://s3.amazonaws.com/doc/2006-03-01/" - - if err := api.EncodeToResponse(w, res); err != nil { - h.logAndSendError(w, "something went wrong", reqInfo, err) - } -} - // ListMultipartUploadsHandler implements multipart uploads listing handler. func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) { var ( diff --git a/api/handler/not_support.go b/api/handler/not_support.go index 6556eaf62..019fe5b42 100644 --- a/api/handler/not_support.go +++ b/api/handler/not_support.go @@ -23,10 +23,6 @@ func (h *handler) PutBucketTaggingHandler(w http.ResponseWriter, r *http.Request h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported)) } -func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { - h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported)) -} - func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported)) } diff --git a/api/handler/object_list.go b/api/handler/object_list.go index ef8f0dd4e..bafc62dbe 100644 --- a/api/handler/object_list.go +++ b/api/handler/object_list.go @@ -217,6 +217,16 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http return } + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil { + h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) + return + } + info, err := h.obj.ListObjectVersions(r.Context(), p) if err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err) @@ -263,7 +273,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck } for _, prefix := range info.CommonPrefixes { - res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: *prefix}) + res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: prefix}) } for _, ver := range info.Version { @@ -276,7 +286,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck DisplayName: ver.Object.Owner.String(), }, Size: ver.Object.Size, - VersionID: ver.VersionID, + VersionID: ver.Object.Version(), ETag: ver.Object.HashSum, }) } @@ -284,13 +294,13 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck for _, del := range info.DeleteMarker { res.DeleteMarker = append(res.DeleteMarker, DeleteMarkerEntry{ IsLatest: del.IsLatest, - Key: del.Key, - LastModified: del.LastModified, + Key: del.Object.Name, + LastModified: del.Object.Created.Format(time.RFC3339), Owner: Owner{ - ID: del.Owner.String(), - DisplayName: del.Owner.String(), + ID: del.Object.Owner.String(), + DisplayName: del.Object.Owner.String(), }, - VersionID: del.VersionID, + VersionID: del.Object.Version(), }) } diff --git a/api/handler/put.go b/api/handler/put.go index 368c9e642..593e4da35 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -114,6 +114,12 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) { } } + if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil { + h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err)) + } else if versioning.VersioningEnabled { + w.Header().Set(api.AmzVersionID, info.Version()) + } + w.Header().Set(api.ETag, info.HashSum) api.WriteSuccessResponseHeadersOnly(w) } diff --git a/api/handler/response.go b/api/handler/response.go index 90f1fdabd..a8884063f 100644 --- a/api/handler/response.go +++ b/api/handler/response.go @@ -164,6 +164,13 @@ type ListObjectsVersionsResponse struct { CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"` } +// VersioningConfiguration contains VersioningConfiguration XML representation. +type VersioningConfiguration struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ VersioningConfiguration"` + Status string `xml:"Status,omitempty"` + MfaDelete string `xml:"MfaDelete,omitempty"` +} + // MarshalXML - StringMap marshals into XML. func (s StringMap) MarshalXML(e *xml.Encoder, start xml.StartElement) error { tokens := []xml.Token{start} diff --git a/api/handler/versioning.go b/api/handler/versioning.go new file mode 100644 index 000000000..dbce63e91 --- /dev/null +++ b/api/handler/versioning.go @@ -0,0 +1,85 @@ +package handler + +import ( + "encoding/xml" + "net/http" + + "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/errors" + "github.com/nspcc-dev/neofs-s3-gw/api/layer" + "go.uber.org/zap" +) + +func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { + reqInfo := api.GetReqInfo(r.Context()) + + configuration := new(VersioningConfiguration) + if err := xml.NewDecoder(r.Body).Decode(configuration); err != nil { + h.logAndSendError(w, "couldn't decode versioning configuration", reqInfo, errors.GetAPIError(errors.ErrIllegalVersioningConfigurationException)) + return + } + + p := &layer.PutVersioningParams{ + Bucket: reqInfo.BucketName, + Settings: &layer.BucketSettings{VersioningEnabled: configuration.Status == "Enabled"}, + } + + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil { + h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) + return + } + + if _, err := h.obj.PutBucketVersioning(r.Context(), p); err != nil { + h.logAndSendError(w, "couldn't put update versioning settings", reqInfo, err) + } +} + +// GetBucketVersioningHandler implements bucket versioning getter handler. +func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { + reqInfo := api.GetReqInfo(r.Context()) + + bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName) + if err != nil { + h.logAndSendError(w, "could not get bucket info", reqInfo, err) + return + } + if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil { + h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) + return + } + + settings, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName) + if err != nil { + if errors.IsS3Error(err, errors.ErrNoSuchBucket) { + h.logAndSendError(w, "couldn't get versioning settings", reqInfo, err) + return + } + h.log.Warn("couldn't get version settings object: default version settings will be used", + zap.String("request_id", reqInfo.RequestID), + zap.String("method", reqInfo.API), + zap.String("bucket_name", reqInfo.BucketName), + zap.Error(err)) + } + + if err = api.EncodeToResponse(w, formVersioningConfiguration(settings)); err != nil { + h.logAndSendError(w, "something went wrong", reqInfo, err) + } +} + +func formVersioningConfiguration(settings *layer.BucketSettings) *VersioningConfiguration { + res := &VersioningConfiguration{} + if settings == nil { + return res + } + if settings.VersioningEnabled { + res.Status = "Enabled" + } else { + res.Status = "Suspended" + } + return res +} diff --git a/api/headers.go b/api/headers.go index 0eaef4fc9..dfdefbdc4 100644 --- a/api/headers.go +++ b/api/headers.go @@ -4,6 +4,7 @@ package api const ( MetadataPrefix = "X-Amz-Meta-" AmzMetadataDirective = "X-Amz-Metadata-Directive" + AmzVersionID = "X-Amz-Version-Id" LastModified = "Last-Modified" Date = "Date" @@ -43,3 +44,8 @@ const ( ContainerID = "X-Container-Id" ) + +// S3 request query params. +const ( + QueryVersionID = "versionId" +) diff --git a/api/layer/container.go b/api/layer/container.go index 3bf7dc0e3..5f8922172 100644 --- a/api/layer/container.go +++ b/api/layer/container.go @@ -11,37 +11,29 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" "github.com/nspcc-dev/neofs-api-go/pkg/container" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" - "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-sdk-go/pkg/pool" "go.uber.org/zap" ) type ( - // BucketInfo stores basic bucket data. - BucketInfo struct { - Name string - CID *cid.ID - Owner *owner.ID - Created time.Time - BasicACL uint32 - } // BucketACL extends BucketInfo by eacl.Table. BucketACL struct { - Info *BucketInfo + Info *cache.BucketInfo EACL *eacl.Table } ) -func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*BucketInfo, error) { +func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*cache.BucketInfo, error) { var ( err error res *container.Container rid = api.GetRequestID(ctx) bearerOpt = n.BearerOpt(ctx) - info = &BucketInfo{ + info = &cache.BucketInfo{ CID: cid, Name: cid.String(), } @@ -82,10 +74,17 @@ func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*BucketInfo, er } } + if err := n.bucketCache.Put(info); err != nil { + n.log.Warn("could not put bucket info into cache", + zap.Stringer("cid", cid), + zap.String("bucket_name", info.Name), + zap.Error(err)) + } + return info, nil } -func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) { +func (n *layer) containerList(ctx context.Context) ([]*cache.BucketInfo, error) { var ( err error own = n.Owner(ctx) @@ -101,7 +100,7 @@ func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) { return nil, err } - list := make([]*BucketInfo, 0, len(res)) + list := make([]*cache.BucketInfo, 0, len(res)) for _, cid := range res { info, err := n.containerInfo(ctx, cid) if err != nil { @@ -118,29 +117,42 @@ func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) { } func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*cid.ID, error) { + var err error + bktInfo := &cache.BucketInfo{ + Name: p.Name, + Owner: n.Owner(ctx), + Created: time.Now(), + BasicACL: p.ACL, + } cnr := container.New( container.WithPolicy(p.Policy), container.WithCustomBasicACL(p.ACL), container.WithAttribute(container.AttributeName, p.Name), - container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10))) + container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(bktInfo.Created.Unix(), 10))) cnr.SetSessionToken(p.BoxData.Gate.SessionToken) - cnr.SetOwnerID(n.Owner(ctx)) + cnr.SetOwnerID(bktInfo.Owner) - cid, err := n.pool.PutContainer(ctx, cnr) - if err != nil { - return nil, fmt.Errorf("failed to create a bucket: %w", err) - } - - if err = n.pool.WaitForContainerPresence(ctx, cid, pool.DefaultPollingParams()); err != nil { + if bktInfo.CID, err = n.pool.PutContainer(ctx, cnr); err != nil { return nil, err } - if err := n.setContainerEACLTable(ctx, cid, p.EACL); err != nil { + if err = n.pool.WaitForContainerPresence(ctx, bktInfo.CID, pool.DefaultPollingParams()); err != nil { return nil, err } - return cid, nil + if err = n.setContainerEACLTable(ctx, bktInfo.CID, p.EACL); err != nil { + return nil, err + } + + if err = n.bucketCache.Put(bktInfo); err != nil { + n.log.Warn("couldn't put bucket info into cache", + zap.String("bucket name", bktInfo.Name), + zap.Stringer("bucket cid", bktInfo.CID), + zap.Error(err)) + } + + return bktInfo.CID, nil } func (n *layer) setContainerEACLTable(ctx context.Context, cid *cid.ID, table *eacl.Table) error { diff --git a/api/layer/detector.go b/api/layer/detector.go index 3a9b7d852..81ec75b78 100644 --- a/api/layer/detector.go +++ b/api/layer/detector.go @@ -3,24 +3,56 @@ package layer import ( "io" "net/http" - "sync" ) -type detector struct { - io.Reader - sync.Once +type ( + detector struct { + io.Reader + err error + data []byte + } + errReader struct { + data []byte + err error + offset int + } +) - contentType string +const contentTypeDetectSize = 512 + +func newReader(data []byte, err error) *errReader { + return &errReader{data: data, err: err} } -func newDetector(r io.Reader) *detector { - return &detector{Reader: r} +func (r *errReader) Read(b []byte) (int, error) { + if r.offset >= len(r.data) { + return 0, io.EOF + } + n := copy(b, r.data[r.offset:]) + r.offset += n + if r.offset >= len(r.data) { + return n, r.err + } + return n, nil } -func (d *detector) Read(data []byte) (int, error) { - d.Do(func() { - d.contentType = http.DetectContentType(data) - }) - - return d.Reader.Read(data) +func newDetector(reader io.Reader) *detector { + return &detector{ + data: make([]byte, contentTypeDetectSize), + Reader: reader, + } +} + +func (d *detector) Detect() (string, error) { + n, err := d.Reader.Read(d.data) + if err != nil && err != io.EOF { + d.err = err + return "", err + } + d.data = d.data[:n] + return http.DetectContentType(d.data), nil +} + +func (d *detector) MultiReader() io.Reader { + return io.MultiReader(newReader(d.data, d.err), d.Reader) } diff --git a/api/layer/layer.go b/api/layer/layer.go index fd7d05816..b49691bd1 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -1,12 +1,12 @@ package layer import ( + "bytes" "context" "crypto/ecdsa" "fmt" "io" "net/url" - "sort" "time" "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" @@ -25,10 +25,13 @@ import ( type ( layer struct { - pool pool.Pool - log *zap.Logger - listObjCache ObjectsListCache - objCache cache.ObjectsCache + pool pool.Pool + log *zap.Logger + listsCache ObjectsListCache + objCache cache.ObjectsCache + namesCache cache.ObjectsNameCache + bucketCache cache.BucketCache + systemCache cache.SystemCache } // CacheConfig contains params for caches. @@ -48,12 +51,19 @@ type ( // GetObjectParams stores object get request parameters. GetObjectParams struct { - Range *RangeParams - Bucket string - Object string - Offset int64 - Length int64 - Writer io.Writer + Range *RangeParams + ObjectInfo *ObjectInfo + Offset int64 + Length int64 + Writer io.Writer + VersionID string + } + + // HeadObjectParams stores object head request parameters. + HeadObjectParams struct { + Bucket string + Object string + VersionID string } // RangeParams stores range header request parameters. @@ -71,11 +81,21 @@ type ( Header map[string]string } + // PutVersioningParams stores object copy request parameters. + PutVersioningParams struct { + Bucket string + Settings *BucketSettings + } + + // BucketSettings stores settings such as versioning. + BucketSettings struct { + VersioningEnabled bool + } + // CopyObjectParams stores object copy request parameters. CopyObjectParams struct { - SrcBucket string + SrcObject *ObjectInfo DstBucket string - SrcObject string DstObject string SrcSize int64 Header map[string]string @@ -108,6 +128,12 @@ type ( Encode string } + // VersionedObject stores object name and version. + VersionedObject struct { + Name string + VersionID string + } + // NeoFS provides basic NeoFS interface. NeoFS interface { Get(ctx context.Context, address *object.Address) (*object.Object, error) @@ -117,15 +143,18 @@ type ( Client interface { NeoFS - ListBuckets(ctx context.Context) ([]*BucketInfo, error) - GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) + PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error) + GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error) + + ListBuckets(ctx context.Context) ([]*cache.BucketInfo, error) + GetBucketInfo(ctx context.Context, name string) (*cache.BucketInfo, error) GetBucketACL(ctx context.Context, name string) (*BucketACL, error) PutBucketACL(ctx context.Context, p *PutBucketACLParams) error CreateBucket(ctx context.Context, p *CreateBucketParams) (*cid.ID, error) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error GetObject(ctx context.Context, p *GetObjectParams) error - GetObjectInfo(ctx context.Context, bucketName, objectName string) (*ObjectInfo, error) + GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*ObjectInfo, error) PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) @@ -135,23 +164,26 @@ type ( ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) - DeleteObject(ctx context.Context, bucket, object string) error - DeleteObjects(ctx context.Context, bucket string, objects []string) []error + DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error } ) -const ( - unversionedObjectVersionID = "null" -) +func (t *VersionedObject) String() string { + return t.Name + ":" + t.VersionID +} // NewLayer creates instance of layer. It checks credentials // and establishes gRPC connection with node. func NewLayer(log *zap.Logger, conns pool.Pool, config *CacheConfig) Client { return &layer{ - pool: conns, - log: log, - listObjCache: newListObjectsCache(config.ListObjectsLifetime), - objCache: cache.New(config.Size, config.Lifetime), + pool: conns, + log: log, + listsCache: newListObjectsCache(config.ListObjectsLifetime), + objCache: cache.New(config.Size, config.Lifetime), + //todo reconsider cache params + namesCache: cache.NewObjectsNameCache(1000, time.Minute), + bucketCache: cache.NewBucketCache(150, time.Minute), + systemCache: cache.NewSystemCache(1000, 5*time.Minute), } } @@ -189,12 +221,16 @@ func (n *layer) Get(ctx context.Context, address *object.Address) (*object.Objec } // GetBucketInfo returns bucket info by name. -func (n *layer) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) { +func (n *layer) GetBucketInfo(ctx context.Context, name string) (*cache.BucketInfo, error) { name, err := url.QueryUnescape(name) if err != nil { return nil, err } + if bktInfo := n.bucketCache.Get(name); bktInfo != nil { + return bktInfo, nil + } + containerID := new(cid.ID) if err := containerID.Parse(name); err != nil { list, err := n.containerList(ctx) @@ -243,33 +279,20 @@ func (n *layer) PutBucketACL(ctx context.Context, param *PutBucketACLParams) err // ListBuckets returns all user containers. Name of the bucket is a container // id. Timestamp is omitted since it is not saved in neofs container. -func (n *layer) ListBuckets(ctx context.Context) ([]*BucketInfo, error) { +func (n *layer) ListBuckets(ctx context.Context) ([]*cache.BucketInfo, error) { return n.containerList(ctx) } // GetObject from storage. func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { - var ( - err error - oid *object.ID - bkt *BucketInfo - ) - - if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { - return fmt.Errorf("couldn't find bucket: %s : %w", p.Bucket, err) - } else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: p.Object}); err != nil { - return fmt.Errorf("search of the object failed: cid: %s, val: %s : %w", bkt.CID, p.Object, err) - } - - addr := object.NewAddress() - addr.SetObjectID(oid) - addr.SetContainerID(bkt.CID) + var err error params := &getParams{ - Writer: p.Writer, - address: addr, - offset: p.Offset, - length: p.Length, + Writer: p.Writer, + cid: p.ObjectInfo.CID(), + oid: p.ObjectInfo.ID(), + offset: p.Offset, + length: p.Length, } if p.Range != nil { @@ -284,64 +307,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { } if err != nil { - n.objCache.Delete(addr) - return fmt.Errorf("couldn't get object, cid: %s : %w", bkt.CID, err) + n.objCache.Delete(p.ObjectInfo.Address()) + return fmt.Errorf("couldn't get object, cid: %s : %w", p.ObjectInfo.CID(), err) } return nil } -func (n *layer) checkObject(ctx context.Context, cid *cid.ID, filename string) error { - var err error - - if _, err = n.objectFindID(ctx, &findParams{cid: cid, val: filename}); err == nil { - return new(errors.ObjectAlreadyExists) - } - - return err -} - // GetObjectInfo returns meta information about the object. -func (n *layer) GetObjectInfo(ctx context.Context, bucketName, filename string) (*ObjectInfo, error) { - var ( - err error - oid *object.ID - bkt *BucketInfo - meta *object.Object - ) - - if bkt, err = n.GetBucketInfo(ctx, bucketName); err != nil { +func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*ObjectInfo, error) { + bkt, err := n.GetBucketInfo(ctx, p.Bucket) + if err != nil { n.log.Error("could not fetch bucket info", zap.Error(err)) return nil, err - } else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: filename}); err != nil { - n.log.Error("could not find object id", zap.Error(err)) + } + + if len(p.VersionID) == 0 { + return n.headLastVersionIfNotDeleted(ctx, bkt, p.Object) + } + + return n.headVersion(ctx, bkt, p.VersionID) +} + +func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *cache.BucketInfo) (*ObjectInfo, error) { + if meta := n.systemCache.Get(bkt.SettingsObjectKey()); meta != nil { + return objInfoFromMeta(bkt, meta), nil + } + + oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: bkt.SettingsObjectName()}) + if err != nil { return nil, err } - addr := object.NewAddress() - addr.SetObjectID(oid) - addr.SetContainerID(bkt.CID) - - /* todo: now we get an address via request to NeoFS and try to find the object with the address in cache - but it will be resolved after implementation of local cache with nicenames and address of objects - for get/head requests */ - meta = n.objCache.Get(addr) - if meta == nil { - meta, err = n.objectHead(ctx, addr) - if err != nil { - n.log.Error("could not fetch object head", zap.Error(err)) - return nil, err - } - if err = n.objCache.Put(addr, *meta); err != nil { - n.log.Error("couldn't cache an object", zap.Error(err)) - } + meta, err := n.objectHead(ctx, bkt.CID, oid) + if err != nil { + n.log.Error("could not fetch object head", zap.Error(err)) + return nil, err } - return objectInfoFromMeta(bkt, meta, "", ""), nil + if err = n.systemCache.Put(bkt.SettingsObjectKey(), meta); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + return objInfoFromMeta(bkt, meta), nil } // PutObject into storage. func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) { - return n.objectPut(ctx, p) + bkt, err := n.GetBucketInfo(ctx, p.Bucket) + if err != nil { + return nil, err + } + + return n.objectPut(ctx, bkt, p) } // CopyObject from one bucket into another bucket. @@ -350,9 +367,8 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf go func() { err := n.GetObject(ctx, &GetObjectParams{ - Bucket: p.SrcBucket, - Object: p.SrcObject, - Writer: pw, + ObjectInfo: p.SrcObject, + Writer: pw, }) if err = pw.CloseWithError(err); err != nil { @@ -370,35 +386,47 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf } // DeleteObject removes all objects with passed nice name. -func (n *layer) DeleteObject(ctx context.Context, bucket, filename string) error { +func (n *layer) deleteObject(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) error { var ( err error ids []*object.ID - bkt *BucketInfo ) - if bkt, err = n.GetBucketInfo(ctx, bucket); err != nil { - return &errors.DeleteError{ - Err: err, - Object: filename, + versioningEnabled := n.isVersioningEnabled(ctx, bkt) + if !versioningEnabled && obj.VersionID != unversionedObjectVersionID && obj.VersionID != "" { + return errors.GetAPIError(errors.ErrInvalidVersion) + } + + if versioningEnabled { + p := &PutObjectParams{ + Object: obj.Name, + Reader: bytes.NewReader(nil), + Header: map[string]string{versionsDeleteMarkAttr: obj.VersionID}, } - } else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: filename}); err != nil { - return &errors.DeleteError{ - Err: err, - Object: filename, + if len(obj.VersionID) != 0 { + id, err := n.checkVersionsExist(ctx, bkt, obj) + if err != nil { + return err + } + ids = []*object.ID{id} + + p.Header[versionsDelAttr] = obj.VersionID + } else { + p.Header[versionsDeleteMarkAttr] = delMarkFullObject + } + if _, err = n.objectPut(ctx, bkt, p); err != nil { + return err + } + } else { + ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: obj.Name}) + if err != nil { + return err } } for _, id := range ids { - addr := object.NewAddress() - addr.SetObjectID(id) - addr.SetContainerID(bkt.CID) - - if err = n.objectDelete(ctx, addr); err != nil { - return &errors.DeleteError{ - Err: err, - Object: filename, - } + if err = n.objectDelete(ctx, bkt.CID, id); err != nil { + return err } } @@ -406,12 +434,17 @@ func (n *layer) DeleteObject(ctx context.Context, bucket, filename string) error } // DeleteObjects from the storage. -func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []string) []error { +func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error { var errs = make([]error, 0, len(objects)) - for i := range objects { - if err := n.DeleteObject(ctx, bucket, objects[i]); err != nil { - errs = append(errs, err) + bkt, err := n.GetBucketInfo(ctx, bucket) + if err != nil { + return append(errs, err) + } + + for _, obj := range objects { + if err := n.deleteObject(ctx, bkt, obj); err != nil { + errs = append(errs, &errors.ObjectError{Err: err, Object: obj.Name, Version: obj.VersionID}) } } @@ -436,75 +469,17 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { return err } - return n.deleteContainer(ctx, bucketInfo.CID) -} - -func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - var ( - res = ListObjectVersionsInfo{} - err error - bkt *BucketInfo - ids []*object.ID - uniqNames = make(map[string]bool) - ) - - if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { - return nil, err - } else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID}); err != nil { - return nil, err - } - - versions := make([]*ObjectVersionInfo, 0, len(ids)) - // todo: deletemarkers is empty now, we will use it after proper realization of versioning - deleted := make([]*DeletedObjectInfo, 0, len(ids)) - res.DeleteMarker = deleted - - for _, id := range ids { - addr := object.NewAddress() - addr.SetObjectID(id) - addr.SetContainerID(bkt.CID) - - meta, err := n.objectHead(ctx, addr) - if err != nil { - n.log.Warn("could not fetch object meta", zap.Error(err)) - continue - } - if ov := objectVersionInfoFromMeta(bkt, meta, p.Prefix, p.Delimiter); ov != nil { - if _, ok := uniqNames[ov.Object.Name]; ok { - continue - } - if len(p.KeyMarker) > 0 && ov.Object.Name <= p.KeyMarker { - continue - } - uniqNames[ov.Object.Name] = ov.Object.isDir - versions = append(versions, ov) - } - } - - sort.Slice(versions, func(i, j int) bool { - return versions[i].Object.Name < versions[j].Object.Name - }) - - if len(versions) > p.MaxKeys { - res.IsTruncated = true - - lastVersion := versions[p.MaxKeys-1] - res.KeyMarker = lastVersion.Object.Name - res.VersionIDMarker = lastVersion.VersionID - - nextVersion := versions[p.MaxKeys] - res.NextKeyMarker = nextVersion.Object.Name - res.NextVersionIDMarker = nextVersion.VersionID - - versions = versions[:p.MaxKeys] - } - - for _, ov := range versions { - if isDir := uniqNames[ov.Object.Name]; isDir { - res.CommonPrefixes = append(res.CommonPrefixes, &ov.Object.Name) - } else { - res.Version = append(res.Version, ov) - } - } - return &res, nil + objects, err := n.listSortedObjectsFromNeoFS(ctx, allObjectParams{Bucket: bucketInfo}) + if err != nil { + return err + } + if len(objects) != 0 { + return errors.GetAPIError(errors.ErrBucketNotEmpty) + } + + if err = n.deleteContainer(ctx, bucketInfo.CID); err != nil { + return err + } + n.bucketCache.Delete(bucketInfo.Name) + return nil } diff --git a/api/layer/object.go b/api/layer/object.go index 77d310d10..b18c12cd5 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -7,28 +7,34 @@ import ( "net/url" "sort" "strconv" + "strings" "time" "github.com/nspcc-dev/neofs-api-go/pkg/client" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors" "go.uber.org/zap" ) type ( findParams struct { - val string - cid *cid.ID + attr string + val string + cid *cid.ID } getParams struct { io.Writer *object.Range - offset int64 - length int64 - address *object.Address + offset int64 + length int64 + cid *cid.ID + oid *object.ID } // ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2. @@ -55,7 +61,7 @@ type ( } allObjectParams struct { - Bucket *BucketInfo + Bucket *cache.BucketInfo Delimiter string Prefix string } @@ -70,7 +76,11 @@ func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID, if filename, err := url.QueryUnescape(p.val); err != nil { return nil, err } else if filename != "" { - opts.AddFilter(object.AttributeFileName, filename, object.MatchStringEqual) + if p.attr == "" { + opts.AddFilter(object.AttributeFileName, filename, object.MatchStringEqual) + } else { + opts.AddFilter(p.attr, filename, object.MatchStringEqual) + } } return n.pool.SearchObject(ctx, new(client.SearchObjectParams).WithContainerID(p.cid).WithSearchFilters(opts), n.BearerOpt(ctx)) } @@ -89,9 +99,16 @@ func (n *layer) objectFindID(ctx context.Context, p *findParams) (*object.ID, er return nil, errors.New("several objects with the same name found") } +func newAddress(cid *cid.ID, oid *object.ID) *object.Address { + address := object.NewAddress() + address.SetContainerID(cid) + address.SetObjectID(oid) + return address +} + // objectHead returns all object's headers. -func (n *layer) objectHead(ctx context.Context, address *object.Address) (*object.Object, error) { - ops := new(client.ObjectHeaderParams).WithAddress(address).WithAllFields() +func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) { + ops := new(client.ObjectHeaderParams).WithAddress(newAddress(cid, oid)).WithAllFields() return n.pool.GetObjectHeader(ctx, ops, n.BearerOpt(ctx)) } @@ -99,54 +116,96 @@ func (n *layer) objectHead(ctx context.Context, address *object.Address) (*objec func (n *layer) objectGet(ctx context.Context, p *getParams) (*object.Object, error) { // prepare length/offset writer w := newWriter(p.Writer, p.offset, p.length) - ops := new(client.GetObjectParams).WithAddress(p.address).WithPayloadWriter(w) + ops := new(client.GetObjectParams).WithAddress(newAddress(p.cid, p.oid)).WithPayloadWriter(w) return n.pool.GetObject(ctx, ops, n.BearerOpt(ctx)) } // objectRange gets object range and writes it into provided io.Writer. func (n *layer) objectRange(ctx context.Context, p *getParams) ([]byte, error) { w := newWriter(p.Writer, p.offset, p.length) - ops := new(client.RangeDataParams).WithAddress(p.address).WithDataWriter(w).WithRange(p.Range) + ops := new(client.RangeDataParams).WithAddress(newAddress(p.cid, p.oid)).WithDataWriter(w).WithRange(p.Range) return n.pool.ObjectPayloadRangeData(ctx, ops, n.BearerOpt(ctx)) } // objectPut into NeoFS, took payload from io.Reader. -func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) { - var ( - err error - obj string - bkt *BucketInfo - own = n.Owner(ctx) - ) +func (n *layer) objectPut(ctx context.Context, bkt *cache.BucketInfo, p *PutObjectParams) (*ObjectInfo, error) { + own := n.Owner(ctx) + obj, err := url.QueryUnescape(p.Object) + if err != nil { + return nil, err + } - if obj, err = url.QueryUnescape(p.Object); err != nil { + versioningEnabled := n.isVersioningEnabled(ctx, bkt) + versions, err := n.headVersions(ctx, bkt, obj) + if err != nil && !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) { return nil, err - } else if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { - return nil, err - } else if err = n.checkObject(ctx, bkt.CID, p.Object); err != nil { - var errExist *apiErrors.ObjectAlreadyExists - if ok := errors.As(err, &errExist); ok { - errExist.Bucket = p.Bucket - errExist.Object = p.Object - return nil, errExist + } + idsToDeleteArr := updateCRDT2PSetHeaders(p, versions, versioningEnabled) + + r := p.Reader + if len(p.Header[api.ContentType]) == 0 { + d := newDetector(r) + if contentType, err := d.Detect(); err == nil { + p.Header[api.ContentType] = contentType } + r = d.MultiReader() + } + rawObject := formRawObject(p, bkt.CID, own, obj) - if !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) { - return nil, err + ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(r) + oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx)) + if err != nil { + return nil, err + } + + if p.Header[versionsDeleteMarkAttr] == delMarkFullObject { + if last := versions.getLast(); last != nil { + n.objCache.Delete(last.Address()) } } - attributes := make([]*object.Attribute, 0, len(p.Header)+1) + meta, err := n.objectHead(ctx, bkt.CID, oid) + if err != nil { + return nil, err + } - unix := strconv.FormatInt(time.Now().UTC().Unix(), 10) + if err = n.objCache.Put(*meta); err != nil { + n.log.Error("couldn't cache an object", zap.Error(err)) + } + for _, id := range idsToDeleteArr { + if err = n.objectDelete(ctx, bkt.CID, id); err != nil { + n.log.Warn("couldn't delete object", + zap.Stringer("version id", id), + zap.Error(err)) + } + } + + return &ObjectInfo{ + id: oid, + bucketID: bkt.CID, + + Owner: own, + Bucket: p.Bucket, + Name: p.Object, + Size: p.Size, + Created: time.Now(), + CreationEpoch: meta.CreationEpoch(), + Headers: p.Header, + ContentType: p.Header[api.ContentType], + HashSum: meta.PayloadChecksum().String(), + }, nil +} + +func formRawObject(p *PutObjectParams, bktID *cid.ID, own *owner.ID, obj string) *object.RawObject { + attributes := make([]*object.Attribute, 0, len(p.Header)+2) filename := object.NewAttribute() filename.SetKey(object.AttributeFileName) filename.SetValue(obj) createdAt := object.NewAttribute() createdAt.SetKey(object.AttributeTimestamp) - createdAt.SetValue(unix) + createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) attributes = append(attributes, filename, createdAt) @@ -160,49 +219,155 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, raw := object.NewRaw() raw.SetOwnerID(own) - raw.SetContainerID(bkt.CID) + raw.SetContainerID(bktID) raw.SetAttributes(attributes...) - r := newDetector(p.Reader) + return raw +} - ops := new(client.PutObjectParams).WithObject(raw.Object()).WithPayloadReader(r) - oid, err := n.pool.PutObject( - ctx, - ops, - n.BearerOpt(ctx), - ) +func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versioningEnabled bool) []*object.ID { + var idsToDeleteArr []*object.ID + if versions == nil { + return idsToDeleteArr + } + + if versioningEnabled { + if len(versions.addList) != 0 { + p.Header[versionsAddAttr] = versions.getAddHeader() + } + + deleted := versions.getDelHeader() + // p.Header[versionsDelAttr] can be not empty when deleting specific version + if delAttr := p.Header[versionsDelAttr]; len(delAttr) != 0 { + if len(deleted) != 0 { + p.Header[versionsDelAttr] = deleted + "," + delAttr + } else { + p.Header[versionsDelAttr] = delAttr + } + } else if len(deleted) != 0 { + p.Header[versionsDelAttr] = deleted + } + } else { + versionsDeletedStr := versions.getDelHeader() + if len(versionsDeletedStr) != 0 { + versionsDeletedStr += "," + } + + if lastVersion := versions.getLast(); lastVersion != nil { + p.Header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version() + idsToDeleteArr = append(idsToDeleteArr, lastVersion.ID()) + } else if len(versionsDeletedStr) != 0 { + p.Header[versionsDelAttr] = versionsDeletedStr + } + + for _, version := range versions.objects { + if contains(versions.delList, version.Version()) { + idsToDeleteArr = append(idsToDeleteArr, version.ID()) + } + } + } + + return idsToDeleteArr +} + +func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*ObjectInfo, error) { + if address := n.namesCache.Get(bkt.Name + "/" + objectName); address != nil { + if headInfo := n.objCache.Get(address); headInfo != nil { + return objInfoFromMeta(bkt, headInfo), nil + } + } + + versions, err := n.headVersions(ctx, bkt, objectName) if err != nil { return nil, err } - addr := object.NewAddress() - addr.SetObjectID(oid) - addr.SetContainerID(bkt.CID) - meta, err := n.objectHead(ctx, addr) + lastVersion := versions.getLast() + if lastVersion == nil { + return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey) + } + + if err = n.namesCache.Put(lastVersion.NiceName(), lastVersion.Address()); err != nil { + n.log.Warn("couldn't put obj address to head cache", + zap.String("obj nice name", lastVersion.NiceName()), + zap.Error(err)) + } + + return lastVersion, nil +} + +func (n *layer) headVersions(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*objectVersions, error) { + ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName}) if err != nil { return nil, err } - if err = n.objCache.Put(addr, *meta); err != nil { - n.log.Error("couldn't cache an object", zap.Error(err)) + if len(ids) == 0 { + return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey) } - return &ObjectInfo{ - id: oid, + versions := newObjectVersions(objectName) + for _, id := range ids { + meta, err := n.objectHead(ctx, bkt.CID, id) + if err != nil { + n.log.Warn("couldn't head object", + zap.Stringer("object id", id), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + continue + } + if err = n.objCache.Put(*meta); err != nil { + n.log.Warn("couldn't put meta to objects cache", + zap.Stringer("object id", id), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + } - Owner: own, - Bucket: p.Bucket, - Name: p.Object, - Size: p.Size, - Created: time.Now(), - Headers: p.Header, - ContentType: r.contentType, - HashSum: meta.PayloadChecksum().String(), - }, nil + if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil { + if isSystem(oi) { + continue + } + versions.appendVersion(oi) + } + } + + return versions, nil +} + +func (n *layer) headVersion(ctx context.Context, bkt *cache.BucketInfo, versionID string) (*ObjectInfo, error) { + oid := object.NewID() + if err := oid.Parse(versionID); err != nil { + return nil, err + } + + if headInfo := n.objCache.Get(newAddress(bkt.CID, oid)); headInfo != nil { + return objInfoFromMeta(bkt, headInfo), nil + } + + meta, err := n.objectHead(ctx, bkt.CID, oid) + if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion) + } + return nil, err + } + + objInfo := objectInfoFromMeta(bkt, meta, "", "") + if err = n.objCache.Put(*meta); err != nil { + n.log.Warn("couldn't put obj to object cache", + zap.String("bucket name", objInfo.Bucket), + zap.Stringer("bucket cid", objInfo.CID()), + zap.String("object name", objInfo.Name), + zap.Stringer("object id", objInfo.ID()), + zap.Error(err)) + } + + return objInfo, nil } // objectDelete puts tombstone object into neofs. -func (n *layer) objectDelete(ctx context.Context, address *object.Address) error { +func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *object.ID) error { + address := newAddress(cid, oid) dop := new(client.DeleteObjectParams) dop.WithAddress(address) n.objCache.Delete(address) @@ -284,37 +449,16 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis } func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) { - var ( - err error - ids []*object.ID - uniqNames = make(map[string]bool) - ) - - if ids, err = n.objectSearch(ctx, &findParams{cid: p.Bucket.CID}); err != nil { + versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter) + if err != nil { return nil, err } - objects := make([]*ObjectInfo, 0, len(ids)) - - for _, id := range ids { - addr := object.NewAddress() - addr.SetObjectID(id) - addr.SetContainerID(p.Bucket.CID) - - meta, err := n.objectHead(ctx, addr) - if err != nil { - n.log.Warn("could not fetch object meta", zap.Error(err)) - continue - } - if oi := objectInfoFromMeta(p.Bucket, meta, p.Prefix, p.Delimiter); oi != nil { - // use only unique dir names - if _, ok := uniqNames[oi.Name]; ok { - continue - } - - uniqNames[oi.Name] = oi.isDir - - objects = append(objects, oi) + objects := make([]*ObjectInfo, 0, len(versions)) + for _, v := range versions { + lastVersion := v.getLast() + if lastVersion != nil { + objects = append(objects, lastVersion) } } @@ -325,6 +469,59 @@ func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParam return objects, nil } +func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *cache.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) { + ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID}) + if err != nil { + return nil, err + } + + versions := make(map[string]*objectVersions, len(ids)/2) + for _, id := range ids { + meta, err := n.objectHead(ctx, bkt.CID, id) + if err != nil { + n.log.Warn("could not fetch object meta", zap.Error(err)) + continue + } + if oi := objectInfoFromMeta(bkt, meta, prefix, delimiter); oi != nil { + if isSystem(oi) { + continue + } + + objVersions, ok := versions[oi.Name] + if !ok { + objVersions = newObjectVersions(oi.Name) + } + objVersions.appendVersion(oi) + versions[oi.Name] = objVersions + } + } + + return versions, nil +} + +func getExistedVersions(versions *objectVersions) []string { + var res []string + for _, add := range versions.addList { + if !contains(versions.delList, add) { + res = append(res, add) + } + } + return res +} + +func splitVersions(header string) []string { + if len(header) == 0 { + return nil + } + + return strings.Split(header, ",") +} + +func isSystem(obj *ObjectInfo) bool { + return len(obj.Headers[objectSystemAttributeName]) > 0 || + len(obj.Headers[attrVersionsIgnore]) > 0 +} + func trimAfterObjectName(startAfter string, objects []*ObjectInfo) []*ObjectInfo { if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter { return nil @@ -366,7 +563,7 @@ func triageObjects(allObjects []*ObjectInfo) (prefixes []string, objects []*Obje func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*ObjectInfo, error) { var ( err error - bkt *BucketInfo + bkt *cache.BucketInfo cacheKey cacheOptions allObjects []*ObjectInfo ) @@ -375,11 +572,11 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ( return nil, err } - if cacheKey, err = createKey(ctx, bkt.CID, p.Prefix, p.Delimiter); err != nil { + if cacheKey, err = createKey(ctx, bkt.CID, listObjectsMethod, p.Prefix, p.Delimiter); err != nil { return nil, err } - allObjects = n.listObjCache.Get(cacheKey) + allObjects = n.listsCache.Get(cacheKey) if allObjects == nil { allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{ @@ -392,8 +589,18 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ( } // putting to cache a copy of allObjects because allObjects can be modified further - n.listObjCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) + n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) } return allObjects, nil } + +func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *cache.BucketInfo) bool { + settings, err := n.getBucketSettings(ctx, bktInfo) + if err != nil { + n.log.Warn("couldn't get versioning settings object", zap.Error(err)) + return false + } + + return settings.VersioningEnabled +} diff --git a/api/layer/object_list_cache.go b/api/layer/object_list_cache.go index 6ed174a9a..18e562ce4 100644 --- a/api/layer/object_list_cache.go +++ b/api/layer/object_list_cache.go @@ -30,6 +30,11 @@ type ( // DefaultObjectsListCacheLifetime is a default lifetime of entries in cache of ListObjects. const DefaultObjectsListCacheLifetime = time.Second * 60 +const ( + listObjectsMethod = "listObjects" + listVersionsMethod = "listVersions" +) + type ( listObjectsCache struct { cacheLifetime time.Duration @@ -40,6 +45,7 @@ type ( list []*ObjectInfo } cacheOptions struct { + method string key string delimiter string prefix string @@ -78,12 +84,13 @@ func (l *listObjectsCache) Put(key cacheOptions, objects []*ObjectInfo) { }) } -func createKey(ctx context.Context, cid *cid.ID, prefix, delimiter string) (cacheOptions, error) { +func createKey(ctx context.Context, cid *cid.ID, method, prefix, delimiter string) (cacheOptions, error) { box, err := GetBoxData(ctx) if err != nil { return cacheOptions{}, err } p := cacheOptions{ + method: method, key: box.Gate.AccessKey + cid.String(), delimiter: delimiter, prefix: prefix, diff --git a/api/layer/util.go b/api/layer/util.go index 5bcee8b3a..557141f7a 100644 --- a/api/layer/util.go +++ b/api/layer/util.go @@ -8,26 +8,30 @@ import ( "strings" "time" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" ) type ( // ObjectInfo holds S3 object data. ObjectInfo struct { - id *object.ID - isDir bool + id *object.ID + bucketID *cid.ID + isDir bool - Bucket string - Name string - Size int64 - ContentType string - Created time.Time - HashSum string - Owner *owner.ID - Headers map[string]string + Bucket string + Name string + Size int64 + ContentType string + Created time.Time + CreationEpoch uint64 + HashSum string + Owner *owner.ID + Headers map[string]string } // ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2. @@ -51,29 +55,19 @@ type ( // ObjectVersionInfo stores info about objects versions. ObjectVersionInfo struct { - Object *ObjectInfo - IsLatest bool - VersionID string - } - - // DeletedObjectInfo stores info about deleted versions of objects. - DeletedObjectInfo struct { - Owner *owner.ID - Key string - VersionID string - IsLatest bool - LastModified string + Object *ObjectInfo + IsLatest bool } // ListObjectVersionsInfo stores info and list of objects' versions. ListObjectVersionsInfo struct { - CommonPrefixes []*string + CommonPrefixes []string IsTruncated bool KeyMarker string NextKeyMarker string NextVersionIDMarker string Version []*ObjectVersionInfo - DeleteMarker []*DeletedObjectInfo + DeleteMarker []*ObjectVersionInfo VersionIDMarker string } ) @@ -91,7 +85,11 @@ func userHeaders(attrs []*object.Attribute) map[string]string { return result } -func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectInfo { +func objInfoFromMeta(bkt *cache.BucketInfo, meta *object.Object) *ObjectInfo { + return objectInfoFromMeta(bkt, meta, "", "") +} + +func objectInfoFromMeta(bkt *cache.BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectInfo { var ( isDir bool size int64 @@ -133,28 +131,22 @@ func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter } return &ObjectInfo{ - id: meta.ID(), - isDir: isDir, + id: meta.ID(), + bucketID: bkt.CID, + isDir: isDir, - Bucket: bkt.Name, - Name: filename, - Created: creation, - ContentType: mimeType, - Headers: userHeaders, - Owner: meta.OwnerID(), - Size: size, - HashSum: meta.PayloadChecksum().String(), + Bucket: bkt.Name, + Name: filename, + Created: creation, + CreationEpoch: meta.CreationEpoch(), + ContentType: mimeType, + Headers: userHeaders, + Owner: meta.OwnerID(), + Size: size, + HashSum: meta.PayloadChecksum().String(), } } -func objectVersionInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectVersionInfo { - oi := objectInfoFromMeta(bkt, meta, prefix, delimiter) - if oi == nil { - return nil - } - return &ObjectVersionInfo{Object: oi, IsLatest: true, VersionID: unversionedObjectVersionID} -} - func filenameFromObject(o *object.Object) string { var name = o.ID().String() for _, attr := range o.Attributes() { @@ -174,6 +166,18 @@ func NameFromString(name string) (string, string) { // ID returns object ID from ObjectInfo. func (o *ObjectInfo) ID() *object.ID { return o.id } +// Version returns object version from ObjectInfo. +func (o *ObjectInfo) Version() string { return o.id.String() } + +// NiceName returns object name for cache. +func (o *ObjectInfo) NiceName() string { return o.Bucket + "/" + o.Name } + +// Address returns object address. +func (o *ObjectInfo) Address() *object.Address { return newAddress(o.bucketID, o.id) } + +// CID returns bucket ID from ObjectInfo. +func (o *ObjectInfo) CID() *cid.ID { return o.bucketID } + // IsDir allows to check if object is a directory. func (o *ObjectInfo) IsDir() bool { return o.isDir } diff --git a/api/layer/util_test.go b/api/layer/util_test.go index 9b44d8544..950b783c1 100644 --- a/api/layer/util_test.go +++ b/api/layer/util_test.go @@ -9,6 +9,7 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/stretchr/testify/require" ) @@ -19,7 +20,7 @@ var ( defaultTestContentType = http.DetectContentType(defaultTestPayload) ) -func newTestObject(oid *object.ID, bkt *BucketInfo, name string) *object.Object { +func newTestObject(oid *object.ID, bkt *cache.BucketInfo, name string) *object.Object { filename := object.NewAttribute() filename.SetKey(object.AttributeFileName) filename.SetValue(name) @@ -43,11 +44,12 @@ func newTestObject(oid *object.ID, bkt *BucketInfo, name string) *object.Object return raw.Object() } -func newTestInfo(oid *object.ID, bkt *BucketInfo, name string, isDir bool) *ObjectInfo { +func newTestInfo(oid *object.ID, bkt *cache.BucketInfo, name string, isDir bool) *ObjectInfo { info := &ObjectInfo{ id: oid, Name: name, Bucket: bkt.Name, + bucketID: bkt.CID, Size: defaultTestPayloadLength, ContentType: defaultTestContentType, Created: time.Unix(defaultTestCreated.Unix(), 0), @@ -70,7 +72,7 @@ func Test_objectInfoFromMeta(t *testing.T) { oid := object.NewID() containerID := cid.New() - bkt := &BucketInfo{ + bkt := &cache.BucketInfo{ Name: "test-container", CID: containerID, Owner: uid, diff --git a/api/layer/versioning.go b/api/layer/versioning.go new file mode 100644 index 000000000..20e5219a0 --- /dev/null +++ b/api/layer/versioning.go @@ -0,0 +1,325 @@ +package layer + +import ( + "context" + "sort" + "strconv" + "strings" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" + "github.com/nspcc-dev/neofs-s3-gw/api/errors" + "go.uber.org/zap" +) + +type objectVersions struct { + name string + objects []*ObjectInfo + addList []string + delList []string + isSorted bool +} + +const ( + unversionedObjectVersionID = "null" + objectSystemAttributeName = "S3-System-name" + attrVersionsIgnore = "S3-Versions-ignore" + attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled" + versionsDelAttr = "S3-Versions-del" + versionsAddAttr = "S3-Versions-add" + versionsDeleteMarkAttr = "S3-Versions-delete-mark" + delMarkFullObject = "*" +) + +func newObjectVersions(name string) *objectVersions { + return &objectVersions{name: name} +} + +func (v *objectVersions) appendVersion(oi *ObjectInfo) { + addVers := append(splitVersions(oi.Headers[versionsAddAttr]), oi.Version()) + delVers := splitVersions(oi.Headers[versionsDelAttr]) + v.objects = append(v.objects, oi) + for _, add := range addVers { + if !contains(v.addList, add) { + v.addList = append(v.addList, add) + } + } + for _, del := range delVers { + if !contains(v.delList, del) { + v.delList = append(v.delList, del) + } + } + v.isSorted = false +} + +func (v *objectVersions) sort() { + if !v.isSorted { + sort.Slice(v.objects, func(i, j int) bool { + return less(v.objects[i], v.objects[j]) + }) + v.isSorted = true + } +} + +func (v *objectVersions) getLast() *ObjectInfo { + if len(v.objects) == 0 { + return nil + } + + v.sort() + existedVersions := getExistedVersions(v) + for i := len(v.objects) - 1; i >= 0; i-- { + if contains(existedVersions, v.objects[i].Version()) { + delMarkHeader := v.objects[i].Headers[versionsDeleteMarkAttr] + if delMarkHeader == "" { + return v.objects[i] + } + if delMarkHeader == delMarkFullObject { + return nil + } + } + } + + return nil +} + +func (v *objectVersions) getFiltered() []*ObjectInfo { + if len(v.objects) == 0 { + return nil + } + + v.sort() + existedVersions := getExistedVersions(v) + res := make([]*ObjectInfo, 0, len(v.objects)) + + for _, version := range v.objects { + delMark := version.Headers[versionsDeleteMarkAttr] + if contains(existedVersions, version.Version()) && (delMark == delMarkFullObject || delMark == "") { + res = append(res, version) + } + } + + return res +} + +func (v *objectVersions) getAddHeader() string { + return strings.Join(v.addList, ",") +} + +func (v *objectVersions) getDelHeader() string { + return strings.Join(v.delList, ",") +} + +func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error) { + bucketInfo, err := n.GetBucketInfo(ctx, p.Bucket) + if err != nil { + return nil, err + } + + objectInfo, err := n.getSettingsObjectInfo(ctx, bucketInfo) + if err != nil { + n.log.Warn("couldn't get bucket version settings object, new one will be created", + zap.String("bucket_name", bucketInfo.Name), + zap.Stringer("cid", bucketInfo.CID), + zap.Error(err)) + } + + attributes := make([]*object.Attribute, 0, 3) + + filename := object.NewAttribute() + filename.SetKey(objectSystemAttributeName) + filename.SetValue(bucketInfo.SettingsObjectName()) + + createdAt := object.NewAttribute() + createdAt.SetKey(object.AttributeTimestamp) + createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) + + versioningIgnore := object.NewAttribute() + versioningIgnore.SetKey(attrVersionsIgnore) + versioningIgnore.SetValue(strconv.FormatBool(true)) + + settingsVersioningEnabled := object.NewAttribute() + settingsVersioningEnabled.SetKey(attrSettingsVersioningEnabled) + settingsVersioningEnabled.SetValue(strconv.FormatBool(p.Settings.VersioningEnabled)) + + attributes = append(attributes, filename, createdAt, versioningIgnore, settingsVersioningEnabled) + + raw := object.NewRaw() + raw.SetOwnerID(bucketInfo.Owner) + raw.SetContainerID(bucketInfo.CID) + raw.SetAttributes(attributes...) + + ops := new(client.PutObjectParams).WithObject(raw.Object()) + oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx)) + if err != nil { + return nil, err + } + + meta, err := n.objectHead(ctx, bucketInfo.CID, oid) + if err != nil { + return nil, err + } + + if err = n.systemCache.Put(bucketInfo.SettingsObjectKey(), meta); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + if objectInfo != nil { + if err = n.objectDelete(ctx, bucketInfo.CID, objectInfo.ID()); err != nil { + return nil, err + } + } + + return objectInfoFromMeta(bucketInfo, meta, "", ""), nil +} + +func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) { + bktInfo, err := n.GetBucketInfo(ctx, bucketName) + if err != nil { + return nil, err + } + + return n.getBucketSettings(ctx, bktInfo) +} + +func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { + var versions map[string]*objectVersions + res := &ListObjectVersionsInfo{} + + bkt, err := n.GetBucketInfo(ctx, p.Bucket) + if err != nil { + return nil, err + } + + cacheKey, err := createKey(ctx, bkt.CID, listVersionsMethod, p.Prefix, p.Delimiter) + if err != nil { + return nil, err + } + + allObjects := n.listsCache.Get(cacheKey) + if allObjects == nil { + versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter) + if err != nil { + return nil, err + } + + sortedNames := make([]string, 0, len(versions)) + for k := range versions { + sortedNames = append(sortedNames, k) + } + sort.Strings(sortedNames) + + allObjects = make([]*ObjectInfo, 0, p.MaxKeys) + for _, name := range sortedNames { + allObjects = append(allObjects, versions[name].getFiltered()...) + } + + // putting to cache a copy of allObjects because allObjects can be modified further + n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) + } + + for i, obj := range allObjects { + if obj.Name >= p.KeyMarker && obj.Version() >= p.VersionIDMarker { + allObjects = allObjects[i:] + break + } + } + + res.CommonPrefixes, allObjects = triageObjects(allObjects) + + if len(allObjects) > p.MaxKeys { + res.IsTruncated = true + res.NextKeyMarker = allObjects[p.MaxKeys].Name + res.NextVersionIDMarker = allObjects[p.MaxKeys].Version() + + allObjects = allObjects[:p.MaxKeys] + res.KeyMarker = allObjects[p.MaxKeys-1].Name + res.VersionIDMarker = allObjects[p.MaxKeys-1].Version() + } + + objects := make([]*ObjectVersionInfo, len(allObjects)) + for i, obj := range allObjects { + objects[i] = &ObjectVersionInfo{Object: obj} + if i == len(allObjects)-1 || allObjects[i+1].Name != obj.Name { + objects[i].IsLatest = true + } + } + + res.Version, res.DeleteMarker = triageVersions(objects) + return res, nil +} + +func triageVersions(objVersions []*ObjectVersionInfo) ([]*ObjectVersionInfo, []*ObjectVersionInfo) { + if len(objVersions) == 0 { + return nil, nil + } + + var resVersion []*ObjectVersionInfo + var resDelMarkVersions []*ObjectVersionInfo + + for _, version := range objVersions { + if version.Object.Headers[versionsDeleteMarkAttr] == delMarkFullObject { + resDelMarkVersions = append(resDelMarkVersions, version) + } else { + resVersion = append(resVersion, version) + } + } + + return resVersion, resDelMarkVersions +} + +func less(ov1, ov2 *ObjectInfo) bool { + if ov1.CreationEpoch == ov2.CreationEpoch { + return ov1.Version() < ov2.Version() + } + return ov1.CreationEpoch < ov2.CreationEpoch +} + +func contains(list []string, elem string) bool { + for _, item := range list { + if elem == item { + return true + } + } + return false +} + +func (n *layer) getBucketSettings(ctx context.Context, bktInfo *cache.BucketInfo) (*BucketSettings, error) { + objInfo, err := n.getSettingsObjectInfo(ctx, bktInfo) + if err != nil { + return nil, err + } + + return objectInfoToBucketSettings(objInfo), nil +} + +func objectInfoToBucketSettings(info *ObjectInfo) *BucketSettings { + res := &BucketSettings{} + + enabled, ok := info.Headers[attrSettingsVersioningEnabled] + if ok { + if parsed, err := strconv.ParseBool(enabled); err == nil { + res.VersioningEnabled = parsed + } + } + return res +} + +func (n *layer) checkVersionsExist(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) (*object.ID, error) { + id := object.NewID() + if err := id.Parse(obj.VersionID); err != nil { + return nil, errors.GetAPIError(errors.ErrInvalidVersion) + } + + versions, err := n.headVersions(ctx, bkt, obj.Name) + if err != nil { + return nil, err + } + if !contains(getExistedVersions(versions), obj.VersionID) { + return nil, errors.GetAPIError(errors.ErrInvalidVersion) + } + + return id, nil +} diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go new file mode 100644 index 000000000..e607ef5e6 --- /dev/null +++ b/api/layer/versioning_test.go @@ -0,0 +1,610 @@ +package layer + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/sha256" + "fmt" + "io" + "strings" + "testing" + + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-api-go/pkg/session" + "github.com/nspcc-dev/neofs-api-go/pkg/token" + "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" + "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" + "github.com/nspcc-dev/neofs-sdk-go/pkg/logger" + "github.com/nspcc-dev/neofs-sdk-go/pkg/pool" + "github.com/stretchr/testify/require" +) + +type testPool struct { + objects map[string]*object.Object + containers map[string]*container.Container + currentEpoch uint64 +} + +func newTestPool() *testPool { + return &testPool{ + objects: make(map[string]*object.Object), + containers: make(map[string]*container.Container), + } +} + +func (t *testPool) PutObject(ctx context.Context, params *client.PutObjectParams, option ...client.CallOption) (*object.ID, error) { + b := make([]byte, 32) + if _, err := io.ReadFull(rand.Reader, b); err != nil { + return nil, err + } + + oid := object.NewID() + oid.SetSHA256(sha256.Sum256(b)) + + raw := object.NewRawFrom(params.Object()) + raw.SetID(oid) + raw.SetCreationEpoch(t.currentEpoch) + t.currentEpoch++ + + if params.PayloadReader() != nil { + all, err := io.ReadAll(params.PayloadReader()) + if err != nil { + return nil, err + } + raw.SetPayload(all) + } + + addr := newAddress(raw.ContainerID(), raw.ID()) + t.objects[addr.String()] = raw.Object() + return raw.ID(), nil +} + +func (t *testPool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, option ...client.CallOption) error { + delete(t.objects, params.Address().String()) + return nil +} + +func (t *testPool) GetObject(ctx context.Context, params *client.GetObjectParams, option ...client.CallOption) (*object.Object, error) { + if obj, ok := t.objects[params.Address().String()]; ok { + if params.PayloadWriter() != nil { + _, err := params.PayloadWriter().Write(obj.Payload()) + if err != nil { + return nil, err + } + } + return obj, nil + } + + return nil, fmt.Errorf("object not found " + params.Address().String()) +} + +func (t *testPool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, option ...client.CallOption) (*object.Object, error) { + p := new(client.GetObjectParams).WithAddress(params.Address()) + return t.GetObject(ctx, p) +} + +func (t *testPool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, option ...client.CallOption) ([]byte, error) { + panic("implement me") +} + +func (t *testPool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, option ...client.CallOption) ([][32]byte, error) { + panic("implement me") +} + +func (t *testPool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, option ...client.CallOption) ([][64]byte, error) { + panic("implement me") +} + +func (t *testPool) SearchObject(ctx context.Context, params *client.SearchObjectParams, option ...client.CallOption) ([]*object.ID, error) { + cidStr := params.ContainerID().String() + + var res []*object.ID + + if len(params.SearchFilters()) == 1 { + for k, v := range t.objects { + if strings.Contains(k, cidStr) { + res = append(res, v.ID()) + } + } + return res, nil + } + + filter := params.SearchFilters()[1] + if len(params.SearchFilters()) != 2 || filter.Operation() != object.MatchStringEqual || + (filter.Header() != object.AttributeFileName && filter.Header() != objectSystemAttributeName) { + return nil, fmt.Errorf("usupported filters") + } + + for k, v := range t.objects { + if strings.Contains(k, cidStr) && isMatched(v.Attributes(), filter) { + res = append(res, v.ID()) + } + } + + return res, nil +} + +func isMatched(attributes []*object.Attribute, filter object.SearchFilter) bool { + for _, attr := range attributes { + if attr.Key() == filter.Header() && attr.Value() == filter.Value() { + return true + } + } + + return false +} + +func (t *testPool) PutContainer(ctx context.Context, container *container.Container, option ...client.CallOption) (*cid.ID, error) { + b := make([]byte, 32) + if _, err := io.ReadFull(rand.Reader, b); err != nil { + return nil, err + } + + id := cid.New() + id.SetSHA256(sha256.Sum256(b)) + t.containers[id.String()] = container + + return id, nil +} + +func (t *testPool) GetContainer(ctx context.Context, id *cid.ID, option ...client.CallOption) (*container.Container, error) { + for k, v := range t.containers { + if k == id.String() { + return v, nil + } + } + + return nil, fmt.Errorf("container not found " + id.String()) +} + +func (t *testPool) ListContainers(ctx context.Context, id *owner.ID, option ...client.CallOption) ([]*cid.ID, error) { + var res []*cid.ID + for k := range t.containers { + cID := cid.New() + if err := cID.Parse(k); err != nil { + return nil, err + } + res = append(res, cID) + } + + return res, nil +} + +func (t *testPool) DeleteContainer(ctx context.Context, id *cid.ID, option ...client.CallOption) error { + delete(t.containers, id.String()) + return nil +} + +func (t *testPool) GetEACL(ctx context.Context, id *cid.ID, option ...client.CallOption) (*client.EACLWithSignature, error) { + panic("implement me") +} + +func (t *testPool) SetEACL(ctx context.Context, table *eacl.Table, option ...client.CallOption) error { + panic("implement me") +} + +func (t *testPool) AnnounceContainerUsedSpace(ctx context.Context, announcements []container.UsedSpaceAnnouncement, option ...client.CallOption) error { + panic("implement me") +} + +func (t *testPool) Connection() (client.Client, *session.Token, error) { + panic("implement me") +} + +func (t *testPool) OwnerID() *owner.ID { + return nil +} + +func (t *testPool) WaitForContainerPresence(ctx context.Context, id *cid.ID, params *pool.ContainerPollingParams) error { + return nil +} + +func (tc *testContext) putObject(content []byte) *ObjectInfo { + objInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{ + Bucket: tc.bkt, + Object: tc.obj, + Size: int64(len(content)), + Reader: bytes.NewReader(content), + Header: make(map[string]string), + }) + require.NoError(tc.t, err) + + return objInfo +} + +func (tc *testContext) getObject(objectName, versionID string, needError bool) (*ObjectInfo, []byte) { + objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{ + Bucket: tc.bkt, + Object: objectName, + VersionID: versionID, + }) + if needError { + require.Error(tc.t, err) + return nil, nil + } + require.NoError(tc.t, err) + + content := bytes.NewBuffer(nil) + err = tc.layer.GetObject(tc.ctx, &GetObjectParams{ + ObjectInfo: objInfo, + Writer: content, + VersionID: versionID, + }) + require.NoError(tc.t, err) + + return objInfo, content.Bytes() +} + +func (tc *testContext) deleteObject(objectName, versionID string) { + errs := tc.layer.DeleteObjects(tc.ctx, tc.bkt, []*VersionedObject{ + {Name: objectName, VersionID: versionID}, + }) + for _, err := range errs { + require.NoError(tc.t, err) + } +} + +func (tc *testContext) listObjectsV1() []*ObjectInfo { + res, err := tc.layer.ListObjectsV1(tc.ctx, &ListObjectsParamsV1{ + ListObjectsParamsCommon: ListObjectsParamsCommon{ + Bucket: tc.bkt, + MaxKeys: 1000, + }, + }) + require.NoError(tc.t, err) + return res.Objects +} + +func (tc *testContext) listObjectsV2() []*ObjectInfo { + res, err := tc.layer.ListObjectsV2(tc.ctx, &ListObjectsParamsV2{ + ListObjectsParamsCommon: ListObjectsParamsCommon{ + Bucket: tc.bkt, + MaxKeys: 1000, + }, + }) + require.NoError(tc.t, err) + return res.Objects +} + +func (tc *testContext) listVersions() *ListObjectVersionsInfo { + res, err := tc.layer.ListObjectVersions(tc.ctx, &ListObjectVersionsParams{ + Bucket: tc.bkt, + MaxKeys: 1000, + }) + require.NoError(tc.t, err) + return res +} + +func (tc *testContext) checkListObjects(ids ...*object.ID) { + objs := tc.listObjectsV1() + require.Equal(tc.t, len(ids), len(objs)) + for _, id := range ids { + require.Contains(tc.t, ids, id) + } + + objs = tc.listObjectsV2() + require.Equal(tc.t, len(ids), len(objs)) + for _, id := range ids { + require.Contains(tc.t, ids, id) + } +} + +type testContext struct { + t *testing.T + ctx context.Context + layer Client + bkt string + bktID *cid.ID + obj string + testPool *testPool +} + +func prepareContext(t *testing.T) *testContext { + key, err := keys.NewPrivateKey() + require.NoError(t, err) + + ctx := context.WithValue(context.Background(), api.BoxData, &accessbox.Box{ + Gate: &accessbox.GateData{ + BearerToken: token.NewBearerToken(), + GateKey: key.PublicKey(), + }, + }) + l, err := logger.New(logger.WithTraceLevel("panic")) + require.NoError(t, err) + tp := newTestPool() + + bktName := "testbucket1" + cnr := container.New(container.WithAttribute(container.AttributeName, bktName)) + bktID, err := tp.PutContainer(ctx, cnr) + require.NoError(t, err) + + return &testContext{ + ctx: ctx, + layer: NewLayer(l, tp, &CacheConfig{ + Size: cache.DefaultObjectsCacheSize, + Lifetime: cache.DefaultObjectsCacheLifetime, + ListObjectsLifetime: DefaultObjectsListCacheLifetime}, + ), + bkt: bktName, + bktID: bktID, + obj: "obj1", + t: t, + testPool: tp, + } +} + +func TestSimpleVersioning(t *testing.T) { + tc := prepareContext(t) + _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ + Bucket: tc.bkt, + Settings: &BucketSettings{VersioningEnabled: true}, + }) + require.NoError(t, err) + + obj1Content1 := []byte("content obj1 v1") + obj1v1 := tc.putObject(obj1Content1) + + obj1Content2 := []byte("content obj1 v2") + obj1v2 := tc.putObject(obj1Content2) + + objv2, buffer2 := tc.getObject(tc.obj, "", false) + require.Equal(t, obj1Content2, buffer2) + require.Contains(t, objv2.Headers[versionsAddAttr], obj1v1.ID().String()) + + _, buffer1 := tc.getObject(tc.obj, obj1v1.ID().String(), false) + require.Equal(t, obj1Content1, buffer1) + + tc.checkListObjects(obj1v2.ID()) +} + +func TestSimpleNoVersioning(t *testing.T) { + tc := prepareContext(t) + + obj1Content1 := []byte("content obj1 v1") + obj1v1 := tc.putObject(obj1Content1) + + obj1Content2 := []byte("content obj1 v2") + obj1v2 := tc.putObject(obj1Content2) + + objv2, buffer2 := tc.getObject(tc.obj, "", false) + require.Equal(t, obj1Content2, buffer2) + require.Contains(t, objv2.Headers[versionsDelAttr], obj1v1.ID().String()) + + tc.getObject(tc.obj, obj1v1.ID().String(), true) + tc.checkListObjects(obj1v2.ID()) +} + +func TestVersioningDeleteObject(t *testing.T) { + tc := prepareContext(t) + _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ + Bucket: tc.bkt, + Settings: &BucketSettings{VersioningEnabled: true}, + }) + require.NoError(t, err) + + tc.putObject([]byte("content obj1 v1")) + tc.putObject([]byte("content obj1 v2")) + + tc.deleteObject(tc.obj, "") + tc.getObject(tc.obj, "", true) + + tc.checkListObjects() +} + +func TestVersioningDeleteSpecificObjectVersion(t *testing.T) { + tc := prepareContext(t) + _, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ + Bucket: tc.bkt, + Settings: &BucketSettings{VersioningEnabled: true}, + }) + require.NoError(t, err) + + tc.putObject([]byte("content obj1 v1")) + objV2Info := tc.putObject([]byte("content obj1 v2")) + objV3Content := []byte("content obj1 v3") + objV3Info := tc.putObject(objV3Content) + + tc.deleteObject(tc.obj, objV2Info.Version()) + tc.getObject(tc.obj, objV2Info.Version(), true) + + _, buffer3 := tc.getObject(tc.obj, "", false) + require.Equal(t, objV3Content, buffer3) + + tc.deleteObject(tc.obj, "") + tc.getObject(tc.obj, "", true) + + for _, ver := range tc.listVersions().DeleteMarker { + if ver.IsLatest { + tc.deleteObject(tc.obj, ver.Object.Version()) + } + } + + resInfo, buffer := tc.getObject(tc.obj, "", false) + require.Equal(t, objV3Content, buffer) + require.Equal(t, objV3Info.Version(), resInfo.Version()) +} + +func TestNoVersioningDeleteObject(t *testing.T) { + tc := prepareContext(t) + + tc.putObject([]byte("content obj1 v1")) + tc.putObject([]byte("content obj1 v2")) + + tc.deleteObject(tc.obj, "") + tc.getObject(tc.obj, "", true) + tc.checkListObjects() +} + +func TestGetLastVersion(t *testing.T) { + obj1 := getTestObjectInfo(1, getOID(1), "", "", "") + obj1V2 := getTestObjectInfo(1, getOID(2), "", "", "") + obj2 := getTestObjectInfo(2, getOID(2), obj1.Version(), "", "") + obj3 := getTestObjectInfo(3, getOID(3), joinVers(obj1, obj2), "", "*") + obj4 := getTestObjectInfo(4, getOID(4), joinVers(obj1, obj2), obj2.Version(), obj2.Version()) + obj5 := getTestObjectInfo(5, getOID(5), obj1.Version(), obj1.Version(), obj1.Version()) + obj6 := getTestObjectInfo(6, getOID(6), joinVers(obj1, obj2, obj3), obj3.Version(), obj3.Version()) + + for _, tc := range []struct { + versions *objectVersions + expected *ObjectInfo + }{ + { + versions: &objectVersions{}, + expected: nil, + }, + { + versions: &objectVersions{ + objects: []*ObjectInfo{obj2, obj1}, + addList: []string{obj1.Version(), obj2.Version()}, + }, + expected: obj2, + }, + { + versions: &objectVersions{ + objects: []*ObjectInfo{obj2, obj1, obj3}, + addList: []string{obj1.Version(), obj2.Version(), obj3.Version()}, + }, + expected: nil, + }, + { + versions: &objectVersions{ + objects: []*ObjectInfo{obj2, obj1, obj4}, + addList: []string{obj1.Version(), obj2.Version(), obj4.Version()}, + delList: []string{obj2.Version()}, + }, + expected: obj1, + }, + { + versions: &objectVersions{ + objects: []*ObjectInfo{obj1, obj5}, + addList: []string{obj1.Version(), obj5.Version()}, + delList: []string{obj1.Version()}, + }, + expected: nil, + }, + { + versions: &objectVersions{ + objects: []*ObjectInfo{obj5}, + }, + expected: nil, + }, + { + versions: &objectVersions{ + objects: []*ObjectInfo{obj1, obj2, obj3, obj6}, + addList: []string{obj1.Version(), obj2.Version(), obj3.Version(), obj6.Version()}, + delList: []string{obj3.Version()}, + }, + expected: obj2, + }, + { + versions: &objectVersions{ + objects: []*ObjectInfo{obj1, obj1V2}, + addList: []string{obj1.Version(), obj1V2.Version()}, + }, + // creation epochs are equal + // obj1 version/oid > obj1_1 version/oid + expected: obj1, + }, + } { + actualObjInfo := tc.versions.getLast() + require.Equal(t, tc.expected, actualObjInfo) + } +} + +func TestAppendVersions(t *testing.T) { + obj1 := getTestObjectInfo(1, getOID(1), "", "", "") + obj2 := getTestObjectInfo(2, getOID(2), obj1.Version(), "", "") + obj3 := getTestObjectInfo(3, getOID(3), joinVers(obj1, obj2), "", "*") + obj4 := getTestObjectInfo(4, getOID(4), joinVers(obj1, obj2), obj2.Version(), obj2.Version()) + + for _, tc := range []struct { + versions *objectVersions + objectToAdd *ObjectInfo + expectedVersions *objectVersions + }{ + { + versions: &objectVersions{}, + objectToAdd: obj1, + expectedVersions: &objectVersions{ + objects: []*ObjectInfo{obj1}, + addList: []string{obj1.Version()}, + }, + }, + { + versions: &objectVersions{objects: []*ObjectInfo{obj1}}, + objectToAdd: obj2, + expectedVersions: &objectVersions{ + objects: []*ObjectInfo{obj1, obj2}, + addList: []string{obj1.Version(), obj2.Version()}, + }, + }, + { + versions: &objectVersions{objects: []*ObjectInfo{obj1, obj2}}, + objectToAdd: obj3, + expectedVersions: &objectVersions{ + objects: []*ObjectInfo{obj1, obj2, obj3}, + addList: []string{obj1.Version(), obj2.Version(), obj3.Version()}, + }, + }, + { + versions: &objectVersions{objects: []*ObjectInfo{obj1, obj2}}, + objectToAdd: obj4, + expectedVersions: &objectVersions{ + objects: []*ObjectInfo{obj1, obj2, obj4}, + addList: []string{obj1.Version(), obj2.Version(), obj4.Version()}, + delList: []string{obj2.Version()}, + }, + }, + } { + tc.versions.appendVersion(tc.objectToAdd) + require.Equal(t, tc.expectedVersions, tc.versions) + } +} + +func joinVers(objs ...*ObjectInfo) string { + if len(objs) == 0 { + return "" + } + + var versions []string + for _, obj := range objs { + versions = append(versions, obj.Version()) + } + + return strings.Join(versions, ",") +} + +func getOID(id byte) *object.ID { + b := make([]byte, 32) + b[0] = id + oid := object.NewID() + oid.SetSHA256(sha256.Sum256(b)) + return oid +} + +func getTestObjectInfo(epoch uint64, oid *object.ID, addAttr, delAttr, delMarkAttr string) *ObjectInfo { + headers := make(map[string]string) + if addAttr != "" { + headers[versionsAddAttr] = addAttr + } + if delAttr != "" { + headers[versionsDelAttr] = delAttr + } + if delMarkAttr != "" { + headers[versionsDeleteMarkAttr] = delMarkAttr + } + + return &ObjectInfo{ + id: oid, + CreationEpoch: epoch, + Headers: headers, + } +} diff --git a/docs/aws_s3_compat.md b/docs/aws_s3_compat.md index bb1eb5337..647f8ce42 100644 --- a/docs/aws_s3_compat.md +++ b/docs/aws_s3_compat.md @@ -226,8 +226,8 @@ See also `GetObject` and other method parameters. | | Method | Comments | |----|---------------------|----------| -| 🔴 | GetBucketVersioning | | -| 🔴 | PutBucketVersioning | | +| 🟢 | GetBucketVersioning | | +| 🟢 | PutBucketVersioning | | ## Website