diff --git a/pkg/services/object/put/ec.go b/pkg/services/object/put/ec.go index fbb51912c..ad94e9679 100644 --- a/pkg/services/object/put/ec.go +++ b/pkg/services/object/put/ec.go @@ -197,14 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er if err != nil { return err } + partsProcessed := make([]atomic.Bool, len(parts)) objID, _ := obj.ID() t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...) if err != nil { return err } - eg, egCtx := errgroup.WithContext(ctx) for { + eg, egCtx := errgroup.WithContext(ctx) nodes := t.Next() if len(nodes) == 0 { break @@ -216,14 +217,20 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er } for idx := range parts { - idx := idx - eg.Go(func() error { - return e.writePart(egCtx, parts[idx], idx, nodes, visited) - }) - t.SubmitSuccess() + if !partsProcessed[idx].Load() { + eg.Go(func() error { + err := e.writePart(egCtx, parts[idx], idx, nodes, visited) + if err == nil { + partsProcessed[idx].Store(true) + t.SubmitSuccess() + } + return err + }) + } } + err = eg.Wait() } - if err := eg.Wait(); err != nil { + if err != nil { return errIncompletePut{ singleErr: err, } diff --git a/pkg/services/object/put/ec_test.go b/pkg/services/object/put/ec_test.go new file mode 100644 index 000000000..69342531d --- /dev/null +++ b/pkg/services/object/put/ec_test.go @@ -0,0 +1,191 @@ +package putsvc + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/sha256" + "errors" + "fmt" + "strconv" + "testing" + + rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" + apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" + "git.frostfs.info/TrueCloudLab/tzhash/tz" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" +) + +type testPlacementBuilder struct { + vectors [][]netmap.NodeInfo +} + +func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) ( + [][]netmap.NodeInfo, error, +) { + arr := make([]netmap.NodeInfo, len(p.vectors[0])) + copy(arr, p.vectors[0]) + return [][]netmap.NodeInfo{arr}, nil +} + +type nmKeys struct{} + +func (nmKeys) IsLocalKey(_ []byte) bool { + return false +} + +type clientConstructor struct { + vectors [][]netmap.NodeInfo +} + +func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) { + if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) || + bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) { + return multiAddressClient{err: errors.New("node unavailable")}, nil + } + return multiAddressClient{}, nil +} + +type multiAddressClient struct { + client.MultiAddressClient + err error +} + +func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) { + if c.err != nil { + return nil, c.err + } + return &apiclient.ResObjectPutSingle{}, nil +} + +func (c multiAddressClient) ReportError(error) { +} + +func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error { + return nil +} + +func TestECWriter(t *testing.T) { + // Create container with policy EC 1.1 + cnr := container.Container{} + p1 := netmap.PlacementPolicy{} + p1.SetContainerBackupFactor(1) + x1 := netmap.ReplicaDescriptor{} + x1.SetECDataCount(1) + x1.SetECParityCount(1) + p1.AddReplicas(x1) + cnr.SetPlacementPolicy(p1) + cnr.SetAttribute("cnr", "cnr1") + + cid := cidtest.ID() + + // Create 4 nodes, 2 nodes for chunks, + // 2 nodes for the case when the first two will fail. + ns, _ := testNodeMatrix(t, []int{4}) + + data := make([]byte, 100) + _, _ = rand.Read(data) + ver := version.Current() + + var csum checksum.Checksum + csum.SetSHA256(sha256.Sum256(data)) + + var csumTZ checksum.Checksum + csumTZ.SetTillichZemor(tz.Sum(csum.Value())) + + obj := objectSDK.New() + obj.SetID(oidtest.ID()) + obj.SetOwnerID(usertest.ID()) + obj.SetContainerID(cid) + obj.SetVersion(&ver) + obj.SetPayload(data) + obj.SetPayloadSize(uint64(len(data))) + obj.SetPayloadChecksum(csum) + obj.SetPayloadHomomorphicHash(csumTZ) + + // Builder return nodes without sort by hrw + builder := &testPlacementBuilder{ + vectors: ns, + } + + ownerKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + pool, err := ants.NewPool(4, ants.WithNonblocking(true)) + require.NoError(t, err) + + log, err := logger.NewLogger(nil) + require.NoError(t, err) + + var n nmKeys + ecw := ecWriter{ + cfg: &cfg{ + netmapKeys: n, + remotePool: pool, + log: log, + clientConstructor: clientConstructor{vectors: ns}, + }, + placementOpts: append( + []placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)}, + placement.WithCopyNumbers(nil)), // copies number ignored for EC + container: cnr, + key: &ownerKey.PrivateKey, + relay: nil, + objMetaValid: true, + } + + err = ecw.WriteObject(context.Background(), obj) + require.NoError(t, err) +} + +func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) { + mNodes := make([][]netmap.NodeInfo, len(dim)) + mAddr := make([][]string, len(dim)) + + for i := range dim { + ns := make([]netmap.NodeInfo, dim[i]) + as := make([]string, dim[i]) + + for j := range dim[i] { + a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s", + strconv.Itoa(i), + strconv.Itoa(60000+j), + ) + + var ni netmap.NodeInfo + ni.SetNetworkEndpoints(a) + ni.SetPublicKey([]byte(a)) + + var na network.AddressGroup + + err := na.FromIterator(netmapcore.Node(ni)) + require.NoError(t, err) + + as[j] = network.StringifyGroup(na) + + ns[j] = ni + } + + mNodes[i] = ns + mAddr[i] = as + } + + return mNodes, mAddr +}