object: Fix Put
for EC
object when node unavailable #1427
2 changed files with 205 additions and 6 deletions
|
@ -197,14 +197,15 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
partsProcessed := make([]atomic.Bool, len(parts))
|
||||
objID, _ := obj.ID()
|
||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
for {
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
nodes := t.Next()
|
||||
if len(nodes) == 0 {
|
||||
break
|
||||
|
@ -216,13 +217,20 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
}
|
||||
|
||||
for idx := range parts {
|
||||
eg.Go(func() error {
|
||||
return e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||
})
|
||||
t.SubmitSuccess()
|
||||
if !partsProcessed[idx].Load() {
|
||||
eg.Go(func() error {
|
||||
err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||
if err == nil {
|
||||
partsProcessed[idx].Store(true)
|
||||
t.SubmitSuccess()
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
}
|
||||
err = eg.Wait()
|
||||
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
if err != nil {
|
||||
return errIncompletePut{
|
||||
singleErr: err,
|
||||
}
|
||||
|
|
191
pkg/services/object/common/writer/ec_test.go
Normal file
191
pkg/services/object/common/writer/ec_test.go
Normal file
|
@ -0,0 +1,191 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testPlacementBuilder struct {
|
||||
vectors [][]netmap.NodeInfo
|
||||
}
|
||||
|
||||
func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) (
|
||||
[][]netmap.NodeInfo, error,
|
||||
) {
|
||||
arr := make([]netmap.NodeInfo, len(p.vectors[0]))
|
||||
copy(arr, p.vectors[0])
|
||||
return [][]netmap.NodeInfo{arr}, nil
|
||||
}
|
||||
|
||||
type nmKeys struct{}
|
||||
|
||||
func (nmKeys) IsLocalKey(_ []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type clientConstructor struct {
|
||||
vectors [][]netmap.NodeInfo
|
||||
}
|
||||
|
||||
func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) {
|
||||
if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) ||
|
||||
bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) {
|
||||
return multiAddressClient{err: errors.New("node unavailable")}, nil
|
||||
}
|
||||
return multiAddressClient{}, nil
|
||||
}
|
||||
|
||||
type multiAddressClient struct {
|
||||
client.MultiAddressClient
|
||||
err error
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) {
|
||||
if c.err != nil {
|
||||
return nil, c.err
|
||||
}
|
||||
return &apiclient.ResObjectPutSingle{}, nil
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ReportError(error) {
|
||||
}
|
||||
|
||||
func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error {
|
||||
return nil
|
||||
fyrchik
commented
We can use interface embedding for this purpose -- it will panic with NPE, but we won't have to write boilerplate. We can use interface embedding for this purpose -- it will panic with NPE, but we won't have to write boilerplate.
This style is bad for real code, but good for tests IMO.
acid-ant
commented
Agree, updated. Looks much better. Agree, updated. Looks much better.
|
||||
}
|
||||
|
||||
func TestECWriter(t *testing.T) {
|
||||
// Create container with policy EC 1.1
|
||||
cnr := container.Container{}
|
||||
p1 := netmap.PlacementPolicy{}
|
||||
p1.SetContainerBackupFactor(1)
|
||||
x1 := netmap.ReplicaDescriptor{}
|
||||
x1.SetECDataCount(1)
|
||||
x1.SetECParityCount(1)
|
||||
p1.AddReplicas(x1)
|
||||
cnr.SetPlacementPolicy(p1)
|
||||
cnr.SetAttribute("cnr", "cnr1")
|
||||
|
||||
cid := cidtest.ID()
|
||||
|
||||
// Create 4 nodes, 2 nodes for chunks,
|
||||
// 2 nodes for the case when the first two will fail.
|
||||
ns, _ := testNodeMatrix(t, []int{4})
|
||||
|
||||
data := make([]byte, 100)
|
||||
_, _ = rand.Read(data)
|
||||
ver := version.Current()
|
||||
|
||||
var csum checksum.Checksum
|
||||
csum.SetSHA256(sha256.Sum256(data))
|
||||
|
||||
var csumTZ checksum.Checksum
|
||||
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
|
||||
|
||||
obj := objectSDK.New()
|
||||
obj.SetID(oidtest.ID())
|
||||
obj.SetOwnerID(usertest.ID())
|
||||
obj.SetContainerID(cid)
|
||||
obj.SetVersion(&ver)
|
||||
obj.SetPayload(data)
|
||||
obj.SetPayloadSize(uint64(len(data)))
|
||||
obj.SetPayloadChecksum(csum)
|
||||
obj.SetPayloadHomomorphicHash(csumTZ)
|
||||
|
||||
// Builder return nodes without sort by hrw
|
||||
builder := &testPlacementBuilder{
|
||||
vectors: ns,
|
||||
}
|
||||
|
||||
ownerKey, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
||||
require.NoError(t, err)
|
||||
|
||||
log, err := logger.NewLogger(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
var n nmKeys
|
||||
ecw := ECWriter{
|
||||
Config: &Config{
|
||||
NetmapKeys: n,
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`version.Current()`?
acid-ant
commented
Fixed. Fixed.
|
||||
RemotePool: pool,
|
||||
Logger: log,
|
||||
ClientConstructor: clientConstructor{vectors: ns},
|
||||
},
|
||||
PlacementOpts: append(
|
||||
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
|
||||
placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||
Container: cnr,
|
||||
Key: &ownerKey.PrivateKey,
|
||||
Relay: nil,
|
||||
ObjectMetaValid: true,
|
||||
}
|
||||
|
||||
err = ecw.WriteObject(context.Background(), obj)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
|
||||
mNodes := make([][]netmap.NodeInfo, len(dim))
|
||||
mAddr := make([][]string, len(dim))
|
||||
|
||||
for i := range dim {
|
||||
ns := make([]netmap.NodeInfo, dim[i])
|
||||
as := make([]string, dim[i])
|
||||
|
||||
for j := range dim[i] {
|
||||
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
|
||||
strconv.Itoa(i),
|
||||
strconv.Itoa(60000+j),
|
||||
)
|
||||
|
||||
var ni netmap.NodeInfo
|
||||
ni.SetNetworkEndpoints(a)
|
||||
ni.SetPublicKey([]byte(a))
|
||||
|
||||
var na network.AddressGroup
|
||||
|
||||
err := na.FromIterator(netmapcore.Node(ni))
|
||||
require.NoError(t, err)
|
||||
|
||||
as[j] = network.StringifyGroup(na)
|
||||
|
||||
ns[j] = ni
|
||||
}
|
||||
|
||||
mNodes[i] = ns
|
||||
mAddr[i] = as
|
||||
}
|
||||
|
||||
return mNodes, mAddr
|
||||
}
|
Loading…
Reference in a new issue
The linter hasn't caught this, but it seems the error value is unused, unless it is the last loop iteration.
This was not a problem previously, because waiting was done once, outside the loop.
It this intentional?
Yes, it is intentional. The idea is to iterate over all nodes and return only the last error. All other errors will be logged inside
e.writePart(...)
.