[SUPPORT] object: Fix Put for EC object when node unavailable #1489

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/ec-writer into support/v0.42 2024-11-12 11:49:10 +00:00
2 changed files with 205 additions and 7 deletions

View file

@ -197,14 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
if err != nil { if err != nil {
return err return err
} }
partsProcessed := make([]atomic.Bool, len(parts))
objID, _ := obj.ID() objID, _ := obj.ID()
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...) t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
if err != nil { if err != nil {
return err return err
} }
eg, egCtx := errgroup.WithContext(ctx)
for { for {
eg, egCtx := errgroup.WithContext(ctx)
nodes := t.Next() nodes := t.Next()
if len(nodes) == 0 { if len(nodes) == 0 {
break break
@ -216,14 +217,20 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
} }
for idx := range parts { for idx := range parts {
idx := idx if !partsProcessed[idx].Load() {
eg.Go(func() error { eg.Go(func() error {
return e.writePart(egCtx, parts[idx], idx, nodes, visited) err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
}) if err == nil {
partsProcessed[idx].Store(true)
t.SubmitSuccess() t.SubmitSuccess()
} }
return err
})
} }
if err := eg.Wait(); err != nil { }
err = eg.Wait()
}
if err != nil {
return errIncompletePut{ return errIncompletePut{
singleErr: err, singleErr: err,
} }

View file

@ -0,0 +1,191 @@
package putsvc
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"strconv"
"testing"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
type testPlacementBuilder struct {
vectors [][]netmap.NodeInfo
}
func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) (
[][]netmap.NodeInfo, error,
) {
arr := make([]netmap.NodeInfo, len(p.vectors[0]))
copy(arr, p.vectors[0])
return [][]netmap.NodeInfo{arr}, nil
}
type nmKeys struct{}
func (nmKeys) IsLocalKey(_ []byte) bool {
return false
}
type clientConstructor struct {
vectors [][]netmap.NodeInfo
}
func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) {
if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) ||
bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) {
return multiAddressClient{err: errors.New("node unavailable")}, nil
}
return multiAddressClient{}, nil
}
type multiAddressClient struct {
client.MultiAddressClient
err error
}
func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) {
if c.err != nil {
return nil, c.err
}
return &apiclient.ResObjectPutSingle{}, nil
}
func (c multiAddressClient) ReportError(error) {
}
func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error {
return nil
}
func TestECWriter(t *testing.T) {
// Create container with policy EC 1.1
cnr := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetECDataCount(1)
x1.SetECParityCount(1)
p1.AddReplicas(x1)
cnr.SetPlacementPolicy(p1)
cnr.SetAttribute("cnr", "cnr1")
cid := cidtest.ID()
// Create 4 nodes, 2 nodes for chunks,
// 2 nodes for the case when the first two will fail.
ns, _ := testNodeMatrix(t, []int{4})
data := make([]byte, 100)
_, _ = rand.Read(data)
ver := version.Current()
var csum checksum.Checksum
csum.SetSHA256(sha256.Sum256(data))
var csumTZ checksum.Checksum
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
obj := objectSDK.New()
obj.SetID(oidtest.ID())
obj.SetOwnerID(usertest.ID())
obj.SetContainerID(cid)
obj.SetVersion(&ver)
obj.SetPayload(data)
obj.SetPayloadSize(uint64(len(data)))
obj.SetPayloadChecksum(csum)
obj.SetPayloadHomomorphicHash(csumTZ)
// Builder return nodes without sort by hrw
builder := &testPlacementBuilder{
vectors: ns,
}
ownerKey, err := keys.NewPrivateKey()
require.NoError(t, err)
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
require.NoError(t, err)
log, err := logger.NewLogger(nil)
require.NoError(t, err)
var n nmKeys
ecw := ecWriter{
cfg: &cfg{
netmapKeys: n,
remotePool: pool,
log: log,
clientConstructor: clientConstructor{vectors: ns},
},
placementOpts: append(
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
placement.WithCopyNumbers(nil)), // copies number ignored for EC
container: cnr,
key: &ownerKey.PrivateKey,
relay: nil,
objMetaValid: true,
}
err = ecw.WriteObject(context.Background(), obj)
require.NoError(t, err)
}
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
mNodes := make([][]netmap.NodeInfo, len(dim))
mAddr := make([][]string, len(dim))
for i := range dim {
ns := make([]netmap.NodeInfo, dim[i])
as := make([]string, dim[i])
for j := range dim[i] {
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
strconv.Itoa(i),
strconv.Itoa(60000+j),
)
var ni netmap.NodeInfo
ni.SetNetworkEndpoints(a)
ni.SetPublicKey([]byte(a))
var na network.AddressGroup
err := na.FromIterator(netmapcore.Node(ni))
require.NoError(t, err)
as[j] = network.StringifyGroup(na)
ns[j] = ni
}
mNodes[i] = ns
mAddr[i] = as
}
return mNodes, mAddr
}