object: Fix Put
for EC
object when node unavailable #1427
2 changed files with 205 additions and 6 deletions
|
@ -197,14 +197,15 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
partsProcessed := make([]atomic.Bool, len(parts))
|
||||||
objID, _ := obj.ID()
|
objID, _ := obj.ID()
|
||||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
|
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
eg, egCtx := errgroup.WithContext(ctx)
|
|
||||||
for {
|
for {
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
nodes := t.Next()
|
nodes := t.Next()
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
break
|
break
|
||||||
|
@ -216,13 +217,20 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
||||||
}
|
}
|
||||||
|
|
||||||
for idx := range parts {
|
for idx := range parts {
|
||||||
|
if !partsProcessed[idx].Load() {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||||
})
|
if err == nil {
|
||||||
|
partsProcessed[idx].Store(true)
|
||||||
t.SubmitSuccess()
|
t.SubmitSuccess()
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
}
|
}
|
||||||
if err := eg.Wait(); err != nil {
|
}
|
||||||
|
err = eg.Wait()
|
||||||
|
|||||||
|
}
|
||||||
|
if err != nil {
|
||||||
return errIncompletePut{
|
return errIncompletePut{
|
||||||
singleErr: err,
|
singleErr: err,
|
||||||
}
|
}
|
||||||
|
|
191
pkg/services/object/common/writer/ec_test.go
Normal file
191
pkg/services/object/common/writer/ec_test.go
Normal file
|
@ -0,0 +1,191 @@
|
||||||
|
package writer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/sha256"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||||
|
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testPlacementBuilder struct {
|
||||||
|
vectors [][]netmap.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) (
|
||||||
|
[][]netmap.NodeInfo, error,
|
||||||
|
) {
|
||||||
|
arr := make([]netmap.NodeInfo, len(p.vectors[0]))
|
||||||
|
copy(arr, p.vectors[0])
|
||||||
|
return [][]netmap.NodeInfo{arr}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type nmKeys struct{}
|
||||||
|
|
||||||
|
func (nmKeys) IsLocalKey(_ []byte) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
type clientConstructor struct {
|
||||||
|
vectors [][]netmap.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) {
|
||||||
|
if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) ||
|
||||||
|
bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) {
|
||||||
|
return multiAddressClient{err: errors.New("node unavailable")}, nil
|
||||||
|
}
|
||||||
|
return multiAddressClient{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type multiAddressClient struct {
|
||||||
|
client.MultiAddressClient
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) {
|
||||||
|
if c.err != nil {
|
||||||
|
return nil, c.err
|
||||||
|
}
|
||||||
|
return &apiclient.ResObjectPutSingle{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c multiAddressClient) ReportError(error) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestECWriter(t *testing.T) {
|
||||||
|
// Create container with policy EC 1.1
|
||||||
|
cnr := container.Container{}
|
||||||
|
p1 := netmap.PlacementPolicy{}
|
||||||
|
p1.SetContainerBackupFactor(1)
|
||||||
|
x1 := netmap.ReplicaDescriptor{}
|
||||||
|
x1.SetECDataCount(1)
|
||||||
|
x1.SetECParityCount(1)
|
||||||
|
p1.AddReplicas(x1)
|
||||||
|
cnr.SetPlacementPolicy(p1)
|
||||||
|
cnr.SetAttribute("cnr", "cnr1")
|
||||||
|
|
||||||
|
cid := cidtest.ID()
|
||||||
|
|
||||||
|
// Create 4 nodes, 2 nodes for chunks,
|
||||||
|
// 2 nodes for the case when the first two will fail.
|
||||||
|
ns, _ := testNodeMatrix(t, []int{4})
|
||||||
|
|
||||||
|
data := make([]byte, 100)
|
||||||
|
_, _ = rand.Read(data)
|
||||||
|
ver := version.Current()
|
||||||
|
|
||||||
|
var csum checksum.Checksum
|
||||||
|
csum.SetSHA256(sha256.Sum256(data))
|
||||||
|
|
||||||
|
var csumTZ checksum.Checksum
|
||||||
|
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
|
||||||
|
|
||||||
|
obj := objectSDK.New()
|
||||||
|
obj.SetID(oidtest.ID())
|
||||||
|
obj.SetOwnerID(usertest.ID())
|
||||||
|
obj.SetContainerID(cid)
|
||||||
|
obj.SetVersion(&ver)
|
||||||
|
obj.SetPayload(data)
|
||||||
|
obj.SetPayloadSize(uint64(len(data)))
|
||||||
|
obj.SetPayloadChecksum(csum)
|
||||||
|
obj.SetPayloadHomomorphicHash(csumTZ)
|
||||||
|
|
||||||
|
// Builder return nodes without sort by hrw
|
||||||
|
builder := &testPlacementBuilder{
|
||||||
|
vectors: ns,
|
||||||
|
}
|
||||||
|
|
||||||
|
ownerKey, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
log, err := logger.NewLogger(nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var n nmKeys
|
||||||
|
ecw := ECWriter{
|
||||||
|
Config: &Config{
|
||||||
|
NetmapKeys: n,
|
||||||
|
RemotePool: pool,
|
||||||
|
Logger: log,
|
||||||
|
ClientConstructor: clientConstructor{vectors: ns},
|
||||||
|
},
|
||||||
|
PlacementOpts: append(
|
||||||
|
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
|
||||||
|
placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||||
|
Container: cnr,
|
||||||
|
Key: &ownerKey.PrivateKey,
|
||||||
|
Relay: nil,
|
||||||
|
ObjectMetaValid: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ecw.WriteObject(context.Background(), obj)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
|
||||||
|
mNodes := make([][]netmap.NodeInfo, len(dim))
|
||||||
|
mAddr := make([][]string, len(dim))
|
||||||
|
|
||||||
|
for i := range dim {
|
||||||
|
ns := make([]netmap.NodeInfo, dim[i])
|
||||||
|
as := make([]string, dim[i])
|
||||||
|
|
||||||
|
for j := range dim[i] {
|
||||||
|
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
|
||||||
|
strconv.Itoa(i),
|
||||||
|
strconv.Itoa(60000+j),
|
||||||
|
)
|
||||||
|
|
||||||
|
var ni netmap.NodeInfo
|
||||||
|
ni.SetNetworkEndpoints(a)
|
||||||
|
ni.SetPublicKey([]byte(a))
|
||||||
|
|
||||||
|
var na network.AddressGroup
|
||||||
|
|
||||||
|
err := na.FromIterator(netmapcore.Node(ni))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
as[j] = network.StringifyGroup(na)
|
||||||
|
|
||||||
|
ns[j] = ni
|
||||||
|
}
|
||||||
|
|
||||||
|
mNodes[i] = ns
|
||||||
|
mAddr[i] = as
|
||||||
|
}
|
||||||
|
|
||||||
|
return mNodes, mAddr
|
||||||
|
}
|
Loading…
Reference in a new issue
The linter hasn't caught this, but it seems the error value is unused, unless it is the last loop iteration.
This was not a problem previously, because waiting was done once, outside the loop.
It this intentional?
Yes, it is intentional. The idea is to iterate over all nodes and return only the last error. All other errors will be logged inside
e.writePart(...)
.