[#71] Broadcast tombstone to container

With one tombstone for split objects we can't simply
place it in container. We should inform all nodes that
store split objects of removed original object.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-10-05 12:42:45 +03:00 committed by Alex Vanin
parent 9cdf7d3896
commit 11262bed4a
3 changed files with 15 additions and 4 deletions

View file

@ -9,6 +9,7 @@ import (
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
@ -78,9 +79,14 @@ func (s *Service) Delete(ctx context.Context, prm *Prm) (*Response, error) {
return nil, errors.Wrapf(err, "(%T) could not open put stream", s)
}
// `WithoutSuccessTracking` option broadcast message to all container nodes.
// For now there is no better solution to distributed tombstones with
// content address storage (CAS) and one tombstone for several split
// objects.
if err := r.Init(new(putsvc.PutInitPrm).
WithObject(newTombstone(ownerID, prm.addr.GetContainerID())).
WithCommonPrm(prm.common),
WithCommonPrm(prm.common).
WithTraverseOption(placement.WithoutSuccessTracking()), // broadcast tombstone, maybe one
); err != nil {
return nil, errors.Wrapf(err, "(%T) could not initialize tombstone stream", s)
}

View file

@ -26,6 +26,14 @@ func (p *PutInitPrm) WithCommonPrm(v *util.CommonPrm) *PutInitPrm {
return p
}
func (p *PutInitPrm) WithTraverseOption(opt placement.Option) *PutInitPrm {
if p != nil {
p.traverseOpts = append(p.traverseOpts, opt)
}
return p
}
func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm {
if p != nil {
p.hdr = v

View file

@ -91,9 +91,6 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
return errors.Wrapf(err, "(%T) could not get container by ID", p)
}
// allocate placement traverser options
prm.traverseOpts = make([]placement.Option, 0, 4)
// add common options
prm.traverseOpts = append(prm.traverseOpts,
// set processing container