[#243] object/delete: Implement new service processing
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
510e9ff2ec
commit
fe3906c295
11 changed files with 654 additions and 217 deletions
|
@ -1,42 +1,42 @@
|
|||
package deletesvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
||||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Service utility serving requests of Object.Get service.
|
||||
type Service struct {
|
||||
*cfg
|
||||
}
|
||||
|
||||
// Option is a Service's constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type RelationHeader interface {
|
||||
HeadRelation(context.Context, *objectSDK.Address, *objutil.CommonPrm) (*object.Object, error)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
ownerID *owner.ID
|
||||
|
||||
keyStorage *objutil.KeyStorage
|
||||
|
||||
putSvc *putsvc.Service
|
||||
|
||||
headSvc *headsvc.Service
|
||||
|
||||
hdrLinking RelationHeader
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
header interface {
|
||||
// must return (nil, nil) for PHY objects
|
||||
splitInfo(*execCtx) (*objectSDK.SplitInfo, error)
|
||||
|
||||
children(*execCtx) ([]*objectSDK.ID, error)
|
||||
|
||||
// must return (nil, nil) for 1st object in chain
|
||||
previous(*execCtx, *objectSDK.ID) (*objectSDK.ID, error)
|
||||
}
|
||||
|
||||
searcher interface {
|
||||
splitMembers(*execCtx) ([]*objectSDK.ID, error)
|
||||
}
|
||||
|
||||
placer interface {
|
||||
put(*execCtx, bool) (*objectSDK.ID, error)
|
||||
}
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -45,7 +45,9 @@ func defaultCfg() *cfg {
|
|||
}
|
||||
}
|
||||
|
||||
func NewService(opts ...Option) *Service {
|
||||
// New creates, initializes and returns utility serving
|
||||
// Object.Get service requests.
|
||||
func New(opts ...Option) *Service {
|
||||
c := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
|
@ -57,142 +59,31 @@ func NewService(opts ...Option) *Service {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) Delete(ctx context.Context, prm *Prm) (*Response, error) {
|
||||
ownerID := s.ownerID
|
||||
if token := prm.common.SessionToken(); token != nil {
|
||||
ownerID = token.OwnerID()
|
||||
}
|
||||
|
||||
if ownerID == nil {
|
||||
return nil, errors.Errorf("(%T) missing owner identifier", s)
|
||||
}
|
||||
|
||||
addrList, err := s.getRelations(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not get object relations", s)
|
||||
}
|
||||
|
||||
content := object.NewTombstoneContent()
|
||||
content.SetAddressList(addrList...)
|
||||
|
||||
data, err := content.MarshalBinary()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not marshal tombstone content", s)
|
||||
}
|
||||
|
||||
r, err := s.putSvc.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not open put stream", s)
|
||||
}
|
||||
|
||||
// `WithoutSuccessTracking` option broadcast message to all container nodes.
|
||||
// For now there is no better solution to distributed tombstones with
|
||||
// content address storage (CAS) and one tombstone for several split
|
||||
// objects.
|
||||
if err := r.Init(new(putsvc.PutInitPrm).
|
||||
WithObject(newTombstone(ownerID, prm.addr.ContainerID())).
|
||||
WithCommonPrm(prm.common).
|
||||
WithTraverseOption(placement.WithoutSuccessTracking()), // broadcast tombstone, maybe one
|
||||
); err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not initialize tombstone stream", s)
|
||||
}
|
||||
|
||||
if err := r.SendChunk(new(putsvc.PutChunkPrm).
|
||||
WithChunk(data),
|
||||
); err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not send tombstone payload", s)
|
||||
}
|
||||
|
||||
if _, err := r.Close(); err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not close tombstone stream", s)
|
||||
}
|
||||
|
||||
return new(Response), nil
|
||||
}
|
||||
|
||||
func (s *Service) getRelations(ctx context.Context, prm *Prm) ([]*objectSDK.Address, error) {
|
||||
var res []*objectSDK.Address
|
||||
|
||||
if linking, err := s.hdrLinking.HeadRelation(ctx, prm.addr, prm.common); err != nil {
|
||||
cid := prm.addr.ContainerID()
|
||||
|
||||
for prev := prm.addr.ObjectID(); prev != nil; {
|
||||
addr := objectSDK.NewAddress()
|
||||
addr.SetObjectID(prev)
|
||||
addr.SetContainerID(cid)
|
||||
|
||||
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
|
||||
WithAddress(addr).
|
||||
WithCommonPrm(prm.common),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
|
||||
}
|
||||
|
||||
hdr := headResult.Header()
|
||||
id := hdr.ID()
|
||||
prev = hdr.PreviousID()
|
||||
|
||||
addr.SetObjectID(id)
|
||||
|
||||
res = append(res, addr)
|
||||
}
|
||||
} else {
|
||||
childList := linking.Children()
|
||||
res = make([]*objectSDK.Address, 0, len(childList)+2) // 1 for parent, 1 for linking
|
||||
|
||||
for i := range childList {
|
||||
addr := objectSDK.NewAddress()
|
||||
addr.SetObjectID(childList[i])
|
||||
addr.SetContainerID(prm.addr.ContainerID())
|
||||
|
||||
res = append(res, addr)
|
||||
}
|
||||
|
||||
addr := objectSDK.NewAddress()
|
||||
addr.SetObjectID(linking.ID())
|
||||
addr.SetContainerID(prm.addr.ContainerID())
|
||||
|
||||
res = append(res, addr)
|
||||
}
|
||||
|
||||
res = append(res, prm.addr)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func WithOwnerID(v *owner.ID) Option {
|
||||
return func(c *cfg) {
|
||||
c.ownerID = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithKeyStorage(v *objutil.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStorage = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithPutService(v *putsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.putSvc = v
|
||||
}
|
||||
}
|
||||
|
||||
func WitHeadService(v *headsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.headSvc = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithLinkingHeader(v RelationHeader) Option {
|
||||
return func(c *cfg) {
|
||||
c.hdrLinking = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to specify Delete service's logger.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = l
|
||||
c.log = l.With(zap.String("component", "Object.Delete service"))
|
||||
}
|
||||
}
|
||||
|
||||
// WithHeadService returns option to set Head service
|
||||
// to work with object headers.
|
||||
func WithHeadService(h *getsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.header = (*headSvcWrapper)(h)
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientCache returns option to set cache of remote node clients.
|
||||
func WithSearchService(s *searchsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.searcher = (*searchSvcWrapper)(s)
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientOptions returns option to specify options of remote node clients.
|
||||
func WithPutService(p *putsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.placer = (*putSvcWrapper)(p)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue