From fe3906c2950e35b6180209113ce9bb37f199a357 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 11 Dec 2020 11:04:04 +0300 Subject: [PATCH] [#243] object/delete: Implement new service processing Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 27 +-- pkg/services/object/delete/container.go | 5 + pkg/services/object/delete/delete.go | 48 ++++ pkg/services/object/delete/exec.go | 290 +++++++++++++++++++++++ pkg/services/object/delete/local.go | 53 +++++ pkg/services/object/delete/prm.go | 27 +-- pkg/services/object/delete/res.go | 3 - pkg/services/object/delete/service.go | 207 ++++------------ pkg/services/object/delete/util.go | 137 ++++++++++- pkg/services/object/delete/v2/service.go | 27 ++- pkg/services/object/delete/v2/util.go | 47 +++- 11 files changed, 654 insertions(+), 217 deletions(-) create mode 100644 pkg/services/object/delete/container.go create mode 100644 pkg/services/object/delete/delete.go create mode 100644 pkg/services/object/delete/exec.go create mode 100644 pkg/services/object/delete/local.go delete mode 100644 pkg/services/object/delete/res.go diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 0fa413d0c..1c9ef9ecd 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -326,20 +326,6 @@ func initObjectService(c *cfg) { searchsvcV2.WithKeyStorage(keyStorage), ) - sHead := headsvc.NewService( - headsvc.WithKeyStorage(keyStorage), - headsvc.WithClientCache(clientCache), - headsvc.WithLocalStorage(ls), - headsvc.WithContainerSource(c.cfgObject.cnrStorage), - headsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), - headsvc.WithLocalAddressSource(c), - headsvc.WithWorkerPool(c.cfgObject.pool.head), - headsvc.WithLogger(c.log), - headsvc.WithClientOptions( - client.WithDialTimeout(c.viper.GetDuration(cfgObjectHeadDialTimeout)), - ), - ) - sGet := getsvc.New( getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), @@ -359,19 +345,16 @@ func initObjectService(c *cfg) { getsvcV2.WithKeyStorage(keyStorage), ) - sDelete := deletesvc.NewService( - deletesvc.WithKeyStorage(keyStorage), - deletesvc.WitHeadService(sHead), - deletesvc.WithPutService(sPut), - deletesvc.WithOwnerID(nodeOwner), - deletesvc.WithLinkingHeader( - headsvc.NewRelationHeader(nil, sHead), - ), + sDelete := deletesvc.New( deletesvc.WithLogger(c.log), + deletesvc.WithHeadService(sGet), + deletesvc.WithSearchService(sSearch), + deletesvc.WithPutService(sPut), ) sDeleteV2 := deletesvcV2.NewService( deletesvcV2.WithInternalService(sDelete), + deletesvcV2.WithKeyStorage(keyStorage), ) objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server, diff --git a/pkg/services/object/delete/container.go b/pkg/services/object/delete/container.go new file mode 100644 index 000000000..a2f099d5b --- /dev/null +++ b/pkg/services/object/delete/container.go @@ -0,0 +1,5 @@ +package deletesvc + +func (exec *execCtx) executeOnContainer() { + exec.log.Debug("request is not rolled over to the container") +} diff --git a/pkg/services/object/delete/delete.go b/pkg/services/object/delete/delete.go new file mode 100644 index 000000000..56aa74e75 --- /dev/null +++ b/pkg/services/object/delete/delete.go @@ -0,0 +1,48 @@ +package deletesvc + +import ( + "context" + + "go.uber.org/zap" +) + +// Delete serves requests to remote the objects. +func (s *Service) Delete(ctx context.Context, prm Prm) error { + exec := &execCtx{ + svc: s, + ctx: ctx, + prm: prm, + } + + exec.setLogger(s.log) + + exec.execute() + + return exec.statusError.err +} + +func (exec *execCtx) execute() { + exec.log.Debug("serving request...") + + // perform local operation + exec.executeLocal() + + exec.analyzeStatus(true) +} + +func (exec *execCtx) analyzeStatus(execCnr bool) { + // analyze local result + switch exec.status { + case statusOK: + exec.log.Debug("operation finished successfully") + default: + exec.log.Debug("operation finished with error", + zap.String("error", exec.err.Error()), + ) + + if execCnr { + exec.executeOnContainer() + exec.analyzeStatus(false) + } + } +} diff --git a/pkg/services/object/delete/exec.go b/pkg/services/object/delete/exec.go new file mode 100644 index 000000000..7e3706401 --- /dev/null +++ b/pkg/services/object/delete/exec.go @@ -0,0 +1,290 @@ +package deletesvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +type statusError struct { + status int + err error +} + +type execCtx struct { + svc *Service + + ctx context.Context + + prm Prm + + statusError + + log *logger.Logger + + tombstone *objectSDK.Tombstone + + splitInfo *objectSDK.SplitInfo + + tombstoneObj *object.RawObject +} + +const ( + statusUndefined int = iota + statusOK +) + +func (exec *execCtx) setLogger(l *logger.Logger) { + exec.log = l.With( + zap.String("request", "DELETE"), + zap.Stringer("address", exec.address()), + zap.Bool("local", exec.isLocal()), + zap.Bool("with session", exec.prm.common.SessionToken() != nil), + zap.Bool("with bearer", exec.prm.common.BearerToken() != nil), + ) +} + +func (exec execCtx) context() context.Context { + return exec.ctx +} + +func (exec execCtx) isLocal() bool { + return exec.prm.common.LocalOnly() +} + +func (exec *execCtx) key() *ecdsa.PrivateKey { + return exec.prm.key +} + +func (exec *execCtx) address() *objectSDK.Address { + return exec.prm.Address() +} + +func (exec *execCtx) containerID() *container.ID { + return exec.prm.Address().ContainerID() +} + +func (exec *execCtx) commonParameters() *util.CommonPrm { + return exec.prm.common +} + +func (exec *execCtx) newAddress(id *objectSDK.ID) *objectSDK.Address { + a := objectSDK.NewAddress() + a.SetObjectID(id) + a.SetContainerID(exec.containerID()) + + return a +} + +func (exec *execCtx) formSplitInfo() bool { + var err error + + exec.splitInfo, err = exec.svc.header.splitInfo(exec) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not compose split info", + zap.String("error", err.Error()), + ) + case err == nil: + exec.status = statusOK + exec.err = nil + } + + return err == nil +} + +func (exec *execCtx) collectMembers() (ok bool) { + if exec.splitInfo == nil { + exec.log.Debug("no split info, object is PHY") + return true + } + + if exec.splitInfo.Link() != nil { + ok = exec.collectChildren() + } + + if !ok && exec.splitInfo.LastPart() != nil { + ok = exec.collectChain() + if !ok { + return + } + } // may be fail if neither right nor linking ID is set? + + return exec.supplementBySplitID() +} + +func (exec *execCtx) collectChain() bool { + var ( + err error + chain []*objectSDK.ID + ) + + exec.log.Debug("assembling chain...") + + for prev := exec.splitInfo.LastPart(); prev != nil; { + prev, err = exec.svc.header.previous(exec, prev) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not get previous split element", + zap.Stringer("id", prev), + zap.String("error", err.Error()), + ) + + return false + case err == nil: + exec.status = statusOK + exec.err = nil + } + + chain = append(chain, prev) + } + + exec.addMembers(chain) + exec.tombstone.SetSplitID(exec.splitInfo.SplitID()) + + return true +} + +func (exec *execCtx) collectChildren() bool { + exec.log.Debug("collecting children...") + + children, err := exec.svc.header.children(exec) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not collect object children", + zap.String("error", err.Error()), + ) + + return false + case err == nil: + exec.status = statusOK + exec.err = nil + + exec.addMembers(append(children, exec.splitInfo.Link())) + + return true + } +} + +func (exec *execCtx) supplementBySplitID() bool { + exec.log.Debug("supplement by split ID") + + chain, err := exec.svc.searcher.splitMembers(exec) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not search for split chain members", + zap.String("error", err.Error()), + ) + + return false + case err == nil: + exec.status = statusOK + exec.err = nil + + exec.addMembers(chain) + + return true + } +} + +func (exec *execCtx) addMembers(incoming []*objectSDK.ID) { + members := exec.tombstone.Members() + + for i := range members { + for j := 0; j < len(incoming); j++ { // don't use range, slice mutates in body + if members[i].Equal(incoming[j]) { + incoming = append(incoming[:j], incoming[j+1:]...) + j-- + } + } + } + + exec.tombstone.SetMembers(append(members, incoming...)) +} + +func (exec *execCtx) initTombstoneObject() bool { + payload, err := exec.tombstone.Marshal() + if err != nil { + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not marshal tombstone structure", + zap.String("error", err.Error()), + ) + + return false + } + + exec.tombstoneObj = object.NewRaw() + exec.tombstoneObj.SetContainerID(exec.containerID()) + exec.tombstoneObj.SetOwnerID(exec.commonParameters().SessionToken().OwnerID()) + exec.tombstoneObj.SetType(objectSDK.TypeTombstone) + exec.tombstoneObj.SetPayload(payload) + + return true +} + +func (exec *execCtx) saveTombstone() bool { + id, err := exec.svc.placer.put(exec, false) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not save the tombstone", + zap.String("error", err.Error()), + ) + + return false + case err == nil: + exec.status = statusOK + exec.err = nil + + exec.prm.TombstoneAddressTarget(). + SetAddress(exec.newAddress(id)) + } + + return true +} + +func (exec *execCtx) broadcastTombstone() bool { + _, err := exec.svc.placer.put(exec, true) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not save the tombstone", + zap.String("error", err.Error()), + ) + case err == nil: + exec.status = statusOK + exec.err = nil + } + + return err == nil +} diff --git a/pkg/services/object/delete/local.go b/pkg/services/object/delete/local.go new file mode 100644 index 000000000..396debd79 --- /dev/null +++ b/pkg/services/object/delete/local.go @@ -0,0 +1,53 @@ +package deletesvc + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +func (exec *execCtx) executeLocal() { + exec.log.Debug("forming tombstone structure...") + + ok := exec.formTombstone() + if !ok { + return + } + + exec.log.Debug("tombstone structure successfully formed, saving...") + + ok = exec.saveTombstone() + if !ok { + return + } + + exec.log.Debug("tombstone successfilly saved, broadcasting...") + + exec.broadcastTombstone() +} + +func (exec *execCtx) formTombstone() (ok bool) { + exec.tombstone = objectSDK.NewTombstone() + exec.addMembers([]*objectSDK.ID{exec.address().ObjectID()}) + + exec.log.Debug("forming split info...") + + ok = exec.formSplitInfo() + if !ok { + return + } + + exec.log.Debug("split info successfully formed, collecting members...") + + ok = exec.collectMembers() + if !ok { + return + } + + exec.log.Debug("members successfully collected") + + ok = exec.initTombstoneObject() + if !ok { + return + } + + return true +} diff --git a/pkg/services/object/delete/prm.go b/pkg/services/object/delete/prm.go index 57fa52f39..bae89262f 100644 --- a/pkg/services/object/delete/prm.go +++ b/pkg/services/object/delete/prm.go @@ -1,28 +1,27 @@ package deletesvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/object" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) +// Prm groups parameters of Delete service call. type Prm struct { + key *ecdsa.PrivateKey + common *util.CommonPrm - addr *object.Address + client.DeleteObjectParams } -func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm { - if p != nil { - p.common = v - } - - return p +// SetCommonParameters sets common parameters of the operation. +func (p *Prm) SetCommonParameters(common *util.CommonPrm) { + p.common = common } -func (p *Prm) WithAddress(v *object.Address) *Prm { - if p != nil { - p.addr = v - } - - return p +// SetPrivateKey sets private key to use during execution. +func (p *Prm) SetPrivateKey(key *ecdsa.PrivateKey) { + p.key = key } diff --git a/pkg/services/object/delete/res.go b/pkg/services/object/delete/res.go deleted file mode 100644 index db1b34907..000000000 --- a/pkg/services/object/delete/res.go +++ /dev/null @@ -1,3 +0,0 @@ -package deletesvc - -type Response struct{} diff --git a/pkg/services/object/delete/service.go b/pkg/services/object/delete/service.go index c45ce5047..db02754a8 100644 --- a/pkg/services/object/delete/service.go +++ b/pkg/services/object/delete/service.go @@ -1,42 +1,42 @@ package deletesvc import ( - "context" - objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-api-go/pkg/owner" - "github.com/nspcc-dev/neofs-node/pkg/core/object" - headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" - objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" "github.com/nspcc-dev/neofs-node/pkg/util/logger" - "github.com/pkg/errors" "go.uber.org/zap" ) +// Service utility serving requests of Object.Get service. type Service struct { *cfg } +// Option is a Service's constructor option. type Option func(*cfg) -type RelationHeader interface { - HeadRelation(context.Context, *objectSDK.Address, *objutil.CommonPrm) (*object.Object, error) -} - type cfg struct { - ownerID *owner.ID - - keyStorage *objutil.KeyStorage - - putSvc *putsvc.Service - - headSvc *headsvc.Service - - hdrLinking RelationHeader - log *logger.Logger + + header interface { + // must return (nil, nil) for PHY objects + splitInfo(*execCtx) (*objectSDK.SplitInfo, error) + + children(*execCtx) ([]*objectSDK.ID, error) + + // must return (nil, nil) for 1st object in chain + previous(*execCtx, *objectSDK.ID) (*objectSDK.ID, error) + } + + searcher interface { + splitMembers(*execCtx) ([]*objectSDK.ID, error) + } + + placer interface { + put(*execCtx, bool) (*objectSDK.ID, error) + } } func defaultCfg() *cfg { @@ -45,7 +45,9 @@ func defaultCfg() *cfg { } } -func NewService(opts ...Option) *Service { +// New creates, initializes and returns utility serving +// Object.Get service requests. +func New(opts ...Option) *Service { c := defaultCfg() for i := range opts { @@ -57,142 +59,31 @@ func NewService(opts ...Option) *Service { } } -func (s *Service) Delete(ctx context.Context, prm *Prm) (*Response, error) { - ownerID := s.ownerID - if token := prm.common.SessionToken(); token != nil { - ownerID = token.OwnerID() - } - - if ownerID == nil { - return nil, errors.Errorf("(%T) missing owner identifier", s) - } - - addrList, err := s.getRelations(ctx, prm) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not get object relations", s) - } - - content := object.NewTombstoneContent() - content.SetAddressList(addrList...) - - data, err := content.MarshalBinary() - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not marshal tombstone content", s) - } - - r, err := s.putSvc.Put(ctx) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not open put stream", s) - } - - // `WithoutSuccessTracking` option broadcast message to all container nodes. - // For now there is no better solution to distributed tombstones with - // content address storage (CAS) and one tombstone for several split - // objects. - if err := r.Init(new(putsvc.PutInitPrm). - WithObject(newTombstone(ownerID, prm.addr.ContainerID())). - WithCommonPrm(prm.common). - WithTraverseOption(placement.WithoutSuccessTracking()), // broadcast tombstone, maybe one - ); err != nil { - return nil, errors.Wrapf(err, "(%T) could not initialize tombstone stream", s) - } - - if err := r.SendChunk(new(putsvc.PutChunkPrm). - WithChunk(data), - ); err != nil { - return nil, errors.Wrapf(err, "(%T) could not send tombstone payload", s) - } - - if _, err := r.Close(); err != nil { - return nil, errors.Wrapf(err, "(%T) could not close tombstone stream", s) - } - - return new(Response), nil -} - -func (s *Service) getRelations(ctx context.Context, prm *Prm) ([]*objectSDK.Address, error) { - var res []*objectSDK.Address - - if linking, err := s.hdrLinking.HeadRelation(ctx, prm.addr, prm.common); err != nil { - cid := prm.addr.ContainerID() - - for prev := prm.addr.ObjectID(); prev != nil; { - addr := objectSDK.NewAddress() - addr.SetObjectID(prev) - addr.SetContainerID(cid) - - headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm). - WithAddress(addr). - WithCommonPrm(prm.common), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive Head result", s) - } - - hdr := headResult.Header() - id := hdr.ID() - prev = hdr.PreviousID() - - addr.SetObjectID(id) - - res = append(res, addr) - } - } else { - childList := linking.Children() - res = make([]*objectSDK.Address, 0, len(childList)+2) // 1 for parent, 1 for linking - - for i := range childList { - addr := objectSDK.NewAddress() - addr.SetObjectID(childList[i]) - addr.SetContainerID(prm.addr.ContainerID()) - - res = append(res, addr) - } - - addr := objectSDK.NewAddress() - addr.SetObjectID(linking.ID()) - addr.SetContainerID(prm.addr.ContainerID()) - - res = append(res, addr) - } - - res = append(res, prm.addr) - - return res, nil -} - -func WithOwnerID(v *owner.ID) Option { - return func(c *cfg) { - c.ownerID = v - } -} - -func WithKeyStorage(v *objutil.KeyStorage) Option { - return func(c *cfg) { - c.keyStorage = v - } -} - -func WithPutService(v *putsvc.Service) Option { - return func(c *cfg) { - c.putSvc = v - } -} - -func WitHeadService(v *headsvc.Service) Option { - return func(c *cfg) { - c.headSvc = v - } -} - -func WithLinkingHeader(v RelationHeader) Option { - return func(c *cfg) { - c.hdrLinking = v - } -} - +// WithLogger returns option to specify Delete service's logger. func WithLogger(l *logger.Logger) Option { return func(c *cfg) { - c.log = l + c.log = l.With(zap.String("component", "Object.Delete service")) + } +} + +// WithHeadService returns option to set Head service +// to work with object headers. +func WithHeadService(h *getsvc.Service) Option { + return func(c *cfg) { + c.header = (*headSvcWrapper)(h) + } +} + +// WithClientCache returns option to set cache of remote node clients. +func WithSearchService(s *searchsvc.Service) Option { + return func(c *cfg) { + c.searcher = (*searchSvcWrapper)(s) + } +} + +// WithClientOptions returns option to specify options of remote node clients. +func WithPutService(p *putsvc.Service) Option { + return func(c *cfg) { + c.placer = (*putSvcWrapper)(p) } } diff --git a/pkg/services/object/delete/util.go b/pkg/services/object/delete/util.go index e60204a53..ce0cf49eb 100644 --- a/pkg/services/object/delete/util.go +++ b/pkg/services/object/delete/util.go @@ -1,17 +1,138 @@ package deletesvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/container" + "errors" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-node/pkg/core/object" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" ) -func newTombstone(ownerID *owner.ID, cid *container.ID) *object.RawObject { - obj := object.NewRaw() - obj.SetContainerID(cid) - obj.SetOwnerID(ownerID) - obj.SetType(objectSDK.TypeTombstone) +type headSvcWrapper getsvc.Service - return obj +type searchSvcWrapper searchsvc.Service + +type putSvcWrapper putsvc.Service + +type simpleIDWriter struct { + ids []*objectSDK.ID +} + +func (w *headSvcWrapper) headAddress(exec *execCtx, addr *objectSDK.Address) (*object.Object, error) { + wr := getsvc.NewSimpleObjectWriter() + + p := getsvc.HeadPrm{} + p.SetPrivateKey(exec.key()) + p.SetCommonParameters(exec.commonParameters()) + p.SetHeaderWriter(wr) + p.WithRawFlag(true) + p.WithAddress(addr) + + err := (*getsvc.Service)(w).Head(exec.context(), p) + if err != nil { + return nil, err + } + + return wr.Object(), nil +} + +func (w *headSvcWrapper) splitInfo(exec *execCtx) (*objectSDK.SplitInfo, error) { + _, err := w.headAddress(exec, exec.address()) + + var errSplitInfo *objectSDK.SplitInfoError + + switch { + case err == nil: + return nil, nil + case errors.As(err, &errSplitInfo): + return errSplitInfo.SplitInfo(), nil + default: + return nil, err + } +} + +func (w *headSvcWrapper) children(exec *execCtx) ([]*objectSDK.ID, error) { + a := exec.newAddress(exec.splitInfo.Link()) + + linking, err := w.headAddress(exec, a) + if err != nil { + return nil, err + } + + return linking.Children(), nil +} + +func (w *headSvcWrapper) previous(exec *execCtx, id *objectSDK.ID) (*objectSDK.ID, error) { + a := exec.newAddress(id) + + h, err := w.headAddress(exec, a) + if err != nil { + return nil, err + } + + return h.PreviousID(), nil +} + +func (w *searchSvcWrapper) splitMembers(exec *execCtx) ([]*objectSDK.ID, error) { + fs := objectSDK.SearchFilters{} + fs.AddSplitIDFilter(objectSDK.MatchStringEqual, exec.splitInfo.SplitID()) + + wr := new(simpleIDWriter) + + p := searchsvc.Prm{} + p.SetWriter(wr) + p.SetCommonParameters(exec.commonParameters()) + p.SetPrivateKey(exec.key()) + p.WithContainerID(exec.containerID()) + p.WithSearchFilters(fs) + + err := (*searchsvc.Service)(w).Search(exec.context(), p) + if err != nil { + return nil, err + } + + return wr.ids, nil +} + +func (s simpleIDWriter) WriteIDs(ids []*objectSDK.ID) error { + s.ids = append(s.ids, ids...) + + return nil +} + +func (w *putSvcWrapper) put(exec *execCtx, broadcast bool) (*objectSDK.ID, error) { + streamer, err := (*putsvc.Service)(w).Put(exec.context()) + if err != nil { + return nil, err + } + + payload := exec.tombstoneObj.Payload() + + initPrm := new(putsvc.PutInitPrm). + WithCommonPrm(exec.commonParameters()). + WithObject(exec.tombstoneObj.CutPayload()) + + if broadcast { + initPrm.WithTraverseOption(placement.WithoutSuccessTracking()) + } + + err = streamer.Init(initPrm) + if err != nil { + return nil, err + } + + err = streamer.SendChunk(new(putsvc.PutChunkPrm).WithChunk(payload)) + if err != nil { + return nil, err + } + + r, err := streamer.Close() + if err != nil { + return nil, err + } + + return r.ObjectID(), nil } diff --git a/pkg/services/object/delete/v2/service.go b/pkg/services/object/delete/v2/service.go index 3e3bbb676..9e7a3ceeb 100644 --- a/pkg/services/object/delete/v2/service.go +++ b/pkg/services/object/delete/v2/service.go @@ -5,7 +5,7 @@ import ( objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete" - "github.com/pkg/errors" + objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) // Service implements Delete operation of Object service v2. @@ -18,6 +18,8 @@ type Option func(*cfg) type cfg struct { svc *deletesvc.Service + + keyStorage *objutil.KeyStorage } // NewService constructs Service instance from provided options. @@ -35,12 +37,22 @@ func NewService(opts ...Option) *Service { // Delete calls internal service. func (s *Service) Delete(ctx context.Context, req *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) { - r, err := s.svc.Delete(ctx, toPrm(req)) + resp := new(objectV2.DeleteResponse) + + body := new(objectV2.DeleteResponseBody) + resp.SetBody(body) + + p, err := s.toPrm(req, body) if err != nil { - return nil, errors.Wrapf(err, "(%T) could not get object header", s) + return nil, err } - return fromResponse(r), nil + err = s.svc.Delete(ctx, *p) + if err != nil { + return nil, err + } + + return resp, nil } func WithInternalService(v *deletesvc.Service) Option { @@ -48,3 +60,10 @@ func WithInternalService(v *deletesvc.Service) Option { c.svc = v } } + +// WithKeyStorage returns option to set local private key storage. +func WithKeyStorage(ks *objutil.KeyStorage) Option { + return func(c *cfg) { + c.keyStorage = ks + } +} diff --git a/pkg/services/object/delete/v2/util.go b/pkg/services/object/delete/v2/util.go index 6cf0733cc..019f368b4 100644 --- a/pkg/services/object/delete/v2/util.go +++ b/pkg/services/object/delete/v2/util.go @@ -2,21 +2,52 @@ package deletesvc import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/token" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/session" deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) -func toPrm(req *objectV2.DeleteRequest) *deletesvc.Prm { +type tombstoneBodyWriter struct { + body *objectV2.DeleteResponseBody +} + +func (s *Service) toPrm(req *objectV2.DeleteRequest, respBody *objectV2.DeleteResponseBody) (*deletesvc.Prm, error) { + meta := req.GetMetaHeader() + + key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken())) + if err != nil { + return nil, err + } + + p := new(deletesvc.Prm) + p.SetPrivateKey(key) + p.SetCommonParameters(commonParameters(meta)) + body := req.GetBody() + p.WithAddress(object.NewAddressFromV2(body.GetAddress())) + p.WithTombstoneAddressTarget(&tombstoneBodyWriter{ + body: respBody, + }) - return new(deletesvc.Prm). - WithAddress( - object.NewAddressFromV2(body.GetAddress()), - ). - WithCommonPrm(util.CommonPrmFromV2(req)) + return p, nil } -func fromResponse(r *deletesvc.Response) *objectV2.DeleteResponse { - return new(objectV2.DeleteResponse) +func (w *tombstoneBodyWriter) SetAddress(addr *object.Address) { + w.body.SetTombstone(addr.ToV2()) +} + +func commonParameters(meta *session.RequestMetaHeader) *util.CommonPrm { + prm := new(util.CommonPrm) + + if tok := meta.GetBearerToken(); tok != nil { + prm.WithBearerToken(token.NewBearerTokenFromV2(tok)) + } + + if tok := meta.GetSessionToken(); tok != nil { + prm.WithSessionToken(token.NewSessionTokenFromV2(tok)) + } + + return prm }