[#122] Add list object versions

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2021-08-11 13:02:13 +03:00
parent d5aef7566f
commit 43185de52a
5 changed files with 127 additions and 56 deletions

View file

@ -5,6 +5,7 @@ import (
"net/http"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"go.uber.org/zap"
)
@ -14,7 +15,7 @@ func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Requ
configuration := new(VersioningConfiguration)
if err := xml.NewDecoder(r.Body).Decode(configuration); err != nil {
h.logAndSendError(w, "couldn't decode versioning configuration", reqInfo, api.GetAPIError(api.ErrIllegalVersioningConfigurationException))
h.logAndSendError(w, "couldn't decode versioning configuration", reqInfo, errors.GetAPIError(errors.ErrIllegalVersioningConfigurationException))
return
}

View file

@ -322,15 +322,15 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
return nil
}
func (n *layer) checkObject(ctx context.Context, cid *cid.ID, filename string) error {
var err error
if _, err = n.objectFindID(ctx, &findParams{cid: cid, val: filename}); err == nil {
return new(errors.ObjectAlreadyExists)
}
return err
}
//func (n *layer) checkObject(ctx context.Context, cid *cid.ID, filename string) error {
// var err error
//
// if _, err = n.objectFindID(ctx, &findParams{cid: cid, val: filename}); err == nil {
// return new(errors.ObjectAlreadyExists)
// }
//
// return err
//}
// GetObjectInfo returns meta information about the object.
func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*ObjectInfo, error) {
@ -344,7 +344,7 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*Object
objInfo, err := n.headLastVersion(ctx, bkt, p.Object)
if err == nil {
if deleteMark, err2 := strconv.ParseBool(objInfo.Headers[versionsDeleteMarkAttr]); err2 == nil && deleteMark {
return nil, api.GetAPIError(api.ErrNoSuchKey)
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
}
}
return objInfo, err
@ -423,7 +423,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *Versione
)
versioningEnabled := n.isVersioningEnabled(ctx, bkt)
if !versioningEnabled && obj.VersionID != "null" && obj.VersionID != "" {
if !versioningEnabled && obj.VersionID != unversionedObjectVersionID && obj.VersionID != "" {
return errors.GetAPIError(errors.ErrInvalidVersion)
}
@ -431,17 +431,17 @@ func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *Versione
if len(obj.VersionID) != 0 {
id := object.NewID()
if err := id.Parse(obj.VersionID); err != nil {
return &errors.DeleteError{Err: api.GetAPIError(api.ErrInvalidVersion), Object: obj.String()}
return &errors.DeleteError{Err: errors.GetAPIError(errors.ErrInvalidVersion), Object: obj.String()}
}
ids = []*object.ID{id}
lastObject, err := n.headLastVersion(ctx, bkt, obj.Name)
if err != nil {
return &api.DeleteError{Err: err, Object: obj.String()}
return &errors.DeleteError{Err: err, Object: obj.String()}
}
if !strings.Contains(lastObject.Headers[versionsAddAttr], obj.VersionID) ||
strings.Contains(lastObject.Headers[versionsDelAttr], obj.VersionID) {
return &api.DeleteError{Err: api.GetAPIError(api.ErrInvalidVersion), Object: obj.String()}
return &errors.DeleteError{Err: errors.GetAPIError(errors.ErrInvalidVersion), Object: obj.String()}
}
if lastObject.ID().String() == obj.VersionID {
@ -449,7 +449,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *Versione
addedVersions := strings.Split(added, ",")
sourceCopyVersion, err := n.headVersion(ctx, bkt, addedVersions[len(addedVersions)-1])
if err != nil {
return &api.DeleteError{Err: err, Object: obj.String()}
return &errors.DeleteError{Err: err, Object: obj.String()}
}
p := &CopyObjectParams{
SrcObject: sourceCopyVersion,
@ -471,7 +471,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *Versione
},
}
if _, err := n.objectPut(ctx, bkt, p); err != nil {
return &api.DeleteError{Err: err, Object: obj.String()}
return &errors.DeleteError{Err: err, Object: obj.String()}
}
}
} else {
@ -553,7 +553,7 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
return err
}
if len(ids) != 0 {
return api.GetAPIError(api.ErrBucketNotEmpty)
return errors.GetAPIError(errors.ErrBucketNotEmpty)
}
return n.deleteContainer(ctx, bucketInfo.CID)
@ -565,7 +565,7 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
err error
bkt *BucketInfo
ids []*object.ID
uniqNames = make(map[string]bool)
latest = make(map[string]*ObjectVersionInfo)
)
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
@ -575,9 +575,9 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
}
versions := make([]*ObjectVersionInfo, 0, len(ids))
// todo: deletemarkers is empty now, we will use it after proper realization of versioning
deleted := make([]*DeletedObjectInfo, 0, len(ids))
res.DeleteMarker = deleted
deletedVersions := []string{}
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id)
@ -586,45 +586,114 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
continue
}
if ov := objectVersionInfoFromMeta(bkt, meta, p.Prefix, p.Delimiter); ov != nil {
if _, ok := uniqNames[ov.Object.Name]; ok {
if ov.Object.Name <= p.KeyMarker {
continue
}
if len(p.KeyMarker) > 0 && ov.Object.Name <= p.KeyMarker {
continue
if currentLatest, ok := latest[ov.Object.Name]; ok {
if less(currentLatest, ov) {
latest[ov.Object.Name] = ov
}
uniqNames[ov.Object.Name] = ov.Object.isDir
} else {
latest[ov.Object.Name] = ov
}
if del := ov.Object.Headers[versionsDelAttr]; len(del) != 0 {
deletedVersions = append(deletedVersions, strings.Split(del, ",")...)
}
if parsed, err := strconv.ParseBool(ov.Object.Headers[versionsDeleteMarkAttr]); err == nil && parsed {
deleted = append(deleted, &DeletedObjectInfo{
Owner: ov.Object.Owner,
Key: ov.Object.Name,
VersionID: ov.VersionID,
LastModified: ov.Object.Created.Format(time.RFC3339),
})
} else {
versions = append(versions, ov)
}
}
}
sort.Slice(versions, func(i, j int) bool {
if contains(deletedVersions, versions[i].VersionID) {
return true
}
if contains(deletedVersions, versions[j].VersionID) {
return false
}
if versions[i].Object.Name == versions[j].Object.Name {
if versions[i].CreationEpoch == versions[j].CreationEpoch {
return versions[i].VersionID < versions[j].VersionID
}
return versions[i].CreationEpoch < versions[j].CreationEpoch
}
return versions[i].Object.Name < versions[j].Object.Name
})
if len(versions) > p.MaxKeys {
res.IsTruncated = true
lastVersion := versions[p.MaxKeys-1]
res.KeyMarker = lastVersion.Object.Name
res.VersionIDMarker = lastVersion.VersionID
nextVersion := versions[p.MaxKeys]
res.NextKeyMarker = nextVersion.Object.Name
res.NextVersionIDMarker = nextVersion.VersionID
versions = versions[:p.MaxKeys]
}
for _, ov := range versions {
if isDir := uniqNames[ov.Object.Name]; isDir {
res.CommonPrefixes = append(res.CommonPrefixes, &ov.Object.Name)
} else {
res.Version = append(res.Version, ov)
for i, objVersion := range versions {
if i == len(versions)-1 || objVersion.Object.Name != versions[i+1].Object.Name {
objVersion.IsLatest = true
}
}
for i, objVersion := range versions {
if !contains(deletedVersions, objVersion.VersionID) {
versions = versions[i:]
break
}
}
for i, objVersion := range deleted {
if !contains(deletedVersions, objVersion.VersionID) {
deleted = deleted[i:]
break
}
}
//if len(versions) > p.MaxKeys {
// res.IsTruncated = true
//
// lastVersion := versions[p.MaxKeys-1]
// res.KeyMarker = lastVersion.Object.Name
// res.VersionIDMarker = lastVersion.VersionID
//
// nextVersion := versions[p.MaxKeys]
// res.NextKeyMarker = nextVersion.Object.Name
// res.NextVersionIDMarker = nextVersion.VersionID
//
// versions = versions[:p.MaxKeys]
//}
//
//for _, ov := range versions {
// if isDir := uniqNames[ov.Object.Name]; isDir {
// res.CommonPrefixes = append(res.CommonPrefixes, &ov.Object.Name)
// } else {
// res.Version = append(res.Version, ov)
// }
//}
res.Version = versions
res.DeleteMarker = deleted
return &res, nil
}
func less(ov1, ov2 *ObjectVersionInfo) bool {
if ov1.CreationEpoch == ov2.CreationEpoch {
return ov1.VersionID < ov2.VersionID
}
return ov1.CreationEpoch < ov2.CreationEpoch
}
func contains(list []string, elem string) bool {
for _, item := range list {
if elem == item {
return true
}
}
return false
}
func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error) {
bucketInfo, err := n.GetBucketInfo(ctx, p.Bucket)
if err != nil {

View file

@ -252,7 +252,7 @@ func (n *layer) headLastVersion(ctx context.Context, bkt *BucketInfo, objectName
}
if len(ids) == 0 {
return nil, api.GetAPIError(api.ErrNoSuchKey)
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
infos := make([]*object.Object, 0, len(ids))
@ -284,7 +284,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, versionID stri
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, api.GetAPIError(api.ErrNoSuchVersion)
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)
}
return nil, err
}

View file

@ -57,6 +57,7 @@ type (
Object *ObjectInfo
IsLatest bool
VersionID string
CreationEpoch uint64
}
// DeletedObjectInfo stores info about deleted versions of objects.
@ -156,7 +157,7 @@ func objectVersionInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, del
if oi == nil {
return nil
}
return &ObjectVersionInfo{Object: oi, IsLatest: true, VersionID: unversionedObjectVersionID}
return &ObjectVersionInfo{Object: oi, VersionID: meta.ID().String(), CreationEpoch: meta.CreationEpoch()}
}
func filenameFromObject(o *object.Object) string {