Merge pull request #207 from KirillovDenis/feature/122-poc_versioning

[#122] Add enabling versioning
This commit is contained in:
Kirillov Denis 2021-08-25 15:01:33 +03:00 committed by GitHub
commit 04b7958ab3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 1992 additions and 495 deletions

77
api/cache/buckets.go vendored Normal file
View file

@ -0,0 +1,77 @@
package cache
import (
"time"
"github.com/bluele/gcache"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
)
type (
// BucketCache provides interface for lru cache for objects.
BucketCache interface {
Get(key string) *BucketInfo
Put(bkt *BucketInfo) error
Delete(key string) bool
}
// BucketInfo stores basic bucket data.
BucketInfo struct {
Name string
CID *cid.ID
Owner *owner.ID
Created time.Time
BasicACL uint32
}
// GetBucketCache contains cache with objects and lifetime of cache entries.
GetBucketCache struct {
cache gcache.Cache
lifetime time.Duration
}
)
// NewBucketCache creates an object of BucketCache.
func NewBucketCache(cacheSize int, lifetime time.Duration) *GetBucketCache {
gc := gcache.New(cacheSize).LRU().Build()
return &GetBucketCache{cache: gc, lifetime: lifetime}
}
// Get returns cached object.
func (o *GetBucketCache) Get(key string) *BucketInfo {
entry, err := o.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*BucketInfo)
if !ok {
return nil
}
return result
}
// Put puts an object to cache.
func (o *GetBucketCache) Put(bkt *BucketInfo) error {
return o.cache.SetWithExpire(bkt.Name, bkt, o.lifetime)
}
// Delete deletes an object from cache.
func (o *GetBucketCache) Delete(key string) bool {
return o.cache.Remove(key)
}
const bktVersionSettingsObject = ".s3-versioning-settings"
// SettingsObjectName is system name for bucket settings file.
func (b *BucketInfo) SettingsObjectName() string {
return bktVersionSettingsObject
}
// SettingsObjectKey is key to use in SystemCache.
func (b *BucketInfo) SettingsObjectKey() string {
return b.Name + bktVersionSettingsObject
}

57
api/cache/names.go vendored Normal file
View file

@ -0,0 +1,57 @@
package cache
import (
"time"
"github.com/bluele/gcache"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
)
// ObjectsNameCache provides interface for lru cache for objects.
// This cache contains mapping nice name to object addresses.
// Key is bucketName+objectName.
type ObjectsNameCache interface {
Get(key string) *object.Address
Put(key string, address *object.Address) error
Delete(key string) bool
}
type (
// NameCache contains cache with objects and lifetime of cache entries.
NameCache struct {
cache gcache.Cache
lifetime time.Duration
}
)
// NewObjectsNameCache creates an object of ObjectsNameCache.
func NewObjectsNameCache(cacheSize int, lifetime time.Duration) *NameCache {
gc := gcache.New(cacheSize).LRU().Build()
return &NameCache{cache: gc, lifetime: lifetime}
}
// Get returns cached object.
func (o *NameCache) Get(key string) *object.Address {
entry, err := o.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*object.Address)
if !ok {
return nil
}
return result
}
// Put puts an object to cache.
func (o *NameCache) Put(key string, address *object.Address) error {
return o.cache.SetWithExpire(key, address, o.lifetime)
}
// Delete deletes an object from cache.
func (o *NameCache) Delete(key string) bool {
return o.cache.Remove(key)
}

View file

@ -10,7 +10,7 @@ import (
// ObjectsCache provides interface for lru cache for objects.
type ObjectsCache interface {
Get(address *object.Address) *object.Object
Put(address *object.Address, obj object.Object) error
Put(obj object.Object) error
Delete(address *object.Address) bool
}
@ -52,8 +52,8 @@ func (o *ObjectHeadersCache) Get(address *object.Address) *object.Object {
}
// Put puts an object to cache.
func (o *ObjectHeadersCache) Put(address *object.Address, obj object.Object) error {
return o.cache.SetWithExpire(address.String(), obj, o.lifetime)
func (o *ObjectHeadersCache) Put(obj object.Object) error {
return o.cache.SetWithExpire(obj.ContainerID().String()+"/"+obj.ID().String(), obj, o.lifetime)
}
// Delete deletes an object from cache.

View file

@ -4,6 +4,7 @@ import (
"testing"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test"
"github.com/stretchr/testify/require"
)
@ -14,23 +15,23 @@ const (
)
func TestCache(t *testing.T) {
var (
address = objecttest.Address()
object = objecttest.Object()
)
obj := objecttest.Object()
address := object.NewAddress()
address.SetContainerID(obj.ContainerID())
address.SetObjectID(obj.ID())
t.Run("check get", func(t *testing.T) {
cache := New(cachesize, lifetime)
err := cache.Put(address, *object)
err := cache.Put(*obj)
require.NoError(t, err)
actual := cache.Get(address)
require.Equal(t, object, actual)
require.Equal(t, obj, actual)
})
t.Run("check delete", func(t *testing.T) {
cache := New(cachesize, lifetime)
err := cache.Put(address, *object)
err := cache.Put(*obj)
require.NoError(t, err)
cache.Delete(address)

57
api/cache/system.go vendored Normal file
View file

@ -0,0 +1,57 @@
package cache
import (
"time"
"github.com/bluele/gcache"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
)
type (
// SystemCache provides interface for lru cache for objects.
// This cache contains "system" objects (bucket versioning settings, tagging object etc.).
// Key is bucketName+systemFileName.
SystemCache interface {
Get(key string) *object.Object
Put(key string, obj *object.Object) error
Delete(key string) bool
}
// SysCache contains cache with objects and lifetime of cache entries.
SysCache struct {
cache gcache.Cache
lifetime time.Duration
}
)
// NewSystemCache creates an object of SystemCache.
func NewSystemCache(cacheSize int, lifetime time.Duration) *SysCache {
gc := gcache.New(cacheSize).LRU().Build()
return &SysCache{cache: gc, lifetime: lifetime}
}
// Get returns cached object.
func (o *SysCache) Get(key string) *object.Object {
entry, err := o.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*object.Object)
if !ok {
return nil
}
return result
}
// Put puts an object to cache.
func (o *SysCache) Put(key string, obj *object.Object) error {
return o.cache.SetWithExpire(key, obj, o.lifetime)
}
// Delete deletes an object from cache.
func (o *SysCache) Delete(key string) bool {
return o.cache.Remove(key)
}

View file

@ -66,6 +66,7 @@ const (
ErrNoSuchKey
ErrNoSuchUpload
ErrNoSuchVersion
ErrInvalidVersion
ErrNotImplemented
ErrPreconditionFailed
ErrNotModified
@ -529,6 +530,12 @@ var errorCodes = errorCodeMap{
Description: "Indicates that the version ID specified in the request does not match an existing version.",
HTTPStatusCode: http.StatusNotFound,
},
ErrInvalidVersion: {
ErrCode: ErrInvalidVersion,
Code: "InvalidArgument",
Description: "Invalid version id specified",
HTTPStatusCode: http.StatusBadRequest,
},
ErrNotImplemented: {
ErrCode: ErrNotImplemented,
Code: "NotImplemented",
@ -1917,26 +1924,18 @@ func GetAPIError(code ErrorCode) Error {
return errorCodes.toAPIErr(ErrInternalError)
}
// GenericError - generic object layer error.
type GenericError struct {
Bucket string
Object string
// ObjectError - error that linked to specific object.
type ObjectError struct {
Err error
Object string
Version string
}
// ObjectAlreadyExists object already exists.
// This type should be removed when s3-gw will support versioning.
type ObjectAlreadyExists GenericError
func (e ObjectAlreadyExists) Error() string {
return "Object: " + e.Bucket + "#" + e.Object + " already exists"
func (e ObjectError) Error() string {
return fmt.Sprintf("%s (%s:%s)", e.Err, e.Object, e.Version)
}
// DeleteError - returns when cant remove object.
type DeleteError struct {
Err error
Object string
}
func (e DeleteError) Error() string {
return fmt.Sprintf("%s (%s)", e.Err, e.Object)
// ObjectVersion get "object:version" string.
func (e ObjectError) ObjectVersion() string {
return e.Object + ":" + e.Version
}

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
)
@ -239,7 +240,13 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
return
}
if _, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil {
p := &layer.HeadObjectParams{
Bucket: reqInfo.BucketName,
Object: reqInfo.ObjectName,
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}
if _, err = h.obj.GetObjectInfo(r.Context(), p); err != nil {
h.logAndSendError(w, "could not get object info", reqInfo, err)
return
}
@ -273,7 +280,7 @@ func (h *handler) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Request)
}
}
func checkOwner(info *layer.BucketInfo, owner string) error {
func checkOwner(info *cache.BucketInfo, owner string) error {
if owner == "" {
return nil
}

View file

@ -32,10 +32,11 @@ func path2BucketObject(path string) (bucket, prefix string) {
func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
var (
err error
inf *layer.ObjectInfo
info *layer.ObjectInfo
metadata map[string]string
reqInfo = api.GetReqInfo(r.Context())
reqInfo = api.GetReqInfo(r.Context())
versionID string
)
src := r.Header.Get("X-Amz-Copy-Source")
@ -45,14 +46,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
// of the version ID to null. If you have enabled versioning, Amazon S3 assigns a
// unique version ID value for the object.
if u, err := url.Parse(src); err == nil {
// Check if versionId query param was added, if yes then check if
// its non "null" value, we should error out since we do not support
// any versions other than "null".
if vid := u.Query().Get("versionId"); vid != "" && vid != "null" {
h.logAndSendError(w, "no such version", reqInfo, errors.GetAPIError(errors.ErrNoSuchVersion))
return
}
versionID = u.Query().Get(api.QueryVersionID)
src = u.Path
}
@ -63,6 +57,11 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "could not parse request params", reqInfo, err)
return
}
p := &layer.HeadObjectParams{
Bucket: srcBucket,
Object: srcObject,
VersionID: versionID,
}
if args.MetadataDirective == replaceMetadataDirective {
metadata = parseMetadata(r)
@ -80,47 +79,46 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
if inf, err = h.obj.GetObjectInfo(r.Context(), srcBucket, srcObject); err != nil {
if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}
if err = checkPreconditions(inf, args.Conditional); err != nil {
if err = checkPreconditions(info, args.Conditional); err != nil {
h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed))
return
}
if metadata == nil {
if len(inf.ContentType) > 0 {
inf.Headers[api.ContentType] = inf.ContentType
if len(info.ContentType) > 0 {
info.Headers[api.ContentType] = info.ContentType
}
metadata = inf.Headers
metadata = info.Headers
} else if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 {
metadata[api.ContentType] = contentType
}
params := &layer.CopyObjectParams{
SrcBucket: srcBucket,
SrcObject: info,
DstBucket: reqInfo.BucketName,
SrcObject: srcObject,
DstObject: reqInfo.ObjectName,
SrcSize: inf.Size,
SrcSize: info.Size,
Header: metadata,
}
additional := []zap.Field{zap.String("src_bucket_name", srcBucket), zap.String("src_object_name", srcObject)}
if inf, err = h.obj.CopyObject(r.Context(), params); err != nil {
if info, err = h.obj.CopyObject(r.Context(), params); err != nil {
h.logAndSendError(w, "couldn't copy object", reqInfo, err, additional...)
return
} else if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: inf.Created.Format(time.RFC3339), ETag: inf.HashSum}); err != nil {
} else if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: info.Created.Format(time.RFC3339), ETag: info.HashSum}); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err, additional...)
return
}
h.log.Info("object is copied",
zap.String("bucket", inf.Bucket),
zap.String("object", inf.Name),
zap.Stringer("object_id", inf.ID()))
zap.String("bucket", info.Bucket),
zap.String("object", info.Name),
zap.Stringer("object_id", info.ID()))
}
func parseCopyObjectArgs(headers http.Header) (*copyObjectArgs, error) {

View file

@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
@ -21,13 +22,15 @@ type DeleteObjectsRequest struct {
// ObjectIdentifier carries key name for the object to delete.
type ObjectIdentifier struct {
ObjectName string `xml:"Key"`
VersionID string `xml:"VersionId,omitempty"`
}
// DeleteError structure.
type DeleteError struct {
Code string
Message string
Key string
Code string
Message string
Key string
VersionID string `xml:"versionId,omitempty"`
}
// DeleteObjectsResponse container for multiple object deletes.
@ -43,18 +46,22 @@ type DeleteObjectsResponse struct {
func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := api.GetReqInfo(r.Context())
versionedObject := []*layer.VersionedObject{{
Name: reqInfo.ObjectName,
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}}
if err := h.checkBucketOwner(r, reqInfo.BucketName); err != nil {
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
return
}
if err := h.obj.DeleteObject(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil {
if errs := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, versionedObject); len(errs) != 0 && errs[0] != nil {
h.log.Error("could not delete object",
zap.String("request_id", reqInfo.RequestID),
zap.String("bucket_name", reqInfo.BucketName),
zap.String("object_name", reqInfo.ObjectName),
zap.Error(err))
zap.Error(errs[0]))
// Ignore delete errors:
@ -93,11 +100,15 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
return
}
removed := make(map[string]struct{})
toRemove := make([]string, 0, len(requested.Objects))
removed := make(map[string]*layer.VersionedObject)
toRemove := make([]*layer.VersionedObject, 0, len(requested.Objects))
for _, obj := range requested.Objects {
removed[obj.ObjectName] = struct{}{}
toRemove = append(toRemove, obj.ObjectName)
versionedObj := &layer.VersionedObject{
Name: obj.ObjectName,
VersionID: obj.VersionID,
}
toRemove = append(toRemove, versionedObj)
removed[versionedObj.String()] = versionedObj
}
response := &DeleteObjectsResponse{
@ -110,35 +121,45 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
return
}
marshaler := zapcore.ArrayMarshalerFunc(func(encoder zapcore.ArrayEncoder) error {
for _, obj := range toRemove {
encoder.AppendString(obj.String())
}
return nil
})
if errs := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, toRemove); errs != nil && !requested.Quiet {
additional := []zap.Field{
zap.Strings("objects_name", toRemove),
zap.Array("objects", marshaler),
zap.Errors("errors", errs),
}
h.logAndSendError(w, "could not delete objects", reqInfo, nil, additional...)
for _, e := range errs {
if err, ok := e.(*errors.DeleteError); ok {
if err, ok := e.(*errors.ObjectError); ok {
code := "BadRequest"
desc := err.Error()
if s3err, ok := err.Err.(errors.Error); ok {
code = s3err.Code
}
response.Errors = append(response.Errors, DeleteError{
Code: code,
Message: desc,
Key: err.Object,
Code: code,
Message: err.Error(),
Key: err.Object,
VersionID: err.Version,
})
delete(removed, err.Object)
delete(removed, err.ObjectVersion())
}
}
}
for key := range removed {
response.DeletedObjects = append(response.DeletedObjects, ObjectIdentifier{ObjectName: key})
for _, val := range removed {
response.DeletedObjects = append(response.DeletedObjects, ObjectIdentifier{ObjectName: val.Name, VersionID: val.VersionID})
}
if err := api.EncodeToResponse(w, response); err != nil {
h.logAndSendError(w, "could not write response", reqInfo, err, zap.Strings("objects_name", toRemove))
h.logAndSendError(w, "could not write response", reqInfo, err, zap.Array("objects", marshaler))
return
}
}

View file

@ -72,6 +72,7 @@ func writeHeaders(h http.Header, info *layer.ObjectInfo) {
h.Set(api.LastModified, info.Created.UTC().Format(http.TimeFormat))
h.Set(api.ContentLength, strconv.FormatInt(info.Size, 10))
h.Set(api.ETag, info.HashSum)
h.Set(api.AmzVersionID, info.ID().String())
for key, val := range info.Headers {
h[api.MetadataPrefix+key] = []string{val}
@ -81,7 +82,7 @@ func writeHeaders(h http.Header, info *layer.ObjectInfo) {
func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
var (
err error
inf *layer.ObjectInfo
info *layer.ObjectInfo
params *layer.RangeParams
reqInfo = api.GetReqInfo(r.Context())
@ -98,47 +99,53 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
if inf, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil {
p := &layer.HeadObjectParams{
Bucket: reqInfo.BucketName,
Object: reqInfo.ObjectName,
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}
if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}
if err = checkPreconditions(inf, args.Conditional); err != nil {
if err = checkPreconditions(info, args.Conditional); err != nil {
h.logAndSendError(w, "precondition failed", reqInfo, err)
return
}
if params, err = fetchRangeHeader(r.Header, uint64(inf.Size)); err != nil {
if params, err = fetchRangeHeader(r.Header, uint64(info.Size)); err != nil {
h.logAndSendError(w, "could not parse range header", reqInfo, err)
return
}
writeHeaders(w.Header(), inf)
writeHeaders(w.Header(), info)
if params != nil {
writeRangeHeaders(w, params, inf.Size)
writeRangeHeaders(w, params, info.Size)
}
getParams := &layer.GetObjectParams{
Bucket: inf.Bucket,
Object: inf.Name,
Writer: w,
Range: params,
ObjectInfo: info,
Writer: w,
Range: params,
VersionID: p.VersionID,
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)
}
}
func checkPreconditions(inf *layer.ObjectInfo, args *conditionalArgs) error {
if len(args.IfMatch) > 0 && args.IfMatch != inf.HashSum {
func checkPreconditions(info *layer.ObjectInfo, args *conditionalArgs) error {
if len(args.IfMatch) > 0 && args.IfMatch != info.HashSum {
return errors.GetAPIError(errors.ErrPreconditionFailed)
}
if len(args.IfNoneMatch) > 0 && args.IfNoneMatch == inf.HashSum {
if len(args.IfNoneMatch) > 0 && args.IfNoneMatch == info.HashSum {
return errors.GetAPIError(errors.ErrNotModified)
}
if args.IfModifiedSince != nil && inf.Created.Before(*args.IfModifiedSince) {
if args.IfModifiedSince != nil && info.Created.Before(*args.IfModifiedSince) {
return errors.GetAPIError(errors.ErrNotModified)
}
if args.IfUnmodifiedSince != nil && inf.Created.After(*args.IfUnmodifiedSince) {
if args.IfUnmodifiedSince != nil && info.Created.After(*args.IfUnmodifiedSince) {
if len(args.IfMatch) == 0 {
return errors.GetAPIError(errors.ErrPreconditionFailed)
}

View file

@ -25,8 +25,8 @@ func getRangeToDetectContentType(maxSize int64) *layer.RangeParams {
func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
var (
err error
inf *layer.ObjectInfo
err error
info *layer.ObjectInfo
reqInfo = api.GetReqInfo(r.Context())
)
@ -36,23 +36,33 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
if inf, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil {
p := &layer.HeadObjectParams{
Bucket: reqInfo.BucketName,
Object: reqInfo.ObjectName,
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}
if info, err = h.obj.GetObjectInfo(r.Context(), p); err != nil {
h.logAndSendError(w, "could not fetch object info", reqInfo, err)
return
}
buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
getParams := &layer.GetObjectParams{
Bucket: inf.Bucket,
Object: inf.Name,
Writer: buffer,
Range: getRangeToDetectContentType(inf.Size),
if len(info.ContentType) == 0 {
buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: buffer,
Range: getRangeToDetectContentType(info.Size),
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID()))
return
}
info.ContentType = http.DetectContentType(buffer.Bytes())
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", inf.ID()))
return
}
inf.ContentType = http.DetectContentType(buffer.Bytes())
writeHeaders(w.Header(), inf)
writeHeaders(w.Header(), info)
w.WriteHeader(http.StatusOK)
}

View file

@ -9,13 +9,6 @@ import (
"github.com/nspcc-dev/neofs-s3-gw/api"
)
// VersioningConfiguration contains VersioningConfiguration XML representation.
type VersioningConfiguration struct {
XMLName xml.Name `xml:"VersioningConfiguration"`
Text string `xml:",chardata"`
Xmlns string `xml:"xmlns,attr"`
}
// ListMultipartUploadsResult contains ListMultipartUploadsResult XML representation.
type ListMultipartUploadsResult struct {
XMLName xml.Name `xml:"ListMultipartUploadsResult"`
@ -62,20 +55,6 @@ func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
}
}
// GetBucketVersioningHandler implements bucket versioning getter handler.
func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
var (
reqInfo = api.GetReqInfo(r.Context())
res = new(VersioningConfiguration)
)
res.Xmlns = "http://s3.amazonaws.com/doc/2006-03-01/"
if err := api.EncodeToResponse(w, res); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err)
}
}
// ListMultipartUploadsHandler implements multipart uploads listing handler.
func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
var (

View file

@ -23,10 +23,6 @@ func (h *handler) PutBucketTaggingHandler(w http.ResponseWriter, r *http.Request
h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported))
}
func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported))
}
func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported))
}

View file

@ -217,6 +217,16 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http
return
}
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
return
}
info, err := h.obj.ListObjectVersions(r.Context(), p)
if err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err)
@ -263,7 +273,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
}
for _, prefix := range info.CommonPrefixes {
res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: *prefix})
res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: prefix})
}
for _, ver := range info.Version {
@ -276,7 +286,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
DisplayName: ver.Object.Owner.String(),
},
Size: ver.Object.Size,
VersionID: ver.VersionID,
VersionID: ver.Object.Version(),
ETag: ver.Object.HashSum,
})
}
@ -284,13 +294,13 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
for _, del := range info.DeleteMarker {
res.DeleteMarker = append(res.DeleteMarker, DeleteMarkerEntry{
IsLatest: del.IsLatest,
Key: del.Key,
LastModified: del.LastModified,
Key: del.Object.Name,
LastModified: del.Object.Created.Format(time.RFC3339),
Owner: Owner{
ID: del.Owner.String(),
DisplayName: del.Owner.String(),
ID: del.Object.Owner.String(),
DisplayName: del.Object.Owner.String(),
},
VersionID: del.VersionID,
VersionID: del.Object.Version(),
})
}

View file

@ -114,6 +114,12 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
}
}
if versioning, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName); err != nil {
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
} else if versioning.VersioningEnabled {
w.Header().Set(api.AmzVersionID, info.Version())
}
w.Header().Set(api.ETag, info.HashSum)
api.WriteSuccessResponseHeadersOnly(w)
}

View file

@ -164,6 +164,13 @@ type ListObjectsVersionsResponse struct {
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
}
// VersioningConfiguration contains VersioningConfiguration XML representation.
type VersioningConfiguration struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ VersioningConfiguration"`
Status string `xml:"Status,omitempty"`
MfaDelete string `xml:"MfaDelete,omitempty"`
}
// MarshalXML - StringMap marshals into XML.
func (s StringMap) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
tokens := []xml.Token{start}

85
api/handler/versioning.go Normal file
View file

@ -0,0 +1,85 @@
package handler
import (
"encoding/xml"
"net/http"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"go.uber.org/zap"
)
func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := api.GetReqInfo(r.Context())
configuration := new(VersioningConfiguration)
if err := xml.NewDecoder(r.Body).Decode(configuration); err != nil {
h.logAndSendError(w, "couldn't decode versioning configuration", reqInfo, errors.GetAPIError(errors.ErrIllegalVersioningConfigurationException))
return
}
p := &layer.PutVersioningParams{
Bucket: reqInfo.BucketName,
Settings: &layer.BucketSettings{VersioningEnabled: configuration.Status == "Enabled"},
}
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
return
}
if _, err := h.obj.PutBucketVersioning(r.Context(), p); err != nil {
h.logAndSendError(w, "couldn't put update versioning settings", reqInfo, err)
}
}
// GetBucketVersioningHandler implements bucket versioning getter handler.
func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := api.GetReqInfo(r.Context())
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
return
}
settings, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName)
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchBucket) {
h.logAndSendError(w, "couldn't get versioning settings", reqInfo, err)
return
}
h.log.Warn("couldn't get version settings object: default version settings will be used",
zap.String("request_id", reqInfo.RequestID),
zap.String("method", reqInfo.API),
zap.String("bucket_name", reqInfo.BucketName),
zap.Error(err))
}
if err = api.EncodeToResponse(w, formVersioningConfiguration(settings)); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err)
}
}
func formVersioningConfiguration(settings *layer.BucketSettings) *VersioningConfiguration {
res := &VersioningConfiguration{}
if settings == nil {
return res
}
if settings.VersioningEnabled {
res.Status = "Enabled"
} else {
res.Status = "Suspended"
}
return res
}

View file

@ -4,6 +4,7 @@ package api
const (
MetadataPrefix = "X-Amz-Meta-"
AmzMetadataDirective = "X-Amz-Metadata-Directive"
AmzVersionID = "X-Amz-Version-Id"
LastModified = "Last-Modified"
Date = "Date"
@ -43,3 +44,8 @@ const (
ContainerID = "X-Container-Id"
)
// S3 request query params.
const (
QueryVersionID = "versionId"
)

View file

@ -11,37 +11,29 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-sdk-go/pkg/pool"
"go.uber.org/zap"
)
type (
// BucketInfo stores basic bucket data.
BucketInfo struct {
Name string
CID *cid.ID
Owner *owner.ID
Created time.Time
BasicACL uint32
}
// BucketACL extends BucketInfo by eacl.Table.
BucketACL struct {
Info *BucketInfo
Info *cache.BucketInfo
EACL *eacl.Table
}
)
func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*BucketInfo, error) {
func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*cache.BucketInfo, error) {
var (
err error
res *container.Container
rid = api.GetRequestID(ctx)
bearerOpt = n.BearerOpt(ctx)
info = &BucketInfo{
info = &cache.BucketInfo{
CID: cid,
Name: cid.String(),
}
@ -82,10 +74,17 @@ func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*BucketInfo, er
}
}
if err := n.bucketCache.Put(info); err != nil {
n.log.Warn("could not put bucket info into cache",
zap.Stringer("cid", cid),
zap.String("bucket_name", info.Name),
zap.Error(err))
}
return info, nil
}
func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) {
func (n *layer) containerList(ctx context.Context) ([]*cache.BucketInfo, error) {
var (
err error
own = n.Owner(ctx)
@ -101,7 +100,7 @@ func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) {
return nil, err
}
list := make([]*BucketInfo, 0, len(res))
list := make([]*cache.BucketInfo, 0, len(res))
for _, cid := range res {
info, err := n.containerInfo(ctx, cid)
if err != nil {
@ -118,29 +117,42 @@ func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) {
}
func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*cid.ID, error) {
var err error
bktInfo := &cache.BucketInfo{
Name: p.Name,
Owner: n.Owner(ctx),
Created: time.Now(),
BasicACL: p.ACL,
}
cnr := container.New(
container.WithPolicy(p.Policy),
container.WithCustomBasicACL(p.ACL),
container.WithAttribute(container.AttributeName, p.Name),
container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10)))
container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(bktInfo.Created.Unix(), 10)))
cnr.SetSessionToken(p.BoxData.Gate.SessionToken)
cnr.SetOwnerID(n.Owner(ctx))
cnr.SetOwnerID(bktInfo.Owner)
cid, err := n.pool.PutContainer(ctx, cnr)
if err != nil {
return nil, fmt.Errorf("failed to create a bucket: %w", err)
}
if err = n.pool.WaitForContainerPresence(ctx, cid, pool.DefaultPollingParams()); err != nil {
if bktInfo.CID, err = n.pool.PutContainer(ctx, cnr); err != nil {
return nil, err
}
if err := n.setContainerEACLTable(ctx, cid, p.EACL); err != nil {
if err = n.pool.WaitForContainerPresence(ctx, bktInfo.CID, pool.DefaultPollingParams()); err != nil {
return nil, err
}
return cid, nil
if err = n.setContainerEACLTable(ctx, bktInfo.CID, p.EACL); err != nil {
return nil, err
}
if err = n.bucketCache.Put(bktInfo); err != nil {
n.log.Warn("couldn't put bucket info into cache",
zap.String("bucket name", bktInfo.Name),
zap.Stringer("bucket cid", bktInfo.CID),
zap.Error(err))
}
return bktInfo.CID, nil
}
func (n *layer) setContainerEACLTable(ctx context.Context, cid *cid.ID, table *eacl.Table) error {

View file

@ -3,24 +3,56 @@ package layer
import (
"io"
"net/http"
"sync"
)
type detector struct {
io.Reader
sync.Once
type (
detector struct {
io.Reader
err error
data []byte
}
errReader struct {
data []byte
err error
offset int
}
)
contentType string
const contentTypeDetectSize = 512
func newReader(data []byte, err error) *errReader {
return &errReader{data: data, err: err}
}
func newDetector(r io.Reader) *detector {
return &detector{Reader: r}
func (r *errReader) Read(b []byte) (int, error) {
if r.offset >= len(r.data) {
return 0, io.EOF
}
n := copy(b, r.data[r.offset:])
r.offset += n
if r.offset >= len(r.data) {
return n, r.err
}
return n, nil
}
func (d *detector) Read(data []byte) (int, error) {
d.Do(func() {
d.contentType = http.DetectContentType(data)
})
return d.Reader.Read(data)
func newDetector(reader io.Reader) *detector {
return &detector{
data: make([]byte, contentTypeDetectSize),
Reader: reader,
}
}
func (d *detector) Detect() (string, error) {
n, err := d.Reader.Read(d.data)
if err != nil && err != io.EOF {
d.err = err
return "", err
}
d.data = d.data[:n]
return http.DetectContentType(d.data), nil
}
func (d *detector) MultiReader() io.Reader {
return io.MultiReader(newReader(d.data, d.err), d.Reader)
}

View file

@ -1,12 +1,12 @@
package layer
import (
"bytes"
"context"
"crypto/ecdsa"
"fmt"
"io"
"net/url"
"sort"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
@ -25,10 +25,13 @@ import (
type (
layer struct {
pool pool.Pool
log *zap.Logger
listObjCache ObjectsListCache
objCache cache.ObjectsCache
pool pool.Pool
log *zap.Logger
listsCache ObjectsListCache
objCache cache.ObjectsCache
namesCache cache.ObjectsNameCache
bucketCache cache.BucketCache
systemCache cache.SystemCache
}
// CacheConfig contains params for caches.
@ -48,12 +51,19 @@ type (
// GetObjectParams stores object get request parameters.
GetObjectParams struct {
Range *RangeParams
Bucket string
Object string
Offset int64
Length int64
Writer io.Writer
Range *RangeParams
ObjectInfo *ObjectInfo
Offset int64
Length int64
Writer io.Writer
VersionID string
}
// HeadObjectParams stores object head request parameters.
HeadObjectParams struct {
Bucket string
Object string
VersionID string
}
// RangeParams stores range header request parameters.
@ -71,11 +81,21 @@ type (
Header map[string]string
}
// PutVersioningParams stores object copy request parameters.
PutVersioningParams struct {
Bucket string
Settings *BucketSettings
}
// BucketSettings stores settings such as versioning.
BucketSettings struct {
VersioningEnabled bool
}
// CopyObjectParams stores object copy request parameters.
CopyObjectParams struct {
SrcBucket string
SrcObject *ObjectInfo
DstBucket string
SrcObject string
DstObject string
SrcSize int64
Header map[string]string
@ -108,6 +128,12 @@ type (
Encode string
}
// VersionedObject stores object name and version.
VersionedObject struct {
Name string
VersionID string
}
// NeoFS provides basic NeoFS interface.
NeoFS interface {
Get(ctx context.Context, address *object.Address) (*object.Object, error)
@ -117,15 +143,18 @@ type (
Client interface {
NeoFS
ListBuckets(ctx context.Context) ([]*BucketInfo, error)
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error)
GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error)
ListBuckets(ctx context.Context) ([]*cache.BucketInfo, error)
GetBucketInfo(ctx context.Context, name string) (*cache.BucketInfo, error)
GetBucketACL(ctx context.Context, name string) (*BucketACL, error)
PutBucketACL(ctx context.Context, p *PutBucketACLParams) error
CreateBucket(ctx context.Context, p *CreateBucketParams) (*cid.ID, error)
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
GetObject(ctx context.Context, p *GetObjectParams) error
GetObjectInfo(ctx context.Context, bucketName, objectName string) (*ObjectInfo, error)
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*ObjectInfo, error)
PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error)
@ -135,23 +164,26 @@ type (
ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error)
ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error)
DeleteObject(ctx context.Context, bucket, object string) error
DeleteObjects(ctx context.Context, bucket string, objects []string) []error
DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error
}
)
const (
unversionedObjectVersionID = "null"
)
func (t *VersionedObject) String() string {
return t.Name + ":" + t.VersionID
}
// NewLayer creates instance of layer. It checks credentials
// and establishes gRPC connection with node.
func NewLayer(log *zap.Logger, conns pool.Pool, config *CacheConfig) Client {
return &layer{
pool: conns,
log: log,
listObjCache: newListObjectsCache(config.ListObjectsLifetime),
objCache: cache.New(config.Size, config.Lifetime),
pool: conns,
log: log,
listsCache: newListObjectsCache(config.ListObjectsLifetime),
objCache: cache.New(config.Size, config.Lifetime),
//todo reconsider cache params
namesCache: cache.NewObjectsNameCache(1000, time.Minute),
bucketCache: cache.NewBucketCache(150, time.Minute),
systemCache: cache.NewSystemCache(1000, 5*time.Minute),
}
}
@ -189,12 +221,16 @@ func (n *layer) Get(ctx context.Context, address *object.Address) (*object.Objec
}
// GetBucketInfo returns bucket info by name.
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) {
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*cache.BucketInfo, error) {
name, err := url.QueryUnescape(name)
if err != nil {
return nil, err
}
if bktInfo := n.bucketCache.Get(name); bktInfo != nil {
return bktInfo, nil
}
containerID := new(cid.ID)
if err := containerID.Parse(name); err != nil {
list, err := n.containerList(ctx)
@ -243,33 +279,20 @@ func (n *layer) PutBucketACL(ctx context.Context, param *PutBucketACLParams) err
// ListBuckets returns all user containers. Name of the bucket is a container
// id. Timestamp is omitted since it is not saved in neofs container.
func (n *layer) ListBuckets(ctx context.Context) ([]*BucketInfo, error) {
func (n *layer) ListBuckets(ctx context.Context) ([]*cache.BucketInfo, error) {
return n.containerList(ctx)
}
// GetObject from storage.
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
var (
err error
oid *object.ID
bkt *BucketInfo
)
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
return fmt.Errorf("couldn't find bucket: %s : %w", p.Bucket, err)
} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: p.Object}); err != nil {
return fmt.Errorf("search of the object failed: cid: %s, val: %s : %w", bkt.CID, p.Object, err)
}
addr := object.NewAddress()
addr.SetObjectID(oid)
addr.SetContainerID(bkt.CID)
var err error
params := &getParams{
Writer: p.Writer,
address: addr,
offset: p.Offset,
length: p.Length,
Writer: p.Writer,
cid: p.ObjectInfo.CID(),
oid: p.ObjectInfo.ID(),
offset: p.Offset,
length: p.Length,
}
if p.Range != nil {
@ -284,64 +307,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
}
if err != nil {
n.objCache.Delete(addr)
return fmt.Errorf("couldn't get object, cid: %s : %w", bkt.CID, err)
n.objCache.Delete(p.ObjectInfo.Address())
return fmt.Errorf("couldn't get object, cid: %s : %w", p.ObjectInfo.CID(), err)
}
return nil
}
func (n *layer) checkObject(ctx context.Context, cid *cid.ID, filename string) error {
var err error
if _, err = n.objectFindID(ctx, &findParams{cid: cid, val: filename}); err == nil {
return new(errors.ObjectAlreadyExists)
}
return err
}
// GetObjectInfo returns meta information about the object.
func (n *layer) GetObjectInfo(ctx context.Context, bucketName, filename string) (*ObjectInfo, error) {
var (
err error
oid *object.ID
bkt *BucketInfo
meta *object.Object
)
if bkt, err = n.GetBucketInfo(ctx, bucketName); err != nil {
func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*ObjectInfo, error) {
bkt, err := n.GetBucketInfo(ctx, p.Bucket)
if err != nil {
n.log.Error("could not fetch bucket info", zap.Error(err))
return nil, err
} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: filename}); err != nil {
n.log.Error("could not find object id", zap.Error(err))
}
if len(p.VersionID) == 0 {
return n.headLastVersionIfNotDeleted(ctx, bkt, p.Object)
}
return n.headVersion(ctx, bkt, p.VersionID)
}
func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *cache.BucketInfo) (*ObjectInfo, error) {
if meta := n.systemCache.Get(bkt.SettingsObjectKey()); meta != nil {
return objInfoFromMeta(bkt, meta), nil
}
oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: bkt.SettingsObjectName()})
if err != nil {
return nil, err
}
addr := object.NewAddress()
addr.SetObjectID(oid)
addr.SetContainerID(bkt.CID)
/* todo: now we get an address via request to NeoFS and try to find the object with the address in cache
but it will be resolved after implementation of local cache with nicenames and address of objects
for get/head requests */
meta = n.objCache.Get(addr)
if meta == nil {
meta, err = n.objectHead(ctx, addr)
if err != nil {
n.log.Error("could not fetch object head", zap.Error(err))
return nil, err
}
if err = n.objCache.Put(addr, *meta); err != nil {
n.log.Error("couldn't cache an object", zap.Error(err))
}
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
n.log.Error("could not fetch object head", zap.Error(err))
return nil, err
}
return objectInfoFromMeta(bkt, meta, "", ""), nil
if err = n.systemCache.Put(bkt.SettingsObjectKey(), meta); err != nil {
n.log.Error("couldn't cache system object", zap.Error(err))
}
return objInfoFromMeta(bkt, meta), nil
}
// PutObject into storage.
func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) {
return n.objectPut(ctx, p)
bkt, err := n.GetBucketInfo(ctx, p.Bucket)
if err != nil {
return nil, err
}
return n.objectPut(ctx, bkt, p)
}
// CopyObject from one bucket into another bucket.
@ -350,9 +367,8 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf
go func() {
err := n.GetObject(ctx, &GetObjectParams{
Bucket: p.SrcBucket,
Object: p.SrcObject,
Writer: pw,
ObjectInfo: p.SrcObject,
Writer: pw,
})
if err = pw.CloseWithError(err); err != nil {
@ -370,35 +386,47 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf
}
// DeleteObject removes all objects with passed nice name.
func (n *layer) DeleteObject(ctx context.Context, bucket, filename string) error {
func (n *layer) deleteObject(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) error {
var (
err error
ids []*object.ID
bkt *BucketInfo
)
if bkt, err = n.GetBucketInfo(ctx, bucket); err != nil {
return &errors.DeleteError{
Err: err,
Object: filename,
versioningEnabled := n.isVersioningEnabled(ctx, bkt)
if !versioningEnabled && obj.VersionID != unversionedObjectVersionID && obj.VersionID != "" {
return errors.GetAPIError(errors.ErrInvalidVersion)
}
if versioningEnabled {
p := &PutObjectParams{
Object: obj.Name,
Reader: bytes.NewReader(nil),
Header: map[string]string{versionsDeleteMarkAttr: obj.VersionID},
}
} else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: filename}); err != nil {
return &errors.DeleteError{
Err: err,
Object: filename,
if len(obj.VersionID) != 0 {
id, err := n.checkVersionsExist(ctx, bkt, obj)
if err != nil {
return err
}
ids = []*object.ID{id}
p.Header[versionsDelAttr] = obj.VersionID
} else {
p.Header[versionsDeleteMarkAttr] = delMarkFullObject
}
if _, err = n.objectPut(ctx, bkt, p); err != nil {
return err
}
} else {
ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: obj.Name})
if err != nil {
return err
}
}
for _, id := range ids {
addr := object.NewAddress()
addr.SetObjectID(id)
addr.SetContainerID(bkt.CID)
if err = n.objectDelete(ctx, addr); err != nil {
return &errors.DeleteError{
Err: err,
Object: filename,
}
if err = n.objectDelete(ctx, bkt.CID, id); err != nil {
return err
}
}
@ -406,12 +434,17 @@ func (n *layer) DeleteObject(ctx context.Context, bucket, filename string) error
}
// DeleteObjects from the storage.
func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []string) []error {
func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error {
var errs = make([]error, 0, len(objects))
for i := range objects {
if err := n.DeleteObject(ctx, bucket, objects[i]); err != nil {
errs = append(errs, err)
bkt, err := n.GetBucketInfo(ctx, bucket)
if err != nil {
return append(errs, err)
}
for _, obj := range objects {
if err := n.deleteObject(ctx, bkt, obj); err != nil {
errs = append(errs, &errors.ObjectError{Err: err, Object: obj.Name, Version: obj.VersionID})
}
}
@ -436,75 +469,17 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
return err
}
return n.deleteContainer(ctx, bucketInfo.CID)
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
var (
res = ListObjectVersionsInfo{}
err error
bkt *BucketInfo
ids []*object.ID
uniqNames = make(map[string]bool)
)
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
return nil, err
} else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID}); err != nil {
return nil, err
}
versions := make([]*ObjectVersionInfo, 0, len(ids))
// todo: deletemarkers is empty now, we will use it after proper realization of versioning
deleted := make([]*DeletedObjectInfo, 0, len(ids))
res.DeleteMarker = deleted
for _, id := range ids {
addr := object.NewAddress()
addr.SetObjectID(id)
addr.SetContainerID(bkt.CID)
meta, err := n.objectHead(ctx, addr)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
continue
}
if ov := objectVersionInfoFromMeta(bkt, meta, p.Prefix, p.Delimiter); ov != nil {
if _, ok := uniqNames[ov.Object.Name]; ok {
continue
}
if len(p.KeyMarker) > 0 && ov.Object.Name <= p.KeyMarker {
continue
}
uniqNames[ov.Object.Name] = ov.Object.isDir
versions = append(versions, ov)
}
}
sort.Slice(versions, func(i, j int) bool {
return versions[i].Object.Name < versions[j].Object.Name
})
if len(versions) > p.MaxKeys {
res.IsTruncated = true
lastVersion := versions[p.MaxKeys-1]
res.KeyMarker = lastVersion.Object.Name
res.VersionIDMarker = lastVersion.VersionID
nextVersion := versions[p.MaxKeys]
res.NextKeyMarker = nextVersion.Object.Name
res.NextVersionIDMarker = nextVersion.VersionID
versions = versions[:p.MaxKeys]
}
for _, ov := range versions {
if isDir := uniqNames[ov.Object.Name]; isDir {
res.CommonPrefixes = append(res.CommonPrefixes, &ov.Object.Name)
} else {
res.Version = append(res.Version, ov)
}
}
return &res, nil
objects, err := n.listSortedObjectsFromNeoFS(ctx, allObjectParams{Bucket: bucketInfo})
if err != nil {
return err
}
if len(objects) != 0 {
return errors.GetAPIError(errors.ErrBucketNotEmpty)
}
if err = n.deleteContainer(ctx, bucketInfo.CID); err != nil {
return err
}
n.bucketCache.Delete(bucketInfo.Name)
return nil
}

View file

@ -7,28 +7,34 @@ import (
"net/url"
"sort"
"strconv"
"strings"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors"
"go.uber.org/zap"
)
type (
findParams struct {
val string
cid *cid.ID
attr string
val string
cid *cid.ID
}
getParams struct {
io.Writer
*object.Range
offset int64
length int64
address *object.Address
offset int64
length int64
cid *cid.ID
oid *object.ID
}
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
@ -55,7 +61,7 @@ type (
}
allObjectParams struct {
Bucket *BucketInfo
Bucket *cache.BucketInfo
Delimiter string
Prefix string
}
@ -70,7 +76,11 @@ func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID,
if filename, err := url.QueryUnescape(p.val); err != nil {
return nil, err
} else if filename != "" {
opts.AddFilter(object.AttributeFileName, filename, object.MatchStringEqual)
if p.attr == "" {
opts.AddFilter(object.AttributeFileName, filename, object.MatchStringEqual)
} else {
opts.AddFilter(p.attr, filename, object.MatchStringEqual)
}
}
return n.pool.SearchObject(ctx, new(client.SearchObjectParams).WithContainerID(p.cid).WithSearchFilters(opts), n.BearerOpt(ctx))
}
@ -89,9 +99,16 @@ func (n *layer) objectFindID(ctx context.Context, p *findParams) (*object.ID, er
return nil, errors.New("several objects with the same name found")
}
func newAddress(cid *cid.ID, oid *object.ID) *object.Address {
address := object.NewAddress()
address.SetContainerID(cid)
address.SetObjectID(oid)
return address
}
// objectHead returns all object's headers.
func (n *layer) objectHead(ctx context.Context, address *object.Address) (*object.Object, error) {
ops := new(client.ObjectHeaderParams).WithAddress(address).WithAllFields()
func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) {
ops := new(client.ObjectHeaderParams).WithAddress(newAddress(cid, oid)).WithAllFields()
return n.pool.GetObjectHeader(ctx, ops, n.BearerOpt(ctx))
}
@ -99,54 +116,96 @@ func (n *layer) objectHead(ctx context.Context, address *object.Address) (*objec
func (n *layer) objectGet(ctx context.Context, p *getParams) (*object.Object, error) {
// prepare length/offset writer
w := newWriter(p.Writer, p.offset, p.length)
ops := new(client.GetObjectParams).WithAddress(p.address).WithPayloadWriter(w)
ops := new(client.GetObjectParams).WithAddress(newAddress(p.cid, p.oid)).WithPayloadWriter(w)
return n.pool.GetObject(ctx, ops, n.BearerOpt(ctx))
}
// objectRange gets object range and writes it into provided io.Writer.
func (n *layer) objectRange(ctx context.Context, p *getParams) ([]byte, error) {
w := newWriter(p.Writer, p.offset, p.length)
ops := new(client.RangeDataParams).WithAddress(p.address).WithDataWriter(w).WithRange(p.Range)
ops := new(client.RangeDataParams).WithAddress(newAddress(p.cid, p.oid)).WithDataWriter(w).WithRange(p.Range)
return n.pool.ObjectPayloadRangeData(ctx, ops, n.BearerOpt(ctx))
}
// objectPut into NeoFS, took payload from io.Reader.
func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) {
var (
err error
obj string
bkt *BucketInfo
own = n.Owner(ctx)
)
func (n *layer) objectPut(ctx context.Context, bkt *cache.BucketInfo, p *PutObjectParams) (*ObjectInfo, error) {
own := n.Owner(ctx)
obj, err := url.QueryUnescape(p.Object)
if err != nil {
return nil, err
}
if obj, err = url.QueryUnescape(p.Object); err != nil {
versioningEnabled := n.isVersioningEnabled(ctx, bkt)
versions, err := n.headVersions(ctx, bkt, obj)
if err != nil && !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) {
return nil, err
} else if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
return nil, err
} else if err = n.checkObject(ctx, bkt.CID, p.Object); err != nil {
var errExist *apiErrors.ObjectAlreadyExists
if ok := errors.As(err, &errExist); ok {
errExist.Bucket = p.Bucket
errExist.Object = p.Object
return nil, errExist
}
idsToDeleteArr := updateCRDT2PSetHeaders(p, versions, versioningEnabled)
r := p.Reader
if len(p.Header[api.ContentType]) == 0 {
d := newDetector(r)
if contentType, err := d.Detect(); err == nil {
p.Header[api.ContentType] = contentType
}
r = d.MultiReader()
}
rawObject := formRawObject(p, bkt.CID, own, obj)
if !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) {
return nil, err
ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(r)
oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx))
if err != nil {
return nil, err
}
if p.Header[versionsDeleteMarkAttr] == delMarkFullObject {
if last := versions.getLast(); last != nil {
n.objCache.Delete(last.Address())
}
}
attributes := make([]*object.Attribute, 0, len(p.Header)+1)
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
return nil, err
}
unix := strconv.FormatInt(time.Now().UTC().Unix(), 10)
if err = n.objCache.Put(*meta); err != nil {
n.log.Error("couldn't cache an object", zap.Error(err))
}
for _, id := range idsToDeleteArr {
if err = n.objectDelete(ctx, bkt.CID, id); err != nil {
n.log.Warn("couldn't delete object",
zap.Stringer("version id", id),
zap.Error(err))
}
}
return &ObjectInfo{
id: oid,
bucketID: bkt.CID,
Owner: own,
Bucket: p.Bucket,
Name: p.Object,
Size: p.Size,
Created: time.Now(),
CreationEpoch: meta.CreationEpoch(),
Headers: p.Header,
ContentType: p.Header[api.ContentType],
HashSum: meta.PayloadChecksum().String(),
}, nil
}
func formRawObject(p *PutObjectParams, bktID *cid.ID, own *owner.ID, obj string) *object.RawObject {
attributes := make([]*object.Attribute, 0, len(p.Header)+2)
filename := object.NewAttribute()
filename.SetKey(object.AttributeFileName)
filename.SetValue(obj)
createdAt := object.NewAttribute()
createdAt.SetKey(object.AttributeTimestamp)
createdAt.SetValue(unix)
createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10))
attributes = append(attributes, filename, createdAt)
@ -160,49 +219,155 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
raw := object.NewRaw()
raw.SetOwnerID(own)
raw.SetContainerID(bkt.CID)
raw.SetContainerID(bktID)
raw.SetAttributes(attributes...)
r := newDetector(p.Reader)
return raw
}
ops := new(client.PutObjectParams).WithObject(raw.Object()).WithPayloadReader(r)
oid, err := n.pool.PutObject(
ctx,
ops,
n.BearerOpt(ctx),
)
func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versioningEnabled bool) []*object.ID {
var idsToDeleteArr []*object.ID
if versions == nil {
return idsToDeleteArr
}
if versioningEnabled {
if len(versions.addList) != 0 {
p.Header[versionsAddAttr] = versions.getAddHeader()
}
deleted := versions.getDelHeader()
// p.Header[versionsDelAttr] can be not empty when deleting specific version
if delAttr := p.Header[versionsDelAttr]; len(delAttr) != 0 {
if len(deleted) != 0 {
p.Header[versionsDelAttr] = deleted + "," + delAttr
} else {
p.Header[versionsDelAttr] = delAttr
}
} else if len(deleted) != 0 {
p.Header[versionsDelAttr] = deleted
}
} else {
versionsDeletedStr := versions.getDelHeader()
if len(versionsDeletedStr) != 0 {
versionsDeletedStr += ","
}
if lastVersion := versions.getLast(); lastVersion != nil {
p.Header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version()
idsToDeleteArr = append(idsToDeleteArr, lastVersion.ID())
} else if len(versionsDeletedStr) != 0 {
p.Header[versionsDelAttr] = versionsDeletedStr
}
for _, version := range versions.objects {
if contains(versions.delList, version.Version()) {
idsToDeleteArr = append(idsToDeleteArr, version.ID())
}
}
}
return idsToDeleteArr
}
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*ObjectInfo, error) {
if address := n.namesCache.Get(bkt.Name + "/" + objectName); address != nil {
if headInfo := n.objCache.Get(address); headInfo != nil {
return objInfoFromMeta(bkt, headInfo), nil
}
}
versions, err := n.headVersions(ctx, bkt, objectName)
if err != nil {
return nil, err
}
addr := object.NewAddress()
addr.SetObjectID(oid)
addr.SetContainerID(bkt.CID)
meta, err := n.objectHead(ctx, addr)
lastVersion := versions.getLast()
if lastVersion == nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
if err = n.namesCache.Put(lastVersion.NiceName(), lastVersion.Address()); err != nil {
n.log.Warn("couldn't put obj address to head cache",
zap.String("obj nice name", lastVersion.NiceName()),
zap.Error(err))
}
return lastVersion, nil
}
func (n *layer) headVersions(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*objectVersions, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName})
if err != nil {
return nil, err
}
if err = n.objCache.Put(addr, *meta); err != nil {
n.log.Error("couldn't cache an object", zap.Error(err))
if len(ids) == 0 {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
return &ObjectInfo{
id: oid,
versions := newObjectVersions(objectName)
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id)
if err != nil {
n.log.Warn("couldn't head object",
zap.Stringer("object id", id),
zap.Stringer("bucket id", bkt.CID),
zap.Error(err))
continue
}
if err = n.objCache.Put(*meta); err != nil {
n.log.Warn("couldn't put meta to objects cache",
zap.Stringer("object id", id),
zap.Stringer("bucket id", bkt.CID),
zap.Error(err))
}
Owner: own,
Bucket: p.Bucket,
Name: p.Object,
Size: p.Size,
Created: time.Now(),
Headers: p.Header,
ContentType: r.contentType,
HashSum: meta.PayloadChecksum().String(),
}, nil
if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil {
if isSystem(oi) {
continue
}
versions.appendVersion(oi)
}
}
return versions, nil
}
func (n *layer) headVersion(ctx context.Context, bkt *cache.BucketInfo, versionID string) (*ObjectInfo, error) {
oid := object.NewID()
if err := oid.Parse(versionID); err != nil {
return nil, err
}
if headInfo := n.objCache.Get(newAddress(bkt.CID, oid)); headInfo != nil {
return objInfoFromMeta(bkt, headInfo), nil
}
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)
}
return nil, err
}
objInfo := objectInfoFromMeta(bkt, meta, "", "")
if err = n.objCache.Put(*meta); err != nil {
n.log.Warn("couldn't put obj to object cache",
zap.String("bucket name", objInfo.Bucket),
zap.Stringer("bucket cid", objInfo.CID()),
zap.String("object name", objInfo.Name),
zap.Stringer("object id", objInfo.ID()),
zap.Error(err))
}
return objInfo, nil
}
// objectDelete puts tombstone object into neofs.
func (n *layer) objectDelete(ctx context.Context, address *object.Address) error {
func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *object.ID) error {
address := newAddress(cid, oid)
dop := new(client.DeleteObjectParams)
dop.WithAddress(address)
n.objCache.Delete(address)
@ -284,37 +449,16 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
}
func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) {
var (
err error
ids []*object.ID
uniqNames = make(map[string]bool)
)
if ids, err = n.objectSearch(ctx, &findParams{cid: p.Bucket.CID}); err != nil {
versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter)
if err != nil {
return nil, err
}
objects := make([]*ObjectInfo, 0, len(ids))
for _, id := range ids {
addr := object.NewAddress()
addr.SetObjectID(id)
addr.SetContainerID(p.Bucket.CID)
meta, err := n.objectHead(ctx, addr)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
continue
}
if oi := objectInfoFromMeta(p.Bucket, meta, p.Prefix, p.Delimiter); oi != nil {
// use only unique dir names
if _, ok := uniqNames[oi.Name]; ok {
continue
}
uniqNames[oi.Name] = oi.isDir
objects = append(objects, oi)
objects := make([]*ObjectInfo, 0, len(versions))
for _, v := range versions {
lastVersion := v.getLast()
if lastVersion != nil {
objects = append(objects, lastVersion)
}
}
@ -325,6 +469,59 @@ func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParam
return objects, nil
}
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *cache.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID})
if err != nil {
return nil, err
}
versions := make(map[string]*objectVersions, len(ids)/2)
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
continue
}
if oi := objectInfoFromMeta(bkt, meta, prefix, delimiter); oi != nil {
if isSystem(oi) {
continue
}
objVersions, ok := versions[oi.Name]
if !ok {
objVersions = newObjectVersions(oi.Name)
}
objVersions.appendVersion(oi)
versions[oi.Name] = objVersions
}
}
return versions, nil
}
func getExistedVersions(versions *objectVersions) []string {
var res []string
for _, add := range versions.addList {
if !contains(versions.delList, add) {
res = append(res, add)
}
}
return res
}
func splitVersions(header string) []string {
if len(header) == 0 {
return nil
}
return strings.Split(header, ",")
}
func isSystem(obj *ObjectInfo) bool {
return len(obj.Headers[objectSystemAttributeName]) > 0 ||
len(obj.Headers[attrVersionsIgnore]) > 0
}
func trimAfterObjectName(startAfter string, objects []*ObjectInfo) []*ObjectInfo {
if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter {
return nil
@ -366,7 +563,7 @@ func triageObjects(allObjects []*ObjectInfo) (prefixes []string, objects []*Obje
func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*ObjectInfo, error) {
var (
err error
bkt *BucketInfo
bkt *cache.BucketInfo
cacheKey cacheOptions
allObjects []*ObjectInfo
)
@ -375,11 +572,11 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) (
return nil, err
}
if cacheKey, err = createKey(ctx, bkt.CID, p.Prefix, p.Delimiter); err != nil {
if cacheKey, err = createKey(ctx, bkt.CID, listObjectsMethod, p.Prefix, p.Delimiter); err != nil {
return nil, err
}
allObjects = n.listObjCache.Get(cacheKey)
allObjects = n.listsCache.Get(cacheKey)
if allObjects == nil {
allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{
@ -392,8 +589,18 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) (
}
// putting to cache a copy of allObjects because allObjects can be modified further
n.listObjCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...))
n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...))
}
return allObjects, nil
}
func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *cache.BucketInfo) bool {
settings, err := n.getBucketSettings(ctx, bktInfo)
if err != nil {
n.log.Warn("couldn't get versioning settings object", zap.Error(err))
return false
}
return settings.VersioningEnabled
}

View file

@ -30,6 +30,11 @@ type (
// DefaultObjectsListCacheLifetime is a default lifetime of entries in cache of ListObjects.
const DefaultObjectsListCacheLifetime = time.Second * 60
const (
listObjectsMethod = "listObjects"
listVersionsMethod = "listVersions"
)
type (
listObjectsCache struct {
cacheLifetime time.Duration
@ -40,6 +45,7 @@ type (
list []*ObjectInfo
}
cacheOptions struct {
method string
key string
delimiter string
prefix string
@ -78,12 +84,13 @@ func (l *listObjectsCache) Put(key cacheOptions, objects []*ObjectInfo) {
})
}
func createKey(ctx context.Context, cid *cid.ID, prefix, delimiter string) (cacheOptions, error) {
func createKey(ctx context.Context, cid *cid.ID, method, prefix, delimiter string) (cacheOptions, error) {
box, err := GetBoxData(ctx)
if err != nil {
return cacheOptions{}, err
}
p := cacheOptions{
method: method,
key: box.Gate.AccessKey + cid.String(),
delimiter: delimiter,
prefix: prefix,

View file

@ -8,26 +8,30 @@ import (
"strings"
"time"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
)
type (
// ObjectInfo holds S3 object data.
ObjectInfo struct {
id *object.ID
isDir bool
id *object.ID
bucketID *cid.ID
isDir bool
Bucket string
Name string
Size int64
ContentType string
Created time.Time
HashSum string
Owner *owner.ID
Headers map[string]string
Bucket string
Name string
Size int64
ContentType string
Created time.Time
CreationEpoch uint64
HashSum string
Owner *owner.ID
Headers map[string]string
}
// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
@ -51,29 +55,19 @@ type (
// ObjectVersionInfo stores info about objects versions.
ObjectVersionInfo struct {
Object *ObjectInfo
IsLatest bool
VersionID string
}
// DeletedObjectInfo stores info about deleted versions of objects.
DeletedObjectInfo struct {
Owner *owner.ID
Key string
VersionID string
IsLatest bool
LastModified string
Object *ObjectInfo
IsLatest bool
}
// ListObjectVersionsInfo stores info and list of objects' versions.
ListObjectVersionsInfo struct {
CommonPrefixes []*string
CommonPrefixes []string
IsTruncated bool
KeyMarker string
NextKeyMarker string
NextVersionIDMarker string
Version []*ObjectVersionInfo
DeleteMarker []*DeletedObjectInfo
DeleteMarker []*ObjectVersionInfo
VersionIDMarker string
}
)
@ -91,7 +85,11 @@ func userHeaders(attrs []*object.Attribute) map[string]string {
return result
}
func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectInfo {
func objInfoFromMeta(bkt *cache.BucketInfo, meta *object.Object) *ObjectInfo {
return objectInfoFromMeta(bkt, meta, "", "")
}
func objectInfoFromMeta(bkt *cache.BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectInfo {
var (
isDir bool
size int64
@ -133,28 +131,22 @@ func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter
}
return &ObjectInfo{
id: meta.ID(),
isDir: isDir,
id: meta.ID(),
bucketID: bkt.CID,
isDir: isDir,
Bucket: bkt.Name,
Name: filename,
Created: creation,
ContentType: mimeType,
Headers: userHeaders,
Owner: meta.OwnerID(),
Size: size,
HashSum: meta.PayloadChecksum().String(),
Bucket: bkt.Name,
Name: filename,
Created: creation,
CreationEpoch: meta.CreationEpoch(),
ContentType: mimeType,
Headers: userHeaders,
Owner: meta.OwnerID(),
Size: size,
HashSum: meta.PayloadChecksum().String(),
}
}
func objectVersionInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectVersionInfo {
oi := objectInfoFromMeta(bkt, meta, prefix, delimiter)
if oi == nil {
return nil
}
return &ObjectVersionInfo{Object: oi, IsLatest: true, VersionID: unversionedObjectVersionID}
}
func filenameFromObject(o *object.Object) string {
var name = o.ID().String()
for _, attr := range o.Attributes() {
@ -174,6 +166,18 @@ func NameFromString(name string) (string, string) {
// ID returns object ID from ObjectInfo.
func (o *ObjectInfo) ID() *object.ID { return o.id }
// Version returns object version from ObjectInfo.
func (o *ObjectInfo) Version() string { return o.id.String() }
// NiceName returns object name for cache.
func (o *ObjectInfo) NiceName() string { return o.Bucket + "/" + o.Name }
// Address returns object address.
func (o *ObjectInfo) Address() *object.Address { return newAddress(o.bucketID, o.id) }
// CID returns bucket ID from ObjectInfo.
func (o *ObjectInfo) CID() *cid.ID { return o.bucketID }
// IsDir allows to check if object is a directory.
func (o *ObjectInfo) IsDir() bool { return o.isDir }

View file

@ -9,6 +9,7 @@ import (
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/stretchr/testify/require"
)
@ -19,7 +20,7 @@ var (
defaultTestContentType = http.DetectContentType(defaultTestPayload)
)
func newTestObject(oid *object.ID, bkt *BucketInfo, name string) *object.Object {
func newTestObject(oid *object.ID, bkt *cache.BucketInfo, name string) *object.Object {
filename := object.NewAttribute()
filename.SetKey(object.AttributeFileName)
filename.SetValue(name)
@ -43,11 +44,12 @@ func newTestObject(oid *object.ID, bkt *BucketInfo, name string) *object.Object
return raw.Object()
}
func newTestInfo(oid *object.ID, bkt *BucketInfo, name string, isDir bool) *ObjectInfo {
func newTestInfo(oid *object.ID, bkt *cache.BucketInfo, name string, isDir bool) *ObjectInfo {
info := &ObjectInfo{
id: oid,
Name: name,
Bucket: bkt.Name,
bucketID: bkt.CID,
Size: defaultTestPayloadLength,
ContentType: defaultTestContentType,
Created: time.Unix(defaultTestCreated.Unix(), 0),
@ -70,7 +72,7 @@ func Test_objectInfoFromMeta(t *testing.T) {
oid := object.NewID()
containerID := cid.New()
bkt := &BucketInfo{
bkt := &cache.BucketInfo{
Name: "test-container",
CID: containerID,
Owner: uid,

325
api/layer/versioning.go Normal file
View file

@ -0,0 +1,325 @@
package layer
import (
"context"
"sort"
"strconv"
"strings"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"go.uber.org/zap"
)
type objectVersions struct {
name string
objects []*ObjectInfo
addList []string
delList []string
isSorted bool
}
const (
unversionedObjectVersionID = "null"
objectSystemAttributeName = "S3-System-name"
attrVersionsIgnore = "S3-Versions-ignore"
attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled"
versionsDelAttr = "S3-Versions-del"
versionsAddAttr = "S3-Versions-add"
versionsDeleteMarkAttr = "S3-Versions-delete-mark"
delMarkFullObject = "*"
)
func newObjectVersions(name string) *objectVersions {
return &objectVersions{name: name}
}
func (v *objectVersions) appendVersion(oi *ObjectInfo) {
addVers := append(splitVersions(oi.Headers[versionsAddAttr]), oi.Version())
delVers := splitVersions(oi.Headers[versionsDelAttr])
v.objects = append(v.objects, oi)
for _, add := range addVers {
if !contains(v.addList, add) {
v.addList = append(v.addList, add)
}
}
for _, del := range delVers {
if !contains(v.delList, del) {
v.delList = append(v.delList, del)
}
}
v.isSorted = false
}
func (v *objectVersions) sort() {
if !v.isSorted {
sort.Slice(v.objects, func(i, j int) bool {
return less(v.objects[i], v.objects[j])
})
v.isSorted = true
}
}
func (v *objectVersions) getLast() *ObjectInfo {
if len(v.objects) == 0 {
return nil
}
v.sort()
existedVersions := getExistedVersions(v)
for i := len(v.objects) - 1; i >= 0; i-- {
if contains(existedVersions, v.objects[i].Version()) {
delMarkHeader := v.objects[i].Headers[versionsDeleteMarkAttr]
if delMarkHeader == "" {
return v.objects[i]
}
if delMarkHeader == delMarkFullObject {
return nil
}
}
}
return nil
}
func (v *objectVersions) getFiltered() []*ObjectInfo {
if len(v.objects) == 0 {
return nil
}
v.sort()
existedVersions := getExistedVersions(v)
res := make([]*ObjectInfo, 0, len(v.objects))
for _, version := range v.objects {
delMark := version.Headers[versionsDeleteMarkAttr]
if contains(existedVersions, version.Version()) && (delMark == delMarkFullObject || delMark == "") {
res = append(res, version)
}
}
return res
}
func (v *objectVersions) getAddHeader() string {
return strings.Join(v.addList, ",")
}
func (v *objectVersions) getDelHeader() string {
return strings.Join(v.delList, ",")
}
func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error) {
bucketInfo, err := n.GetBucketInfo(ctx, p.Bucket)
if err != nil {
return nil, err
}
objectInfo, err := n.getSettingsObjectInfo(ctx, bucketInfo)
if err != nil {
n.log.Warn("couldn't get bucket version settings object, new one will be created",
zap.String("bucket_name", bucketInfo.Name),
zap.Stringer("cid", bucketInfo.CID),
zap.Error(err))
}
attributes := make([]*object.Attribute, 0, 3)
filename := object.NewAttribute()
filename.SetKey(objectSystemAttributeName)
filename.SetValue(bucketInfo.SettingsObjectName())
createdAt := object.NewAttribute()
createdAt.SetKey(object.AttributeTimestamp)
createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10))
versioningIgnore := object.NewAttribute()
versioningIgnore.SetKey(attrVersionsIgnore)
versioningIgnore.SetValue(strconv.FormatBool(true))
settingsVersioningEnabled := object.NewAttribute()
settingsVersioningEnabled.SetKey(attrSettingsVersioningEnabled)
settingsVersioningEnabled.SetValue(strconv.FormatBool(p.Settings.VersioningEnabled))
attributes = append(attributes, filename, createdAt, versioningIgnore, settingsVersioningEnabled)
raw := object.NewRaw()
raw.SetOwnerID(bucketInfo.Owner)
raw.SetContainerID(bucketInfo.CID)
raw.SetAttributes(attributes...)
ops := new(client.PutObjectParams).WithObject(raw.Object())
oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx))
if err != nil {
return nil, err
}
meta, err := n.objectHead(ctx, bucketInfo.CID, oid)
if err != nil {
return nil, err
}
if err = n.systemCache.Put(bucketInfo.SettingsObjectKey(), meta); err != nil {
n.log.Error("couldn't cache system object", zap.Error(err))
}
if objectInfo != nil {
if err = n.objectDelete(ctx, bucketInfo.CID, objectInfo.ID()); err != nil {
return nil, err
}
}
return objectInfoFromMeta(bucketInfo, meta, "", ""), nil
}
func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) {
bktInfo, err := n.GetBucketInfo(ctx, bucketName)
if err != nil {
return nil, err
}
return n.getBucketSettings(ctx, bktInfo)
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
var versions map[string]*objectVersions
res := &ListObjectVersionsInfo{}
bkt, err := n.GetBucketInfo(ctx, p.Bucket)
if err != nil {
return nil, err
}
cacheKey, err := createKey(ctx, bkt.CID, listVersionsMethod, p.Prefix, p.Delimiter)
if err != nil {
return nil, err
}
allObjects := n.listsCache.Get(cacheKey)
if allObjects == nil {
versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter)
if err != nil {
return nil, err
}
sortedNames := make([]string, 0, len(versions))
for k := range versions {
sortedNames = append(sortedNames, k)
}
sort.Strings(sortedNames)
allObjects = make([]*ObjectInfo, 0, p.MaxKeys)
for _, name := range sortedNames {
allObjects = append(allObjects, versions[name].getFiltered()...)
}
// putting to cache a copy of allObjects because allObjects can be modified further
n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...))
}
for i, obj := range allObjects {
if obj.Name >= p.KeyMarker && obj.Version() >= p.VersionIDMarker {
allObjects = allObjects[i:]
break
}
}
res.CommonPrefixes, allObjects = triageObjects(allObjects)
if len(allObjects) > p.MaxKeys {
res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys].Name
res.NextVersionIDMarker = allObjects[p.MaxKeys].Version()
allObjects = allObjects[:p.MaxKeys]
res.KeyMarker = allObjects[p.MaxKeys-1].Name
res.VersionIDMarker = allObjects[p.MaxKeys-1].Version()
}
objects := make([]*ObjectVersionInfo, len(allObjects))
for i, obj := range allObjects {
objects[i] = &ObjectVersionInfo{Object: obj}
if i == len(allObjects)-1 || allObjects[i+1].Name != obj.Name {
objects[i].IsLatest = true
}
}
res.Version, res.DeleteMarker = triageVersions(objects)
return res, nil
}
func triageVersions(objVersions []*ObjectVersionInfo) ([]*ObjectVersionInfo, []*ObjectVersionInfo) {
if len(objVersions) == 0 {
return nil, nil
}
var resVersion []*ObjectVersionInfo
var resDelMarkVersions []*ObjectVersionInfo
for _, version := range objVersions {
if version.Object.Headers[versionsDeleteMarkAttr] == delMarkFullObject {
resDelMarkVersions = append(resDelMarkVersions, version)
} else {
resVersion = append(resVersion, version)
}
}
return resVersion, resDelMarkVersions
}
func less(ov1, ov2 *ObjectInfo) bool {
if ov1.CreationEpoch == ov2.CreationEpoch {
return ov1.Version() < ov2.Version()
}
return ov1.CreationEpoch < ov2.CreationEpoch
}
func contains(list []string, elem string) bool {
for _, item := range list {
if elem == item {
return true
}
}
return false
}
func (n *layer) getBucketSettings(ctx context.Context, bktInfo *cache.BucketInfo) (*BucketSettings, error) {
objInfo, err := n.getSettingsObjectInfo(ctx, bktInfo)
if err != nil {
return nil, err
}
return objectInfoToBucketSettings(objInfo), nil
}
func objectInfoToBucketSettings(info *ObjectInfo) *BucketSettings {
res := &BucketSettings{}
enabled, ok := info.Headers[attrSettingsVersioningEnabled]
if ok {
if parsed, err := strconv.ParseBool(enabled); err == nil {
res.VersioningEnabled = parsed
}
}
return res
}
func (n *layer) checkVersionsExist(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) (*object.ID, error) {
id := object.NewID()
if err := id.Parse(obj.VersionID); err != nil {
return nil, errors.GetAPIError(errors.ErrInvalidVersion)
}
versions, err := n.headVersions(ctx, bkt, obj.Name)
if err != nil {
return nil, err
}
if !contains(getExistedVersions(versions), obj.VersionID) {
return nil, errors.GetAPIError(errors.ErrInvalidVersion)
}
return id, nil
}

View file

@ -0,0 +1,610 @@
package layer
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"fmt"
"io"
"strings"
"testing"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-api-go/pkg/session"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
"github.com/nspcc-dev/neofs-sdk-go/pkg/logger"
"github.com/nspcc-dev/neofs-sdk-go/pkg/pool"
"github.com/stretchr/testify/require"
)
type testPool struct {
objects map[string]*object.Object
containers map[string]*container.Container
currentEpoch uint64
}
func newTestPool() *testPool {
return &testPool{
objects: make(map[string]*object.Object),
containers: make(map[string]*container.Container),
}
}
func (t *testPool) PutObject(ctx context.Context, params *client.PutObjectParams, option ...client.CallOption) (*object.ID, error) {
b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil {
return nil, err
}
oid := object.NewID()
oid.SetSHA256(sha256.Sum256(b))
raw := object.NewRawFrom(params.Object())
raw.SetID(oid)
raw.SetCreationEpoch(t.currentEpoch)
t.currentEpoch++
if params.PayloadReader() != nil {
all, err := io.ReadAll(params.PayloadReader())
if err != nil {
return nil, err
}
raw.SetPayload(all)
}
addr := newAddress(raw.ContainerID(), raw.ID())
t.objects[addr.String()] = raw.Object()
return raw.ID(), nil
}
func (t *testPool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, option ...client.CallOption) error {
delete(t.objects, params.Address().String())
return nil
}
func (t *testPool) GetObject(ctx context.Context, params *client.GetObjectParams, option ...client.CallOption) (*object.Object, error) {
if obj, ok := t.objects[params.Address().String()]; ok {
if params.PayloadWriter() != nil {
_, err := params.PayloadWriter().Write(obj.Payload())
if err != nil {
return nil, err
}
}
return obj, nil
}
return nil, fmt.Errorf("object not found " + params.Address().String())
}
func (t *testPool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, option ...client.CallOption) (*object.Object, error) {
p := new(client.GetObjectParams).WithAddress(params.Address())
return t.GetObject(ctx, p)
}
func (t *testPool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, option ...client.CallOption) ([]byte, error) {
panic("implement me")
}
func (t *testPool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, option ...client.CallOption) ([][32]byte, error) {
panic("implement me")
}
func (t *testPool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, option ...client.CallOption) ([][64]byte, error) {
panic("implement me")
}
func (t *testPool) SearchObject(ctx context.Context, params *client.SearchObjectParams, option ...client.CallOption) ([]*object.ID, error) {
cidStr := params.ContainerID().String()
var res []*object.ID
if len(params.SearchFilters()) == 1 {
for k, v := range t.objects {
if strings.Contains(k, cidStr) {
res = append(res, v.ID())
}
}
return res, nil
}
filter := params.SearchFilters()[1]
if len(params.SearchFilters()) != 2 || filter.Operation() != object.MatchStringEqual ||
(filter.Header() != object.AttributeFileName && filter.Header() != objectSystemAttributeName) {
return nil, fmt.Errorf("usupported filters")
}
for k, v := range t.objects {
if strings.Contains(k, cidStr) && isMatched(v.Attributes(), filter) {
res = append(res, v.ID())
}
}
return res, nil
}
func isMatched(attributes []*object.Attribute, filter object.SearchFilter) bool {
for _, attr := range attributes {
if attr.Key() == filter.Header() && attr.Value() == filter.Value() {
return true
}
}
return false
}
func (t *testPool) PutContainer(ctx context.Context, container *container.Container, option ...client.CallOption) (*cid.ID, error) {
b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil {
return nil, err
}
id := cid.New()
id.SetSHA256(sha256.Sum256(b))
t.containers[id.String()] = container
return id, nil
}
func (t *testPool) GetContainer(ctx context.Context, id *cid.ID, option ...client.CallOption) (*container.Container, error) {
for k, v := range t.containers {
if k == id.String() {
return v, nil
}
}
return nil, fmt.Errorf("container not found " + id.String())
}
func (t *testPool) ListContainers(ctx context.Context, id *owner.ID, option ...client.CallOption) ([]*cid.ID, error) {
var res []*cid.ID
for k := range t.containers {
cID := cid.New()
if err := cID.Parse(k); err != nil {
return nil, err
}
res = append(res, cID)
}
return res, nil
}
func (t *testPool) DeleteContainer(ctx context.Context, id *cid.ID, option ...client.CallOption) error {
delete(t.containers, id.String())
return nil
}
func (t *testPool) GetEACL(ctx context.Context, id *cid.ID, option ...client.CallOption) (*client.EACLWithSignature, error) {
panic("implement me")
}
func (t *testPool) SetEACL(ctx context.Context, table *eacl.Table, option ...client.CallOption) error {
panic("implement me")
}
func (t *testPool) AnnounceContainerUsedSpace(ctx context.Context, announcements []container.UsedSpaceAnnouncement, option ...client.CallOption) error {
panic("implement me")
}
func (t *testPool) Connection() (client.Client, *session.Token, error) {
panic("implement me")
}
func (t *testPool) OwnerID() *owner.ID {
return nil
}
func (t *testPool) WaitForContainerPresence(ctx context.Context, id *cid.ID, params *pool.ContainerPollingParams) error {
return nil
}
func (tc *testContext) putObject(content []byte) *ObjectInfo {
objInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{
Bucket: tc.bkt,
Object: tc.obj,
Size: int64(len(content)),
Reader: bytes.NewReader(content),
Header: make(map[string]string),
})
require.NoError(tc.t, err)
return objInfo
}
func (tc *testContext) getObject(objectName, versionID string, needError bool) (*ObjectInfo, []byte) {
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{
Bucket: tc.bkt,
Object: objectName,
VersionID: versionID,
})
if needError {
require.Error(tc.t, err)
return nil, nil
}
require.NoError(tc.t, err)
content := bytes.NewBuffer(nil)
err = tc.layer.GetObject(tc.ctx, &GetObjectParams{
ObjectInfo: objInfo,
Writer: content,
VersionID: versionID,
})
require.NoError(tc.t, err)
return objInfo, content.Bytes()
}
func (tc *testContext) deleteObject(objectName, versionID string) {
errs := tc.layer.DeleteObjects(tc.ctx, tc.bkt, []*VersionedObject{
{Name: objectName, VersionID: versionID},
})
for _, err := range errs {
require.NoError(tc.t, err)
}
}
func (tc *testContext) listObjectsV1() []*ObjectInfo {
res, err := tc.layer.ListObjectsV1(tc.ctx, &ListObjectsParamsV1{
ListObjectsParamsCommon: ListObjectsParamsCommon{
Bucket: tc.bkt,
MaxKeys: 1000,
},
})
require.NoError(tc.t, err)
return res.Objects
}
func (tc *testContext) listObjectsV2() []*ObjectInfo {
res, err := tc.layer.ListObjectsV2(tc.ctx, &ListObjectsParamsV2{
ListObjectsParamsCommon: ListObjectsParamsCommon{
Bucket: tc.bkt,
MaxKeys: 1000,
},
})
require.NoError(tc.t, err)
return res.Objects
}
func (tc *testContext) listVersions() *ListObjectVersionsInfo {
res, err := tc.layer.ListObjectVersions(tc.ctx, &ListObjectVersionsParams{
Bucket: tc.bkt,
MaxKeys: 1000,
})
require.NoError(tc.t, err)
return res
}
func (tc *testContext) checkListObjects(ids ...*object.ID) {
objs := tc.listObjectsV1()
require.Equal(tc.t, len(ids), len(objs))
for _, id := range ids {
require.Contains(tc.t, ids, id)
}
objs = tc.listObjectsV2()
require.Equal(tc.t, len(ids), len(objs))
for _, id := range ids {
require.Contains(tc.t, ids, id)
}
}
type testContext struct {
t *testing.T
ctx context.Context
layer Client
bkt string
bktID *cid.ID
obj string
testPool *testPool
}
func prepareContext(t *testing.T) *testContext {
key, err := keys.NewPrivateKey()
require.NoError(t, err)
ctx := context.WithValue(context.Background(), api.BoxData, &accessbox.Box{
Gate: &accessbox.GateData{
BearerToken: token.NewBearerToken(),
GateKey: key.PublicKey(),
},
})
l, err := logger.New(logger.WithTraceLevel("panic"))
require.NoError(t, err)
tp := newTestPool()
bktName := "testbucket1"
cnr := container.New(container.WithAttribute(container.AttributeName, bktName))
bktID, err := tp.PutContainer(ctx, cnr)
require.NoError(t, err)
return &testContext{
ctx: ctx,
layer: NewLayer(l, tp, &CacheConfig{
Size: cache.DefaultObjectsCacheSize,
Lifetime: cache.DefaultObjectsCacheLifetime,
ListObjectsLifetime: DefaultObjectsListCacheLifetime},
),
bkt: bktName,
bktID: bktID,
obj: "obj1",
t: t,
testPool: tp,
}
}
func TestSimpleVersioning(t *testing.T) {
tc := prepareContext(t)
_, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
Bucket: tc.bkt,
Settings: &BucketSettings{VersioningEnabled: true},
})
require.NoError(t, err)
obj1Content1 := []byte("content obj1 v1")
obj1v1 := tc.putObject(obj1Content1)
obj1Content2 := []byte("content obj1 v2")
obj1v2 := tc.putObject(obj1Content2)
objv2, buffer2 := tc.getObject(tc.obj, "", false)
require.Equal(t, obj1Content2, buffer2)
require.Contains(t, objv2.Headers[versionsAddAttr], obj1v1.ID().String())
_, buffer1 := tc.getObject(tc.obj, obj1v1.ID().String(), false)
require.Equal(t, obj1Content1, buffer1)
tc.checkListObjects(obj1v2.ID())
}
func TestSimpleNoVersioning(t *testing.T) {
tc := prepareContext(t)
obj1Content1 := []byte("content obj1 v1")
obj1v1 := tc.putObject(obj1Content1)
obj1Content2 := []byte("content obj1 v2")
obj1v2 := tc.putObject(obj1Content2)
objv2, buffer2 := tc.getObject(tc.obj, "", false)
require.Equal(t, obj1Content2, buffer2)
require.Contains(t, objv2.Headers[versionsDelAttr], obj1v1.ID().String())
tc.getObject(tc.obj, obj1v1.ID().String(), true)
tc.checkListObjects(obj1v2.ID())
}
func TestVersioningDeleteObject(t *testing.T) {
tc := prepareContext(t)
_, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
Bucket: tc.bkt,
Settings: &BucketSettings{VersioningEnabled: true},
})
require.NoError(t, err)
tc.putObject([]byte("content obj1 v1"))
tc.putObject([]byte("content obj1 v2"))
tc.deleteObject(tc.obj, "")
tc.getObject(tc.obj, "", true)
tc.checkListObjects()
}
func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
tc := prepareContext(t)
_, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
Bucket: tc.bkt,
Settings: &BucketSettings{VersioningEnabled: true},
})
require.NoError(t, err)
tc.putObject([]byte("content obj1 v1"))
objV2Info := tc.putObject([]byte("content obj1 v2"))
objV3Content := []byte("content obj1 v3")
objV3Info := tc.putObject(objV3Content)
tc.deleteObject(tc.obj, objV2Info.Version())
tc.getObject(tc.obj, objV2Info.Version(), true)
_, buffer3 := tc.getObject(tc.obj, "", false)
require.Equal(t, objV3Content, buffer3)
tc.deleteObject(tc.obj, "")
tc.getObject(tc.obj, "", true)
for _, ver := range tc.listVersions().DeleteMarker {
if ver.IsLatest {
tc.deleteObject(tc.obj, ver.Object.Version())
}
}
resInfo, buffer := tc.getObject(tc.obj, "", false)
require.Equal(t, objV3Content, buffer)
require.Equal(t, objV3Info.Version(), resInfo.Version())
}
func TestNoVersioningDeleteObject(t *testing.T) {
tc := prepareContext(t)
tc.putObject([]byte("content obj1 v1"))
tc.putObject([]byte("content obj1 v2"))
tc.deleteObject(tc.obj, "")
tc.getObject(tc.obj, "", true)
tc.checkListObjects()
}
func TestGetLastVersion(t *testing.T) {
obj1 := getTestObjectInfo(1, getOID(1), "", "", "")
obj1V2 := getTestObjectInfo(1, getOID(2), "", "", "")
obj2 := getTestObjectInfo(2, getOID(2), obj1.Version(), "", "")
obj3 := getTestObjectInfo(3, getOID(3), joinVers(obj1, obj2), "", "*")
obj4 := getTestObjectInfo(4, getOID(4), joinVers(obj1, obj2), obj2.Version(), obj2.Version())
obj5 := getTestObjectInfo(5, getOID(5), obj1.Version(), obj1.Version(), obj1.Version())
obj6 := getTestObjectInfo(6, getOID(6), joinVers(obj1, obj2, obj3), obj3.Version(), obj3.Version())
for _, tc := range []struct {
versions *objectVersions
expected *ObjectInfo
}{
{
versions: &objectVersions{},
expected: nil,
},
{
versions: &objectVersions{
objects: []*ObjectInfo{obj2, obj1},
addList: []string{obj1.Version(), obj2.Version()},
},
expected: obj2,
},
{
versions: &objectVersions{
objects: []*ObjectInfo{obj2, obj1, obj3},
addList: []string{obj1.Version(), obj2.Version(), obj3.Version()},
},
expected: nil,
},
{
versions: &objectVersions{
objects: []*ObjectInfo{obj2, obj1, obj4},
addList: []string{obj1.Version(), obj2.Version(), obj4.Version()},
delList: []string{obj2.Version()},
},
expected: obj1,
},
{
versions: &objectVersions{
objects: []*ObjectInfo{obj1, obj5},
addList: []string{obj1.Version(), obj5.Version()},
delList: []string{obj1.Version()},
},
expected: nil,
},
{
versions: &objectVersions{
objects: []*ObjectInfo{obj5},
},
expected: nil,
},
{
versions: &objectVersions{
objects: []*ObjectInfo{obj1, obj2, obj3, obj6},
addList: []string{obj1.Version(), obj2.Version(), obj3.Version(), obj6.Version()},
delList: []string{obj3.Version()},
},
expected: obj2,
},
{
versions: &objectVersions{
objects: []*ObjectInfo{obj1, obj1V2},
addList: []string{obj1.Version(), obj1V2.Version()},
},
// creation epochs are equal
// obj1 version/oid > obj1_1 version/oid
expected: obj1,
},
} {
actualObjInfo := tc.versions.getLast()
require.Equal(t, tc.expected, actualObjInfo)
}
}
func TestAppendVersions(t *testing.T) {
obj1 := getTestObjectInfo(1, getOID(1), "", "", "")
obj2 := getTestObjectInfo(2, getOID(2), obj1.Version(), "", "")
obj3 := getTestObjectInfo(3, getOID(3), joinVers(obj1, obj2), "", "*")
obj4 := getTestObjectInfo(4, getOID(4), joinVers(obj1, obj2), obj2.Version(), obj2.Version())
for _, tc := range []struct {
versions *objectVersions
objectToAdd *ObjectInfo
expectedVersions *objectVersions
}{
{
versions: &objectVersions{},
objectToAdd: obj1,
expectedVersions: &objectVersions{
objects: []*ObjectInfo{obj1},
addList: []string{obj1.Version()},
},
},
{
versions: &objectVersions{objects: []*ObjectInfo{obj1}},
objectToAdd: obj2,
expectedVersions: &objectVersions{
objects: []*ObjectInfo{obj1, obj2},
addList: []string{obj1.Version(), obj2.Version()},
},
},
{
versions: &objectVersions{objects: []*ObjectInfo{obj1, obj2}},
objectToAdd: obj3,
expectedVersions: &objectVersions{
objects: []*ObjectInfo{obj1, obj2, obj3},
addList: []string{obj1.Version(), obj2.Version(), obj3.Version()},
},
},
{
versions: &objectVersions{objects: []*ObjectInfo{obj1, obj2}},
objectToAdd: obj4,
expectedVersions: &objectVersions{
objects: []*ObjectInfo{obj1, obj2, obj4},
addList: []string{obj1.Version(), obj2.Version(), obj4.Version()},
delList: []string{obj2.Version()},
},
},
} {
tc.versions.appendVersion(tc.objectToAdd)
require.Equal(t, tc.expectedVersions, tc.versions)
}
}
func joinVers(objs ...*ObjectInfo) string {
if len(objs) == 0 {
return ""
}
var versions []string
for _, obj := range objs {
versions = append(versions, obj.Version())
}
return strings.Join(versions, ",")
}
func getOID(id byte) *object.ID {
b := make([]byte, 32)
b[0] = id
oid := object.NewID()
oid.SetSHA256(sha256.Sum256(b))
return oid
}
func getTestObjectInfo(epoch uint64, oid *object.ID, addAttr, delAttr, delMarkAttr string) *ObjectInfo {
headers := make(map[string]string)
if addAttr != "" {
headers[versionsAddAttr] = addAttr
}
if delAttr != "" {
headers[versionsDelAttr] = delAttr
}
if delMarkAttr != "" {
headers[versionsDeleteMarkAttr] = delMarkAttr
}
return &ObjectInfo{
id: oid,
CreationEpoch: epoch,
Headers: headers,
}
}

View file

@ -226,8 +226,8 @@ See also `GetObject` and other method parameters.
| | Method | Comments |
|----|---------------------|----------|
| 🔴 | GetBucketVersioning | |
| 🔴 | PutBucketVersioning | |
| 🟢 | GetBucketVersioning | |
| 🟢 | PutBucketVersioning | |
## Website