From 44fcd2f21294e51308303b77136b2f4b3fde372d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 1 Oct 2020 20:15:28 +0300 Subject: [PATCH] [#64] object/delete: Change the formation of tombstone Make delete service to write list of child object addresses to tombstone payload. Signed-off-by: Leonard Lyubich --- pkg/services/object/delete/service.go | 119 ++++++++++++++++---------- pkg/services/object/delete/util.go | 54 ------------ 2 files changed, 73 insertions(+), 100 deletions(-) diff --git a/pkg/services/object/delete/service.go b/pkg/services/object/delete/service.go index 97ffa9a8d..82c7ae2fc 100644 --- a/pkg/services/object/delete/service.go +++ b/pkg/services/object/delete/service.go @@ -60,69 +60,96 @@ func (s *Service) Delete(ctx context.Context, prm *Prm) (*Response, error) { return nil, errors.Errorf("(%T) missing owner identifier", s) } - tool := &deleteTool{ - ctx: ctx, - putSvc: s.putSvc, - obj: newTombstone(ownerID, prm.addr.GetContainerID()), - addr: prm.addr, - commonPrm: prm.common, + addrList, err := s.getRelations(ctx, prm) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not get object relations", s) } - if linking, err := s.hdrLinking.HeadRelation(ctx, prm.addr); err != nil { - if err := s.deleteAll(tool); err != nil { - return nil, errors.Wrapf(err, "(%T) could not delete all object relations", s) - } - } else { - if err := tool.delete(prm.addr.GetObjectID()); err != nil { - return nil, errors.Wrapf(err, "(%T) could not delete object", s) - } + content := object.NewTombstoneContent() + content.SetAddressList(addrList...) - for _, child := range linking.GetChildren() { - if err := tool.delete(child); err != nil { - return nil, errors.Wrapf(err, "(%T) could not delete child object", s) - } - } + data, err := content.MarshalBinary() + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not marshal tombstone content", s) + } - if err := tool.delete(linking.GetID()); err != nil { - return nil, errors.Wrapf(err, "(%T) could not delete linking object", s) - } + r, err := s.putSvc.Put(ctx) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not open put stream", s) + } + + if err := r.Init(new(putsvc.PutInitPrm). + WithObject(newTombstone(ownerID, prm.addr.GetContainerID())). + WithCommonPrm(prm.common), + ); err != nil { + return nil, errors.Wrapf(err, "(%T) could not initialize tombstone stream", s) + } + + if err := r.SendChunk(new(putsvc.PutChunkPrm). + WithChunk(data), + ); err != nil { + return nil, errors.Wrapf(err, "(%T) could not send tombstone payload", s) + } + + if _, err := r.Close(); err != nil { + return nil, errors.Wrapf(err, "(%T) could not close tombstone stream", s) } return new(Response), nil } -func (s *Service) deleteAll(tool *deleteTool) error { - headResult, err := s.headSvc.Head(tool.ctx, new(headsvc.Prm). - WithAddress(tool.addr). - WithCommonPrm(tool.commonPrm), - ) - if err != nil { - return errors.Wrapf(err, "(%T) could not receive Head result", s) - } +func (s *Service) getRelations(ctx context.Context, prm *Prm) ([]*objectSDK.Address, error) { + var res []*objectSDK.Address - hdr := headResult.Header() + if linking, err := s.hdrLinking.HeadRelation(ctx, prm.addr); err != nil { + cid := prm.addr.GetContainerID() - if err := tool.delete(hdr.GetID()); err != nil { - return errors.Wrapf(err, "(%T) could not remove object", s) - } + for prev := prm.addr.GetObjectID(); prev != nil; { + addr := objectSDK.NewAddress() + addr.SetObjectID(prev) + addr.SetContainerID(cid) - prevID := hdr.GetPreviousID() + headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm). + WithAddress(addr). + WithCommonPrm(prm.common), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not receive Head result", s) + } - if rightChild := headResult.RightChild(); rightChild != nil { - if err := tool.delete(rightChild.GetID()); err != nil { - return errors.Wrapf(err, "(%T) could not remove right child", s) + hdr := headResult.Header() + id := hdr.GetID() + prev = hdr.GetPreviousID() + + if rightChild := headResult.RightChild(); rightChild != nil { + id = rightChild.GetID() + prev = rightChild.GetPreviousID() + } + + addr.SetObjectID(id) + + res = append(res, addr) + } + } else { + childList := linking.GetChildren() + res = make([]*objectSDK.Address, 0, len(childList)+1) + + for i := range childList { + addr := objectSDK.NewAddress() + addr.SetObjectID(childList[i]) + addr.SetContainerID(prm.addr.GetContainerID()) + + res = append(res, addr) } - prevID = rightChild.GetPreviousID() + addr := objectSDK.NewAddress() + addr.SetObjectID(linking.GetID()) + addr.SetContainerID(prm.addr.GetContainerID()) + + res = append(res, addr) } - if prevID != nil { - tool.addr.SetObjectID(prevID) - - return s.deleteAll(tool) - } - - return nil + return res, nil } func WithOwnerID(v *owner.ID) Option { diff --git a/pkg/services/object/delete/util.go b/pkg/services/object/delete/util.go index 564560347..e60204a53 100644 --- a/pkg/services/object/delete/util.go +++ b/pkg/services/object/delete/util.go @@ -1,29 +1,12 @@ package deletesvc import ( - "context" - "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-node/pkg/core/object" - putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/pkg/errors" ) -type deleteTool struct { - ctx context.Context - - putSvc *putsvc.Service - - obj *object.RawObject - - addr *objectSDK.Address - - commonPrm *util.CommonPrm -} - func newTombstone(ownerID *owner.ID, cid *container.ID) *object.RawObject { obj := object.NewRaw() obj.SetContainerID(cid) @@ -32,40 +15,3 @@ func newTombstone(ownerID *owner.ID, cid *container.ID) *object.RawObject { return obj } - -func (d *deleteTool) delete(id *objectSDK.ID) error { - d.addr.SetObjectID(id) - - // FIXME: implement marshaler - addrBytes, err := d.addr.ToV2().StableMarshal(nil) - if err != nil { - return errors.Wrapf(err, "(%T) could not marshal address", d) - } - - r, err := d.putSvc.Put(d.ctx) - if err != nil { - return errors.Wrapf(err, "(%T) could not open put stream", d) - } - - d.obj.SetID(id) - d.obj.SetPayload(addrBytes) - - if err := r.Init(new(putsvc.PutInitPrm). - WithObject(d.obj). - WithCommonPrm(d.commonPrm), - ); err != nil { - return errors.Wrapf(err, "(%T) could not initialize tombstone stream", d) - } - - if err := r.SendChunk(new(putsvc.PutChunkPrm). - WithChunk(addrBytes), - ); err != nil { - return errors.Wrapf(err, "(%T) could not send tombstone payload", d) - } - - if _, err := r.Close(); err != nil { - return errors.Wrapf(err, "(%T) could not close tombstone stream", d) - } - - return nil -}