From acd6eb18151d7e2fec413d97e273b13076a9b4bb Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Fri, 11 Oct 2024 15:40:01 +0300 Subject: [PATCH] [#1427] object: Fix `Put` for `EC` object when node unavailable There might be situation when context canceled earlier than traverser move to another part of the nodes. To avoid this, need to wait for the result from concurrent put at each traverser iteration. Signed-off-by: Anton Nikiforov --- pkg/services/object/common/writer/ec.go | 20 +- pkg/services/object/common/writer/ec_test.go | 191 +++++++++++++++++++ 2 files changed, 205 insertions(+), 6 deletions(-) create mode 100644 pkg/services/object/common/writer/ec_test.go diff --git a/pkg/services/object/common/writer/ec.go b/pkg/services/object/common/writer/ec.go index 6b6a14cc0..dffe52a6d 100644 --- a/pkg/services/object/common/writer/ec.go +++ b/pkg/services/object/common/writer/ec.go @@ -197,14 +197,15 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er if err != nil { return err } + partsProcessed := make([]atomic.Bool, len(parts)) objID, _ := obj.ID() t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...) if err != nil { return err } - eg, egCtx := errgroup.WithContext(ctx) for { + eg, egCtx := errgroup.WithContext(ctx) nodes := t.Next() if len(nodes) == 0 { break @@ -216,13 +217,20 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er } for idx := range parts { - eg.Go(func() error { - return e.writePart(egCtx, parts[idx], idx, nodes, visited) - }) - t.SubmitSuccess() + if !partsProcessed[idx].Load() { + eg.Go(func() error { + err := e.writePart(egCtx, parts[idx], idx, nodes, visited) + if err == nil { + partsProcessed[idx].Store(true) + t.SubmitSuccess() + } + return err + }) + } } + err = eg.Wait() } - if err := eg.Wait(); err != nil { + if err != nil { return errIncompletePut{ singleErr: err, } diff --git a/pkg/services/object/common/writer/ec_test.go b/pkg/services/object/common/writer/ec_test.go new file mode 100644 index 000000000..32863d678 --- /dev/null +++ b/pkg/services/object/common/writer/ec_test.go @@ -0,0 +1,191 @@ +package writer + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/sha256" + "errors" + "fmt" + "strconv" + "testing" + + rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" + apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" + "git.frostfs.info/TrueCloudLab/tzhash/tz" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" +) + +type testPlacementBuilder struct { + vectors [][]netmap.NodeInfo +} + +func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) ( + [][]netmap.NodeInfo, error, +) { + arr := make([]netmap.NodeInfo, len(p.vectors[0])) + copy(arr, p.vectors[0]) + return [][]netmap.NodeInfo{arr}, nil +} + +type nmKeys struct{} + +func (nmKeys) IsLocalKey(_ []byte) bool { + return false +} + +type clientConstructor struct { + vectors [][]netmap.NodeInfo +} + +func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) { + if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) || + bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) { + return multiAddressClient{err: errors.New("node unavailable")}, nil + } + return multiAddressClient{}, nil +} + +type multiAddressClient struct { + client.MultiAddressClient + err error +} + +func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) { + if c.err != nil { + return nil, c.err + } + return &apiclient.ResObjectPutSingle{}, nil +} + +func (c multiAddressClient) ReportError(error) { +} + +func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error { + return nil +} + +func TestECWriter(t *testing.T) { + // Create container with policy EC 1.1 + cnr := container.Container{} + p1 := netmap.PlacementPolicy{} + p1.SetContainerBackupFactor(1) + x1 := netmap.ReplicaDescriptor{} + x1.SetECDataCount(1) + x1.SetECParityCount(1) + p1.AddReplicas(x1) + cnr.SetPlacementPolicy(p1) + cnr.SetAttribute("cnr", "cnr1") + + cid := cidtest.ID() + + // Create 4 nodes, 2 nodes for chunks, + // 2 nodes for the case when the first two will fail. + ns, _ := testNodeMatrix(t, []int{4}) + + data := make([]byte, 100) + _, _ = rand.Read(data) + ver := version.Current() + + var csum checksum.Checksum + csum.SetSHA256(sha256.Sum256(data)) + + var csumTZ checksum.Checksum + csumTZ.SetTillichZemor(tz.Sum(csum.Value())) + + obj := objectSDK.New() + obj.SetID(oidtest.ID()) + obj.SetOwnerID(usertest.ID()) + obj.SetContainerID(cid) + obj.SetVersion(&ver) + obj.SetPayload(data) + obj.SetPayloadSize(uint64(len(data))) + obj.SetPayloadChecksum(csum) + obj.SetPayloadHomomorphicHash(csumTZ) + + // Builder return nodes without sort by hrw + builder := &testPlacementBuilder{ + vectors: ns, + } + + ownerKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + pool, err := ants.NewPool(4, ants.WithNonblocking(true)) + require.NoError(t, err) + + log, err := logger.NewLogger(nil) + require.NoError(t, err) + + var n nmKeys + ecw := ECWriter{ + Config: &Config{ + NetmapKeys: n, + RemotePool: pool, + Logger: log, + ClientConstructor: clientConstructor{vectors: ns}, + }, + PlacementOpts: append( + []placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)}, + placement.WithCopyNumbers(nil)), // copies number ignored for EC + Container: cnr, + Key: &ownerKey.PrivateKey, + Relay: nil, + ObjectMetaValid: true, + } + + err = ecw.WriteObject(context.Background(), obj) + require.NoError(t, err) +} + +func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) { + mNodes := make([][]netmap.NodeInfo, len(dim)) + mAddr := make([][]string, len(dim)) + + for i := range dim { + ns := make([]netmap.NodeInfo, dim[i]) + as := make([]string, dim[i]) + + for j := range dim[i] { + a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s", + strconv.Itoa(i), + strconv.Itoa(60000+j), + ) + + var ni netmap.NodeInfo + ni.SetNetworkEndpoints(a) + ni.SetPublicKey([]byte(a)) + + var na network.AddressGroup + + err := na.FromIterator(netmapcore.Node(ni)) + require.NoError(t, err) + + as[j] = network.StringifyGroup(na) + + ns[j] = ni + } + + mNodes[i] = ns + mAddr[i] = as + } + + return mNodes, mAddr +}