[#122] Add delete versioned object

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2021-08-10 15:08:15 +03:00
parent 3130784ee6
commit d5aef7566f
7 changed files with 189 additions and 86 deletions

View file

@ -64,8 +64,9 @@ type (
)
const (
versionsDelAttr = "S3-Versions-del"
versionsAddAttr = "S3-Versions-add"
versionsDelAttr = "S3-Versions-del"
versionsAddAttr = "S3-Versions-add"
versionsDeleteMarkAttr = "S3-Versions-delete-mark"
)
// objectSearch returns all available objects by search params.
@ -121,11 +122,10 @@ func (n *layer) objectRange(ctx context.Context, p *getParams) ([]byte, error) {
}
// objectPut into NeoFS, took payload from io.Reader.
func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) {
func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectParams) (*ObjectInfo, error) {
var (
err error
obj string
bkt *BucketInfo
own = n.Owner(ctx)
)
@ -136,9 +136,6 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
if obj, err = url.QueryUnescape(p.Object); err != nil {
return nil, err
}
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
return nil, err
}
versioningEnabled := n.isVersioningEnabled(ctx, bkt)
lastVersionInfo, err := n.headLastVersion(ctx, bkt, p.Object)
@ -155,15 +152,18 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
versionsAddedStr += ","
}
versionsAddedStr += lastVersionInfo.ID().String()
addedVersions := object.NewAttribute()
addedVersions.SetKey(versionsAddAttr)
addedVersions.SetValue(versionsAddedStr)
attributes = append(attributes, addedVersions)
if delVersions := lastVersionInfo.Headers[versionsDelAttr]; len(delVersions) > 0 {
deletedVersions := object.NewAttribute()
deletedVersions.SetKey(versionsDelAttr)
deletedVersions.SetValue(delVersions)
attributes = append(attributes, deletedVersions)
p.Header[versionsAddAttr] = versionsAddedStr
deleted := p.Header[versionsDelAttr]
if delVersions := lastVersionInfo.Headers[versionsDelAttr]; len(delVersions) != 0 {
if len(deleted) == 0 {
deleted = delVersions
} else {
deleted = delVersions + "," + deleted
}
}
if len(deleted) != 0 {
p.Header[versionsDelAttr] = deleted
}
} else {
versionsDeletedStr := lastVersionInfo.Headers[versionsDelAttr]
@ -171,24 +171,19 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
versionsDeletedStr += ","
}
versionsDeletedStr += lastVersionInfo.ID().String()
deletedVersions := object.NewAttribute()
deletedVersions.SetKey(versionsDelAttr)
deletedVersions.SetValue(versionsDeletedStr)
p.Header[versionsDelAttr] = versionsDeletedStr
attributes = append(attributes, deletedVersions)
idsToDeleteArr = append(idsToDeleteArr, lastVersionInfo.ID())
}
}
unix := strconv.FormatInt(time.Now().UTC().Unix(), 10)
filename := object.NewAttribute()
filename.SetKey(object.AttributeFileName)
filename.SetValue(obj)
createdAt := object.NewAttribute()
createdAt.SetKey(object.AttributeTimestamp)
createdAt.SetValue(unix)
createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10))
attributes = append(attributes, filename, createdAt)
@ -240,11 +235,7 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
}
for _, id := range idsToDeleteArr {
addr := object.NewAddress()
addr.SetObjectID(id)
addr.SetContainerID(bkt.CID)
if err = n.objectDelete(ctx, addr); err != nil {
if err = n.objectDelete(ctx, bkt.CID, id); err != nil {
n.log.Warn("couldn't delete object",
zap.Stringer("version id", id),
zap.Error(err))
@ -284,33 +275,28 @@ func (n *layer) headLastVersion(ctx context.Context, bkt *BucketInfo, objectName
return objectInfoFromMeta(bkt, infos[len(infos)-1], "", ""), nil
}
func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, objectName, versionID string) (*ObjectInfo, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName})
if err != nil {
func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, versionID string) (*ObjectInfo, error) {
oid := object.NewID()
if err := oid.Parse(versionID); err != nil {
return nil, err
}
if len(ids) == 0 {
return nil, api.GetAPIError(api.ErrNoSuchVersion)
}
for _, id := range ids {
if id.String() == versionID {
meta, err := n.objectHead(ctx, bkt.CID, id)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, api.GetAPIError(api.ErrNoSuchVersion)
}
return nil, err
}
return objectInfoFromMeta(bkt, meta, "", ""), nil
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, api.GetAPIError(api.ErrNoSuchVersion)
}
return nil, err
}
return nil, api.GetAPIError(api.ErrNoSuchVersion)
return objectInfoFromMeta(bkt, meta, "", ""), nil
}
// objectDelete puts tombstone object into neofs.
func (n *layer) objectDelete(ctx context.Context, address *object.Address) error {
func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *object.ID) error {
address := object.NewAddress()
address.SetContainerID(cid)
address.SetObjectID(oid)
dop := new(client.DeleteObjectParams)
dop.WithAddress(address)
n.objCache.Delete(address)