frostfs-node/pkg/services/object/delete/service.go
Alex Vanin 11262bed4a [#71] Broadcast tombstone to container
With one tombstone for split objects we can't simply
place it in container. We should inform all nodes that
store split objects of removed original object.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2020-10-05 14:03:55 +03:00

189 lines
4.3 KiB
Go

package deletesvc
import (
"context"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
type Service struct {
*cfg
}
type Option func(*cfg)
type RelationHeader interface {
HeadRelation(context.Context, *objectSDK.Address) (*object.Object, error)
}
type cfg struct {
ownerID *owner.ID
keyStorage *objutil.KeyStorage
putSvc *putsvc.Service
headSvc *headsvc.Service
hdrLinking RelationHeader
}
func defaultCfg() *cfg {
return new(cfg)
}
func NewService(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
func (s *Service) Delete(ctx context.Context, prm *Prm) (*Response, error) {
ownerID := s.ownerID
if token := prm.common.SessionToken(); token != nil {
ownerID = token.OwnerID()
}
if ownerID == nil {
return nil, errors.Errorf("(%T) missing owner identifier", s)
}
addrList, err := s.getRelations(ctx, prm)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not get object relations", s)
}
content := object.NewTombstoneContent()
content.SetAddressList(addrList...)
data, err := content.MarshalBinary()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not marshal tombstone content", s)
}
r, err := s.putSvc.Put(ctx)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not open put stream", s)
}
// `WithoutSuccessTracking` option broadcast message to all container nodes.
// For now there is no better solution to distributed tombstones with
// content address storage (CAS) and one tombstone for several split
// objects.
if err := r.Init(new(putsvc.PutInitPrm).
WithObject(newTombstone(ownerID, prm.addr.GetContainerID())).
WithCommonPrm(prm.common).
WithTraverseOption(placement.WithoutSuccessTracking()), // broadcast tombstone, maybe one
); err != nil {
return nil, errors.Wrapf(err, "(%T) could not initialize tombstone stream", s)
}
if err := r.SendChunk(new(putsvc.PutChunkPrm).
WithChunk(data),
); err != nil {
return nil, errors.Wrapf(err, "(%T) could not send tombstone payload", s)
}
if _, err := r.Close(); err != nil {
return nil, errors.Wrapf(err, "(%T) could not close tombstone stream", s)
}
return new(Response), nil
}
func (s *Service) getRelations(ctx context.Context, prm *Prm) ([]*objectSDK.Address, error) {
var res []*objectSDK.Address
if linking, err := s.hdrLinking.HeadRelation(ctx, prm.addr); err != nil {
cid := prm.addr.GetContainerID()
for prev := prm.addr.GetObjectID(); prev != nil; {
addr := objectSDK.NewAddress()
addr.SetObjectID(prev)
addr.SetContainerID(cid)
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
WithAddress(addr).
WithCommonPrm(prm.common),
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
}
hdr := headResult.Header()
id := hdr.GetID()
prev = hdr.GetPreviousID()
if rightChild := headResult.RightChild(); rightChild != nil {
id = rightChild.GetID()
prev = rightChild.GetPreviousID()
}
addr.SetObjectID(id)
res = append(res, addr)
}
} else {
childList := linking.GetChildren()
res = make([]*objectSDK.Address, 0, len(childList)+1)
for i := range childList {
addr := objectSDK.NewAddress()
addr.SetObjectID(childList[i])
addr.SetContainerID(prm.addr.GetContainerID())
res = append(res, addr)
}
addr := objectSDK.NewAddress()
addr.SetObjectID(linking.GetID())
addr.SetContainerID(prm.addr.GetContainerID())
res = append(res, addr)
}
return res, nil
}
func WithOwnerID(v *owner.ID) Option {
return func(c *cfg) {
c.ownerID = v
}
}
func WithKeyStorage(v *objutil.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = v
}
}
func WithPutService(v *putsvc.Service) Option {
return func(c *cfg) {
c.putSvc = v
}
}
func WitHeadService(v *headsvc.Service) Option {
return func(c *cfg) {
c.headSvc = v
}
}
func WithLinkingHeader(v RelationHeader) Option {
return func(c *cfg) {
c.hdrLinking = v
}
}