[#1219] object/put: Support additional container broadcast

There are several cases when we need to spread the object around the
container after its primary placement (e.g. objects of type TOMBSTONE).
It'd be convenient to support this feature in `putsvc.Service`.

Add additional stage of container broadcast after the object is stored.
This stage is carried out no more than once and does not affect the
outcome of the main PUT operation.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2022-03-05 13:32:11 +03:00 committed by LeL
parent 0bf59522f7
commit a55af18ad1
2 changed files with 78 additions and 6 deletions

View file

@ -12,10 +12,11 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/logger"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
) )
type distributedTarget struct { type distributedTarget struct {
traverseOpts []placement.Option traversal traversal
remotePool, localPool util.WorkerPool remotePool, localPool util.WorkerPool
@ -34,6 +35,50 @@ type distributedTarget struct {
log *logger.Logger log *logger.Logger
} }
// traversal holds the parameters and mutable state of a single container
// traversal performed during object placement, including the optional
// additional broadcast stage executed after primary placement.
type traversal struct {
	// placement options used to construct the traverser
	opts []placement.Option

	// whether an additional broadcast to the remaining container nodes
	// is needed after the object is saved by its primary placement
	extraBroadcastEnabled bool

	// public keys (as map keys) of container nodes that were processed
	// during the primary object placement; these nodes are skipped
	// during the additional broadcast
	mExclude map[string]struct{}
}
// submitPrimaryPlacementFinish updates the traversal parameters after the
// primary placement has finished. It reports whether an additional container
// broadcast must be performed; the broadcast is armed at most once.
func (x *traversal) submitPrimaryPlacementFinish() bool {
	if !x.extraBroadcastEnabled {
		return false
	}

	// the broadcast is best-effort, so success tracking is disabled for it
	x.opts = append(x.opts, placement.WithoutSuccessTracking())

	// reset the flag so a second broadcast is never triggered
	x.extraBroadcastEnabled = false

	return true
}
// submitProcessed marks the given container node as processed during the
// primary object placement so it can be excluded from a subsequent
// container broadcast. No-op unless the extra broadcast is enabled.
func (x *traversal) submitProcessed(n placement.Node) {
	if !x.extraBroadcastEnabled {
		return
	}

	key := string(n.PublicKey())

	if x.mExclude == nil {
		x.mExclude = make(map[string]struct{}, 1)
	}

	x.mExclude[key] = struct{}{}
}
// processed reports whether the specified node was already handled during
// the primary object placement. A lookup in a nil map is safe and returns
// false, so this works even before submitProcessed initializes mExclude.
//
// Fix: use a pointer receiver for consistency with the other traversal
// methods (submitPrimaryPlacementFinish, submitProcessed); mixing value and
// pointer receivers on one type is flagged by Go style guides. All callers
// invoke it on the addressable field t.traversal, so this is compatible.
func (x *traversal) processed(n placement.Node) bool {
	_, ok := x.mExclude[string(n.PublicKey())]
	return ok
}
type nodeDesc struct { type nodeDesc struct {
local bool local bool
@ -106,7 +151,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) { func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) {
traverser, err := placement.NewTraverser( traverser, err := placement.NewTraverser(
append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., append(t.traversal.opts, placement.ForObject(t.obj.ID()))...,
) )
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err) return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
@ -124,6 +169,11 @@ loop:
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
for i := range addrs { for i := range addrs {
if t.traversal.processed(addrs[i]) {
// it can happen only during additional container broadcast
continue
}
wg.Add(1) wg.Add(1)
addr := addrs[i] addr := addrs[i]
@ -141,7 +191,15 @@ loop:
if err := workerPool.Submit(func() { if err := workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
if err := f(nodeDesc{local: isLocal, info: addr}); err != nil { err := f(nodeDesc{local: isLocal, info: addr})
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
if err != nil {
resErr.Store(err) resErr.Store(err)
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
return return
@ -168,6 +226,18 @@ loop:
return nil, err return nil, err
} }
// perform additional container broadcast if needed
if t.traversal.submitPrimaryPlacementFinish() {
_, err = t.iteratePlacement(f)
if err != nil {
t.log.Error("additional container broadcast failure",
zap.Error(err),
)
// we don't fail primary operation because of broadcast failure
}
}
return new(transformer.AccessIdentifiers). return new(transformer.AccessIdentifiers).
WithSelfID(t.obj.ID()), nil WithSelfID(t.obj.ID()), nil
} }

View file

@ -161,7 +161,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
} }
return &distributedTarget{ return &distributedTarget{
traverseOpts: prm.traverseOpts, traversal: traversal{
opts: prm.traverseOpts,
},
remotePool: p.remotePool, remotePool: p.remotePool,
localPool: p.localPool, localPool: p.localPool,
nodeTargetInitializer: func(node nodeDesc) transformer.ObjectTarget { nodeTargetInitializer: func(node nodeDesc) transformer.ObjectTarget {