object: Fix Put for EC object when node unavailable #1427

Merged
acid-ant merged 1 commit from acid-ant/frostfs-node:bugfix/ec-writer into master 2024-10-14 07:23:49 +00:00
2 changed files with 205 additions and 6 deletions

View file

@ -197,14 +197,15 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
if err != nil { if err != nil {
return err return err
} }
partsProcessed := make([]atomic.Bool, len(parts))
objID, _ := obj.ID() objID, _ := obj.ID()
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...) t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
if err != nil { if err != nil {
return err return err
} }
eg, egCtx := errgroup.WithContext(ctx)
for { for {
eg, egCtx := errgroup.WithContext(ctx)
nodes := t.Next() nodes := t.Next()
if len(nodes) == 0 { if len(nodes) == 0 {
break break
@ -216,13 +217,20 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
} }
for idx := range parts { for idx := range parts {
eg.Go(func() error { if !partsProcessed[idx].Load() {
return e.writePart(egCtx, parts[idx], idx, nodes, visited) eg.Go(func() error {
}) err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
t.SubmitSuccess() if err == nil {
partsProcessed[idx].Store(true)
t.SubmitSuccess()
}
return err
})
}
} }
err = eg.Wait()
Review

The linter hasn't caught this, but it seems the error value is unused, unless it is the last loop iteration.
This was not a problem previously, because waiting was done once, outside the loop.

The linter hasn't caught this, but it seems the error value is unused, unless it is the last loop iteration. This was not a problem previously, because waiting was done once, outside the loop.
Review

It this intentional?

It this intentional?
Review

Yes, it is intentional. The idea is to iterate over all nodes and return only the last error. All other errors will be logged inside e.writePart(...).

Yes, it is intentional. The idea is to iterate over all nodes and return only the last error. All other errors will be logged inside `e.writePart(...)`.
} }
if err := eg.Wait(); err != nil { if err != nil {
return errIncompletePut{ return errIncompletePut{
singleErr: err, singleErr: err,
} }

View file

@ -0,0 +1,191 @@
package writer
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"strconv"
"testing"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
type testPlacementBuilder struct {
vectors [][]netmap.NodeInfo
}
func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) (
[][]netmap.NodeInfo, error,
) {
arr := make([]netmap.NodeInfo, len(p.vectors[0]))
copy(arr, p.vectors[0])
return [][]netmap.NodeInfo{arr}, nil
}
type nmKeys struct{}
func (nmKeys) IsLocalKey(_ []byte) bool {
return false
}
type clientConstructor struct {
vectors [][]netmap.NodeInfo
}
func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) {
if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) ||
bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) {
return multiAddressClient{err: errors.New("node unavailable")}, nil
}
return multiAddressClient{}, nil
}
type multiAddressClient struct {
client.MultiAddressClient
err error
}
func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) {
if c.err != nil {
return nil, c.err
}
return &apiclient.ResObjectPutSingle{}, nil
}
func (c multiAddressClient) ReportError(error) {
}
func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error {
return nil
}
func TestECWriter(t *testing.T) {
// Create container with policy EC 1.1
cnr := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetECDataCount(1)
x1.SetECParityCount(1)
p1.AddReplicas(x1)
cnr.SetPlacementPolicy(p1)
cnr.SetAttribute("cnr", "cnr1")
cid := cidtest.ID()
// Create 4 nodes, 2 nodes for chunks,
// 2 nodes for the case when the first two will fail.
ns, _ := testNodeMatrix(t, []int{4})
data := make([]byte, 100)
_, _ = rand.Read(data)
ver := version.Current()
var csum checksum.Checksum
csum.SetSHA256(sha256.Sum256(data))
var csumTZ checksum.Checksum
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
obj := objectSDK.New()
obj.SetID(oidtest.ID())
obj.SetOwnerID(usertest.ID())
obj.SetContainerID(cid)
obj.SetVersion(&ver)
obj.SetPayload(data)
obj.SetPayloadSize(uint64(len(data)))
obj.SetPayloadChecksum(csum)
obj.SetPayloadHomomorphicHash(csumTZ)
// Builder return nodes without sort by hrw
builder := &testPlacementBuilder{
vectors: ns,
}
ownerKey, err := keys.NewPrivateKey()
require.NoError(t, err)
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
require.NoError(t, err)
log, err := logger.NewLogger(nil)
require.NoError(t, err)
var n nmKeys
ecw := ECWriter{
Config: &Config{
NetmapKeys: n,
RemotePool: pool,
Logger: log,
ClientConstructor: clientConstructor{vectors: ns},
},
PlacementOpts: append(
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
placement.WithCopyNumbers(nil)), // copies number ignored for EC
Container: cnr,
Key: &ownerKey.PrivateKey,
Relay: nil,
ObjectMetaValid: true,
}
err = ecw.WriteObject(context.Background(), obj)
require.NoError(t, err)
}
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
mNodes := make([][]netmap.NodeInfo, len(dim))
mAddr := make([][]string, len(dim))
for i := range dim {
ns := make([]netmap.NodeInfo, dim[i])
as := make([]string, dim[i])
for j := range dim[i] {
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
strconv.Itoa(i),
strconv.Itoa(60000+j),
)
var ni netmap.NodeInfo
ni.SetNetworkEndpoints(a)
ni.SetPublicKey([]byte(a))
var na network.AddressGroup
err := na.FromIterator(netmapcore.Node(ni))
require.NoError(t, err)
as[j] = network.StringifyGroup(na)
ns[j] = ni
}
mNodes[i] = ns
mAddr[i] = as
}
return mNodes, mAddr
}