forked from TrueCloudLab/frostfs-node
[#64] object/delete: Change the formation of tombstone
Make delete service to write list of child object addresses to tombstone payload. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
33ca88f85f
commit
44fcd2f212
2 changed files with 73 additions and 100 deletions
|
@ -60,69 +60,96 @@ func (s *Service) Delete(ctx context.Context, prm *Prm) (*Response, error) {
|
||||||
return nil, errors.Errorf("(%T) missing owner identifier", s)
|
return nil, errors.Errorf("(%T) missing owner identifier", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
tool := &deleteTool{
|
addrList, err := s.getRelations(ctx, prm)
|
||||||
ctx: ctx,
|
if err != nil {
|
||||||
putSvc: s.putSvc,
|
return nil, errors.Wrapf(err, "(%T) could not get object relations", s)
|
||||||
obj: newTombstone(ownerID, prm.addr.GetContainerID()),
|
|
||||||
addr: prm.addr,
|
|
||||||
commonPrm: prm.common,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if linking, err := s.hdrLinking.HeadRelation(ctx, prm.addr); err != nil {
|
content := object.NewTombstoneContent()
|
||||||
if err := s.deleteAll(tool); err != nil {
|
content.SetAddressList(addrList...)
|
||||||
return nil, errors.Wrapf(err, "(%T) could not delete all object relations", s)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if err := tool.delete(prm.addr.GetObjectID()); err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not delete object", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, child := range linking.GetChildren() {
|
data, err := content.MarshalBinary()
|
||||||
if err := tool.delete(child); err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "(%T) could not delete child object", s)
|
return nil, errors.Wrapf(err, "(%T) could not marshal tombstone content", s)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if err := tool.delete(linking.GetID()); err != nil {
|
r, err := s.putSvc.Put(ctx)
|
||||||
return nil, errors.Wrapf(err, "(%T) could not delete linking object", s)
|
if err != nil {
|
||||||
}
|
return nil, errors.Wrapf(err, "(%T) could not open put stream", s)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.Init(new(putsvc.PutInitPrm).
|
||||||
|
WithObject(newTombstone(ownerID, prm.addr.GetContainerID())).
|
||||||
|
WithCommonPrm(prm.common),
|
||||||
|
); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "(%T) could not initialize tombstone stream", s)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.SendChunk(new(putsvc.PutChunkPrm).
|
||||||
|
WithChunk(data),
|
||||||
|
); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "(%T) could not send tombstone payload", s)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := r.Close(); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "(%T) could not close tombstone stream", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
return new(Response), nil
|
return new(Response), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) deleteAll(tool *deleteTool) error {
|
func (s *Service) getRelations(ctx context.Context, prm *Prm) ([]*objectSDK.Address, error) {
|
||||||
headResult, err := s.headSvc.Head(tool.ctx, new(headsvc.Prm).
|
var res []*objectSDK.Address
|
||||||
WithAddress(tool.addr).
|
|
||||||
WithCommonPrm(tool.commonPrm),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not receive Head result", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
hdr := headResult.Header()
|
if linking, err := s.hdrLinking.HeadRelation(ctx, prm.addr); err != nil {
|
||||||
|
cid := prm.addr.GetContainerID()
|
||||||
|
|
||||||
if err := tool.delete(hdr.GetID()); err != nil {
|
for prev := prm.addr.GetObjectID(); prev != nil; {
|
||||||
return errors.Wrapf(err, "(%T) could not remove object", s)
|
addr := objectSDK.NewAddress()
|
||||||
}
|
addr.SetObjectID(prev)
|
||||||
|
addr.SetContainerID(cid)
|
||||||
|
|
||||||
prevID := hdr.GetPreviousID()
|
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
|
||||||
|
WithAddress(addr).
|
||||||
|
WithCommonPrm(prm.common),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
|
||||||
|
}
|
||||||
|
|
||||||
if rightChild := headResult.RightChild(); rightChild != nil {
|
hdr := headResult.Header()
|
||||||
if err := tool.delete(rightChild.GetID()); err != nil {
|
id := hdr.GetID()
|
||||||
return errors.Wrapf(err, "(%T) could not remove right child", s)
|
prev = hdr.GetPreviousID()
|
||||||
|
|
||||||
|
if rightChild := headResult.RightChild(); rightChild != nil {
|
||||||
|
id = rightChild.GetID()
|
||||||
|
prev = rightChild.GetPreviousID()
|
||||||
|
}
|
||||||
|
|
||||||
|
addr.SetObjectID(id)
|
||||||
|
|
||||||
|
res = append(res, addr)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
childList := linking.GetChildren()
|
||||||
|
res = make([]*objectSDK.Address, 0, len(childList)+1)
|
||||||
|
|
||||||
|
for i := range childList {
|
||||||
|
addr := objectSDK.NewAddress()
|
||||||
|
addr.SetObjectID(childList[i])
|
||||||
|
addr.SetContainerID(prm.addr.GetContainerID())
|
||||||
|
|
||||||
|
res = append(res, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
prevID = rightChild.GetPreviousID()
|
addr := objectSDK.NewAddress()
|
||||||
|
addr.SetObjectID(linking.GetID())
|
||||||
|
addr.SetContainerID(prm.addr.GetContainerID())
|
||||||
|
|
||||||
|
res = append(res, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if prevID != nil {
|
return res, nil
|
||||||
tool.addr.SetObjectID(prevID)
|
|
||||||
|
|
||||||
return s.deleteAll(tool)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithOwnerID(v *owner.ID) Option {
|
func WithOwnerID(v *owner.ID) Option {
|
||||||
|
|
|
@ -1,29 +1,12 @@
|
||||||
package deletesvc
|
package deletesvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type deleteTool struct {
|
|
||||||
ctx context.Context
|
|
||||||
|
|
||||||
putSvc *putsvc.Service
|
|
||||||
|
|
||||||
obj *object.RawObject
|
|
||||||
|
|
||||||
addr *objectSDK.Address
|
|
||||||
|
|
||||||
commonPrm *util.CommonPrm
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTombstone(ownerID *owner.ID, cid *container.ID) *object.RawObject {
|
func newTombstone(ownerID *owner.ID, cid *container.ID) *object.RawObject {
|
||||||
obj := object.NewRaw()
|
obj := object.NewRaw()
|
||||||
obj.SetContainerID(cid)
|
obj.SetContainerID(cid)
|
||||||
|
@ -32,40 +15,3 @@ func newTombstone(ownerID *owner.ID, cid *container.ID) *object.RawObject {
|
||||||
|
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *deleteTool) delete(id *objectSDK.ID) error {
|
|
||||||
d.addr.SetObjectID(id)
|
|
||||||
|
|
||||||
// FIXME: implement marshaler
|
|
||||||
addrBytes, err := d.addr.ToV2().StableMarshal(nil)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not marshal address", d)
|
|
||||||
}
|
|
||||||
|
|
||||||
r, err := d.putSvc.Put(d.ctx)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not open put stream", d)
|
|
||||||
}
|
|
||||||
|
|
||||||
d.obj.SetID(id)
|
|
||||||
d.obj.SetPayload(addrBytes)
|
|
||||||
|
|
||||||
if err := r.Init(new(putsvc.PutInitPrm).
|
|
||||||
WithObject(d.obj).
|
|
||||||
WithCommonPrm(d.commonPrm),
|
|
||||||
); err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not initialize tombstone stream", d)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := r.SendChunk(new(putsvc.PutChunkPrm).
|
|
||||||
WithChunk(addrBytes),
|
|
||||||
); err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not send tombstone payload", d)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := r.Close(); err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not close tombstone stream", d)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue