[#413] Use tree service to delete object

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-05-18 09:51:12 +03:00 committed by Alex Vanin
parent 49bd77d9cf
commit 977f176713
5 changed files with 53 additions and 73 deletions

View file

@ -1,7 +1,6 @@
package layer
import (
"bytes"
"context"
"crypto/ecdsa"
"fmt"
@ -21,7 +20,6 @@ import (
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/zap"
@ -251,6 +249,8 @@ type (
const (
tagPrefix = "S3-Tag-"
tagEmptyMark = "\\"
emptyOID = "11111111111111111111111111111111"
)
func (t *VersionedObject) String() string {
@ -543,79 +543,55 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Obje
// DeleteObject removes all objects with the passed nice name.
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
var (
err error
ids []oid.ID
)
p := &PutObjectParams{
BktInfo: bkt,
Object: obj.Name,
Reader: bytes.NewReader(nil),
Header: map[string]string{},
if !isRegularVersion(obj.VersionID) {
newVersion := &NodeVersion{
IsDeleteMarker: true,
IsUnversioned: obj.VersionID == unversionedObjectVersionID,
}
// Current implementation doesn't consider "unversioned" mode (so any deletion creates "delete-mark" object).
// The reason is difficulties to determinate whether versioning mode is "unversioned" or "suspended".
if obj.VersionID == unversionedObjectVersionID || !settings.VersioningEnabled && len(obj.VersionID) == 0 {
p.Header[versionsUnversionedAttr] = "true"
versions, err := n.headVersions(ctx, bkt, obj.Name)
if err != nil {
obj.Error = err
if obj.Error = n.treeService.AddVersion(ctx, &bkt.CID, obj.Name, newVersion); obj.Error != nil {
return obj
}
last := versions.getLast(FromUnversioned())
if last == nil {
return obj
}
p.Header[VersionsDeleteMarkAttr] = last.Version()
for _, unversioned := range versions.unversioned() {
ids = append(ids, unversioned.ID)
}
} else if len(obj.VersionID) != 0 {
version, err := n.checkVersionsExist(ctx, bkt, obj)
if err != nil {
obj.Error = err
return obj
}
ids = []oid.ID{version.ID}
if version.Headers[VersionsDeleteMarkAttr] == DelMarkFullObject {
obj.DeleteMarkVersion = version.Version()
}
p.Header[versionsDelAttr] = obj.VersionID
p.Header[VersionsDeleteMarkAttr] = version.Version()
} else {
p.Header[VersionsDeleteMarkAttr] = DelMarkFullObject
if obj.VersionID == emptyOID {
obj.Error = errors.GetAPIError(errors.ErrInvalidVersion)
return obj
}
for _, id := range ids {
if err = n.objectDelete(ctx, bkt, id); err != nil {
versions, err := n.treeService.GetVersions(ctx, &bkt.CID, obj.Name)
if err != nil {
obj.Error = err
return obj
}
if err = n.DeleteObjectTagging(ctx, bkt, &data.ObjectInfo{ID: id, Bucket: bkt.Name, Name: obj.Name}); err != nil {
var found bool
for _, version := range versions {
if version.OID.EncodeToString() == obj.VersionID {
if err = n.treeService.RemoveVersion(ctx, &bkt.CID, version.ID); err == nil {
if err = n.objectDelete(ctx, bkt, version.OID); err == nil {
if err = n.DeleteObjectTagging(ctx, bkt, &data.ObjectInfo{ID: version.OID, Bucket: bkt.Name, Name: obj.Name}); err == nil {
found = true
break
}
}
}
obj.Error = err
return obj
}
}
if !found {
obj.Error = errors.GetAPIError(errors.ErrNoSuchVersion)
return obj
}
}
n.listsCache.CleanCacheEntriesContainingObject(obj.Name, bkt.CID)
objInfo, err := n.PutObject(ctx, p)
if err != nil {
obj.Error = err
return obj
}
if len(obj.VersionID) == 0 {
obj.DeleteMarkVersion = objInfo.Version()
if settings.VersioningEnabled {
obj.DeleteMarkerEtag = objInfo.HashSum
}
}
}
return obj
func isRegularVersion(version string) bool {
return len(version) != 0 && version != unversionedObjectVersionID
}
// DeleteObjects from the storage.

View file

@ -207,7 +207,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
return nil, err
}
newVersion.OID = id
newVersion.OID = *id
if err = n.treeService.AddVersion(ctx, &p.BktInfo.CID, p.Object, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
}
@ -337,7 +337,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
meta, err := n.objectHead(ctx, bkt, *node.OID)
meta, err := n.objectHead(ctx, bkt, node.OID)
if err != nil {
return nil, err
}

View file

@ -47,7 +47,7 @@ func (n *layer) HeadSystemObject(ctx context.Context, bkt *data.BucketInfo, objN
return nil, err
}
meta, err := n.objectHead(ctx, bkt, *node.OID)
meta, err := n.objectHead(ctx, bkt, node.OID)
if err != nil {
return nil, err
}
@ -119,7 +119,7 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject
return nil, err
}
newVersion := &BaseNodeVersion{OID: id}
newVersion := &BaseNodeVersion{OID: *id}
if err = n.treeService.AddSystemVersion(ctx, &p.BktInfo.CID, p.ObjName, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
}

View file

@ -55,7 +55,7 @@ type NodeVersion struct {
type BaseNodeVersion struct {
ID uint64
OID *oid.ID
OID oid.ID
}
// ErrNodeNotFound is returned from Tree service in case of not found error.

View file

@ -110,7 +110,7 @@ func newNodeVersion(node *tree.GetNodeByPathResponse_Info) (*layer.NodeVersion,
return &layer.NodeVersion{
BaseNodeVersion: layer.BaseNodeVersion{
ID: node.NodeId,
OID: treeNode.ObjID,
OID: *treeNode.ObjID,
},
IsUnversioned: isUnversioned,
IsDeleteMarker: isDeleteMarker,
@ -321,6 +321,10 @@ func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, attr
attrPath: path[len(path)-1],
}
if version.IsDeleteMarker {
meta[isDeleteMarkerKV] = "true"
}
if version.IsUnversioned {
meta[isUnversionedKV] = "true"