forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
1 changed files with 13 additions and 10 deletions
|
@ -146,7 +146,14 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent
|
||||||
t.traversal.extraBroadcastEnabled = true
|
t.traversal.extraBroadcastEnabled = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return t.iteratePlacement(ctx)
|
if err := t.iteratePlacement(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
id, _ := t.obj.ID()
|
||||||
|
return &transformer.AccessIdentifiers{
|
||||||
|
SelfID: id,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
|
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
|
||||||
|
@ -164,14 +171,14 @@ func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) iteratePlacement(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
|
||||||
id, _ := t.obj.ID()
|
id, _ := t.obj.ID()
|
||||||
|
|
||||||
traverser, err := placement.NewTraverser(
|
traverser, err := placement.NewTraverser(
|
||||||
append(t.traversal.opts, placement.ForObject(id))...,
|
append(t.traversal.opts, placement.ForObject(id))...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
|
return fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resErr := &atomic.Value{}
|
resErr := &atomic.Value{}
|
||||||
|
@ -190,23 +197,19 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) (*transformer.
|
||||||
if !traverser.Success() {
|
if !traverser.Success() {
|
||||||
var err errIncompletePut
|
var err errIncompletePut
|
||||||
err.singleErr, _ = resErr.Load().(error)
|
err.singleErr, _ = resErr.Load().(error)
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// perform additional container broadcast if needed
|
// perform additional container broadcast if needed
|
||||||
if t.traversal.submitPrimaryPlacementFinish() {
|
if t.traversal.submitPrimaryPlacementFinish() {
|
||||||
_, err = t.iteratePlacement(ctx)
|
err = t.iteratePlacement(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
t.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
||||||
// we don't fail primary operation because of broadcast failure
|
// we don't fail primary operation because of broadcast failure
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
id, _ = t.obj.ID()
|
return nil
|
||||||
|
|
||||||
return &transformer.AccessIdentifiers{
|
|
||||||
SelfID: id,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, resErr *atomic.Value) bool {
|
func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, resErr *atomic.Value) bool {
|
||||||
|
|
Loading…
Reference in a new issue