[#1219] object/delete: Change approach of the tombstone broadcast
After `putsvc.Service` started to support additional container broadcast of the saved objects there is no more need to perform broadcast of tombstone object in `deletesvc.Service`. Make `putsvc.Service` to perform additional broadcast of `TOMBSTONE` objects. Remove `broadcastTombstone` stage from `deletesvc.execCtx`, from now it is encapsulated in `saveTombstone` stage. Remove no longer needed `putsvc.PutInitPrm.WithTraverseOption` method. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
a55af18ad1
commit
14ea7b2c47
6 changed files with 11 additions and 43 deletions
|
@ -255,7 +255,7 @@ func (exec *execCtx) initTombstoneObject() bool {
|
|||
}
|
||||
|
||||
func (exec *execCtx) saveTombstone() bool {
|
||||
id, err := exec.svc.placer.put(exec, false)
|
||||
id, err := exec.svc.placer.put(exec)
|
||||
|
||||
switch {
|
||||
default:
|
||||
|
@ -277,22 +277,3 @@ func (exec *execCtx) saveTombstone() bool {
|
|||
|
||||
return true
|
||||
}
|
||||
|
||||
func (exec *execCtx) broadcastTombstone() bool {
|
||||
_, err := exec.svc.placer.put(exec, true)
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug("could not save the tombstone",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
case err == nil:
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
}
|
||||
|
||||
return err == nil
|
||||
}
|
||||
|
|
|
@ -16,14 +16,7 @@ func (exec *execCtx) executeLocal() {
|
|||
|
||||
exec.log.Debug("tombstone structure successfully formed, saving...")
|
||||
|
||||
ok = exec.saveTombstone()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
exec.log.Debug("tombstone successfilly saved, broadcasting...")
|
||||
|
||||
exec.broadcastTombstone()
|
||||
exec.saveTombstone()
|
||||
}
|
||||
|
||||
func (exec *execCtx) formTombstone() (ok bool) {
|
||||
|
|
|
@ -51,7 +51,7 @@ type cfg struct {
|
|||
}
|
||||
|
||||
placer interface {
|
||||
put(*execCtx, bool) (*oidSDK.ID, error)
|
||||
put(*execCtx) (*oidSDK.ID, error)
|
||||
}
|
||||
|
||||
netInfo NetworkInfo
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
oidSDK "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
|
@ -102,7 +101,7 @@ func (s *simpleIDWriter) WriteIDs(ids []*oidSDK.ID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (w *putSvcWrapper) put(exec *execCtx, broadcast bool) (*oidSDK.ID, error) {
|
||||
func (w *putSvcWrapper) put(exec *execCtx) (*oidSDK.ID, error) {
|
||||
streamer, err := (*putsvc.Service)(w).Put(exec.context())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -114,10 +113,6 @@ func (w *putSvcWrapper) put(exec *execCtx, broadcast bool) (*oidSDK.ID, error) {
|
|||
WithCommonPrm(exec.commonParameters()).
|
||||
WithObject(exec.tombstoneObj.CutPayload())
|
||||
|
||||
if broadcast {
|
||||
initPrm.WithTraverseOption(placement.WithoutSuccessTracking())
|
||||
}
|
||||
|
||||
err = streamer.Init(initPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -29,14 +29,6 @@ func (p *PutInitPrm) WithCommonPrm(v *util.CommonPrm) *PutInitPrm {
|
|||
return p
|
||||
}
|
||||
|
||||
func (p *PutInitPrm) WithTraverseOption(opt placement.Option) *PutInitPrm {
|
||||
if p != nil {
|
||||
p.traverseOpts = append(p.traverseOpts, opt)
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
|
||||
if p != nil {
|
||||
p.hdr = v
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
)
|
||||
|
||||
type Streamer struct {
|
||||
|
@ -160,9 +161,15 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
|||
}
|
||||
}
|
||||
|
||||
// enable additional container broadcast on non-local operation
|
||||
// if object has TOMBSTONE type.
|
||||
withBroadcast := !prm.common.LocalOnly() && prm.hdr.Type() == object.TypeTombstone
|
||||
|
||||
return &distributedTarget{
|
||||
traversal: traversal{
|
||||
opts: prm.traverseOpts,
|
||||
|
||||
extraBroadcastEnabled: withBroadcast,
|
||||
},
|
||||
remotePool: p.remotePool,
|
||||
localPool: p.localPool,
|
||||
|
|
Loading…
Reference in a new issue