[#xx] object: Fix Put
for EC
object when node unavailable
Some checks failed
DCO action / DCO (pull_request) Failing after 1m45s
Tests and linters / Run gofumpt (pull_request) Successful in 1m56s
Build / Build Components (pull_request) Successful in 2m15s
Tests and linters / Staticcheck (pull_request) Successful in 2m59s
Vulncheck / Vulncheck (pull_request) Successful in 3m17s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m48s
Tests and linters / gopls check (pull_request) Successful in 3m53s
Tests and linters / Lint (pull_request) Successful in 4m43s
Tests and linters / Tests (pull_request) Successful in 6m55s
Tests and linters / Tests with -race (pull_request) Successful in 7m26s
Some checks failed
DCO action / DCO (pull_request) Failing after 1m45s
Tests and linters / Run gofumpt (pull_request) Successful in 1m56s
Build / Build Components (pull_request) Successful in 2m15s
Tests and linters / Staticcheck (pull_request) Successful in 2m59s
Vulncheck / Vulncheck (pull_request) Successful in 3m17s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m48s
Tests and linters / gopls check (pull_request) Successful in 3m53s
Tests and linters / Lint (pull_request) Successful in 4m43s
Tests and linters / Tests (pull_request) Successful in 6m55s
Tests and linters / Tests with -race (pull_request) Successful in 7m26s
Add test for `ECWriter`. Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
42bf03e5cc
commit
b7ad738537
2 changed files with 242 additions and 6 deletions
|
@ -197,14 +197,15 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
partsProcessed := make([]atomic.Bool, len(parts))
|
||||
objID, _ := obj.ID()
|
||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
for {
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
nodes := t.Next()
|
||||
if len(nodes) == 0 {
|
||||
break
|
||||
|
@ -216,13 +217,20 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
}
|
||||
|
||||
for idx := range parts {
|
||||
eg.Go(func() error {
|
||||
return e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||
})
|
||||
t.SubmitSuccess()
|
||||
if !partsProcessed[idx].Load() {
|
||||
eg.Go(func() error {
|
||||
err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||
if err == nil {
|
||||
partsProcessed[idx].Store(true)
|
||||
t.SubmitSuccess()
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
}
|
||||
err = eg.Wait()
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
if err != nil {
|
||||
return errIncompletePut{
|
||||
singleErr: err,
|
||||
}
|
||||
|
|
228
pkg/services/object/common/writer/ec_test.go
Normal file
228
pkg/services/object/common/writer/ec_test.go
Normal file
|
@ -0,0 +1,228 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testPlacementBuilder struct {
|
||||
vectors [][]netmap.NodeInfo
|
||||
}
|
||||
|
||||
func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) (
|
||||
[][]netmap.NodeInfo, error,
|
||||
) {
|
||||
arr := make([]netmap.NodeInfo, len(p.vectors[0]))
|
||||
copy(arr, p.vectors[0])
|
||||
return [][]netmap.NodeInfo{arr}, nil
|
||||
}
|
||||
|
||||
type nmKeys struct{}
|
||||
|
||||
func (nmKeys) IsLocalKey(_ []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type clientConstructor struct {
|
||||
vectors [][]netmap.NodeInfo
|
||||
}
|
||||
|
||||
func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) {
|
||||
if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) ||
|
||||
bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) {
|
||||
return multiAddressClient{err: errors.New("node unavailable")}, nil
|
||||
}
|
||||
return multiAddressClient{}, nil
|
||||
}
|
||||
|
||||
type multiAddressClient struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectPutInit(_ context.Context, _ apiclient.PrmObjectPutInit) (apiclient.ObjectWriter, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) {
|
||||
if c.err != nil {
|
||||
return nil, c.err
|
||||
}
|
||||
return &apiclient.ResObjectPutSingle{}, nil
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectDelete(_ context.Context, _ apiclient.PrmObjectDelete) (*apiclient.ResObjectDelete, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectGetInit(_ context.Context, _ apiclient.PrmObjectGet) (*apiclient.ObjectReader, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectHead(_ context.Context, _ apiclient.PrmObjectHead) (*apiclient.ResObjectHead, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectSearchInit(_ context.Context, _ apiclient.PrmObjectSearch) (*apiclient.ObjectListReader, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectRangeInit(_ context.Context, _ apiclient.PrmObjectRange) (*apiclient.ObjectRangeReader, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectHash(_ context.Context, _ apiclient.PrmObjectHash) (*apiclient.ResObjectHash, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ExecRaw(_ func(client *rawclient.Client) error) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) Close() error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ReportError(error) {
|
||||
}
|
||||
|
||||
func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestECWriter(t *testing.T) {
|
||||
// Create container with policy EC 1.1
|
||||
cnr := container.Container{}
|
||||
p1 := netmap.PlacementPolicy{}
|
||||
p1.SetContainerBackupFactor(1)
|
||||
x1 := netmap.ReplicaDescriptor{}
|
||||
x1.SetECDataCount(1)
|
||||
x1.SetECParityCount(1)
|
||||
p1.AddReplicas(x1)
|
||||
cnr.SetPlacementPolicy(p1)
|
||||
cnr.SetAttribute("cnr", "cnr1")
|
||||
|
||||
cid := cidtest.ID()
|
||||
|
||||
// Create 4 nodes, 2 nodes for chunks,
|
||||
// 2 nodes for the case when the first two will fail.
|
||||
ns, _ := testNodeMatrix(t, []int{4})
|
||||
|
||||
data := make([]byte, 100)
|
||||
_, _ = rand.Read(data)
|
||||
var ver version.Version
|
||||
ver.SetMajor(2)
|
||||
ver.SetMinor(1)
|
||||
|
||||
var csum checksum.Checksum
|
||||
csum.SetSHA256(sha256.Sum256(data))
|
||||
|
||||
var csumTZ checksum.Checksum
|
||||
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
|
||||
|
||||
obj := objectSDK.New()
|
||||
obj.SetID(oidtest.ID())
|
||||
obj.SetOwnerID(usertest.ID())
|
||||
obj.SetContainerID(cid)
|
||||
obj.SetVersion(&ver)
|
||||
obj.SetPayload(data)
|
||||
obj.SetPayloadSize(uint64(len(data)))
|
||||
obj.SetPayloadChecksum(csum)
|
||||
obj.SetPayloadHomomorphicHash(csumTZ)
|
||||
|
||||
// Builder return nodes without sort by hrw
|
||||
builder := &testPlacementBuilder{
|
||||
vectors: ns,
|
||||
}
|
||||
|
||||
ownerKey, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
||||
require.NoError(t, err)
|
||||
|
||||
log, err := logger.NewLogger(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
var n nmKeys
|
||||
ecw := ECWriter{
|
||||
Config: &Config{
|
||||
NetmapKeys: n,
|
||||
RemotePool: pool,
|
||||
Logger: log,
|
||||
ClientConstructor: clientConstructor{vectors: ns},
|
||||
},
|
||||
PlacementOpts: append(
|
||||
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
|
||||
placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||
Container: cnr,
|
||||
Key: &ownerKey.PrivateKey,
|
||||
Relay: nil,
|
||||
ObjectMetaValid: true,
|
||||
}
|
||||
|
||||
err = ecw.WriteObject(context.Background(), obj)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
|
||||
mNodes := make([][]netmap.NodeInfo, len(dim))
|
||||
mAddr := make([][]string, len(dim))
|
||||
|
||||
for i := range dim {
|
||||
ns := make([]netmap.NodeInfo, dim[i])
|
||||
as := make([]string, dim[i])
|
||||
|
||||
for j := range dim[i] {
|
||||
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
|
||||
strconv.Itoa(i),
|
||||
strconv.Itoa(60000+j),
|
||||
)
|
||||
|
||||
var ni netmap.NodeInfo
|
||||
ni.SetNetworkEndpoints(a)
|
||||
ni.SetPublicKey([]byte(a))
|
||||
|
||||
var na network.AddressGroup
|
||||
|
||||
err := na.FromIterator(netmapcore.Node(ni))
|
||||
require.NoError(t, err)
|
||||
|
||||
as[j] = network.StringifyGroup(na)
|
||||
|
||||
ns[j] = ni
|
||||
}
|
||||
|
||||
mNodes[i] = ns
|
||||
mAddr[i] = as
|
||||
}
|
||||
|
||||
return mNodes, mAddr
|
||||
}
|
Loading…
Reference in a new issue