frostfs-node/pkg/services/object/delete/service.go
Leonard Lyubich 869d9e571c [#233] object/head: Remove getting the right child header
With the update of the local storage engine, the headers of virtual objects
are directly given. In this regard, the step with obtaining the the right
child header is removed.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-12-11 17:19:37 +03:00

198 lines
4.4 KiB
Go

package deletesvc
import (
"context"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type Service struct {
*cfg
}
type Option func(*cfg)
type RelationHeader interface {
HeadRelation(context.Context, *objectSDK.Address, *objutil.CommonPrm) (*object.Object, error)
}
type cfg struct {
ownerID *owner.ID
keyStorage *objutil.KeyStorage
putSvc *putsvc.Service
headSvc *headsvc.Service
hdrLinking RelationHeader
log *logger.Logger
}
func defaultCfg() *cfg {
return &cfg{
log: zap.L(),
}
}
func NewService(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
func (s *Service) Delete(ctx context.Context, prm *Prm) (*Response, error) {
ownerID := s.ownerID
if token := prm.common.SessionToken(); token != nil {
ownerID = token.OwnerID()
}
if ownerID == nil {
return nil, errors.Errorf("(%T) missing owner identifier", s)
}
addrList, err := s.getRelations(ctx, prm)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not get object relations", s)
}
content := object.NewTombstoneContent()
content.SetAddressList(addrList...)
data, err := content.MarshalBinary()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not marshal tombstone content", s)
}
r, err := s.putSvc.Put(ctx)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not open put stream", s)
}
// `WithoutSuccessTracking` option broadcast message to all container nodes.
// For now there is no better solution to distributed tombstones with
// content address storage (CAS) and one tombstone for several split
// objects.
if err := r.Init(new(putsvc.PutInitPrm).
WithObject(newTombstone(ownerID, prm.addr.ContainerID())).
WithCommonPrm(prm.common).
WithTraverseOption(placement.WithoutSuccessTracking()), // broadcast tombstone, maybe one
); err != nil {
return nil, errors.Wrapf(err, "(%T) could not initialize tombstone stream", s)
}
if err := r.SendChunk(new(putsvc.PutChunkPrm).
WithChunk(data),
); err != nil {
return nil, errors.Wrapf(err, "(%T) could not send tombstone payload", s)
}
if _, err := r.Close(); err != nil {
return nil, errors.Wrapf(err, "(%T) could not close tombstone stream", s)
}
return new(Response), nil
}
func (s *Service) getRelations(ctx context.Context, prm *Prm) ([]*objectSDK.Address, error) {
var res []*objectSDK.Address
if linking, err := s.hdrLinking.HeadRelation(ctx, prm.addr, prm.common); err != nil {
cid := prm.addr.ContainerID()
for prev := prm.addr.ObjectID(); prev != nil; {
addr := objectSDK.NewAddress()
addr.SetObjectID(prev)
addr.SetContainerID(cid)
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
WithAddress(addr).
WithCommonPrm(prm.common),
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
}
hdr := headResult.Header()
id := hdr.ID()
prev = hdr.PreviousID()
addr.SetObjectID(id)
res = append(res, addr)
}
} else {
childList := linking.Children()
res = make([]*objectSDK.Address, 0, len(childList)+2) // 1 for parent, 1 for linking
for i := range childList {
addr := objectSDK.NewAddress()
addr.SetObjectID(childList[i])
addr.SetContainerID(prm.addr.ContainerID())
res = append(res, addr)
}
addr := objectSDK.NewAddress()
addr.SetObjectID(linking.ID())
addr.SetContainerID(prm.addr.ContainerID())
res = append(res, addr)
}
res = append(res, prm.addr)
return res, nil
}
func WithOwnerID(v *owner.ID) Option {
return func(c *cfg) {
c.ownerID = v
}
}
func WithKeyStorage(v *objutil.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = v
}
}
func WithPutService(v *putsvc.Service) Option {
return func(c *cfg) {
c.putSvc = v
}
}
func WitHeadService(v *headsvc.Service) Option {
return func(c *cfg) {
c.headSvc = v
}
}
func WithLinkingHeader(v RelationHeader) Option {
return func(c *cfg) {
c.hdrLinking = v
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}