[#1129] policer: Restore EC object

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-05-16 12:26:49 +03:00
parent 44f2e8f27f
commit 436c9f5558
8 changed files with 483 additions and 57 deletions

View file

@ -29,7 +29,6 @@ import (
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2" deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2" getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2" putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search" searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
@ -231,7 +230,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
return err return err
} }
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor) remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
pol := policer.New( pol := policer.New(
policer.WithLogger(c.log), policer.WithLogger(c.log),
@ -243,8 +242,8 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
), ),
policer.WithRemoteObjectHeaderFunc( policer.WithRemoteObjectHeaderFunc(
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw) prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw)
return remoteHeader.Head(ctx, prm) return remoteReader.Head(ctx, prm)
}, },
), ),
policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) { policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
@ -256,6 +255,19 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
} }
return res.Header(), nil return res.Header(), nil
}), }),
policer.WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a)
return remoteReader.Get(ctx, prm)
}),
policer.WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
var prm engine.GetPrm
prm.WithAddress(a)
res, err := c.cfgObject.cfgLocalStorage.localStorage.Get(ctx, prm)
if err != nil {
return nil, err
}
return res.Object(), nil
}),
policer.WithNetmapKeys(c), policer.WithNetmapKeys(c),
policer.WithHeadTimeout( policer.WithHeadTimeout(
policerconfig.HeadTimeout(c.appCfg), policerconfig.HeadTimeout(c.appCfg),
@ -274,6 +286,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
}), }),
policer.WithPool(c.cfgObject.pool.replication), policer.WithPool(c.cfgObject.pool.replication),
policer.WithMetrics(c.metricsCollector.PolicerMetrics()), policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
policer.WithKeyStorage(keyStorage),
) )
c.workers = append(c.workers, worker{ c.workers = append(c.workers, worker{

View file

@ -536,4 +536,9 @@ const (
PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk" PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk"
ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage" ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage"
ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage" ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage"
PolicerCouldNotGetObjectFromNodeMoving = "could not get EC object from the node, moving current chunk to the node"
PolicerCouldNotRestoreObjectNotEnoughChunks = "could not restore EC object: not enough chunks"
PolicerFailedToRestoreObject = "failed to restore EC object"
PolicerCouldNotGetChunk = "could not get EC chunk"
PolicerCouldNotGetChunks = "could not get EC chunks"
) )

View file

@ -1,4 +1,4 @@
package headsvc package object
import ( import (
"context" "context"
@ -18,16 +18,16 @@ type ClientConstructor interface {
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error) Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
} }
// RemoteHeader represents utility for getting // RemoteReader represents utility for getting
// the object header from a remote host. // the object from a remote host.
type RemoteHeader struct { type RemoteReader struct {
keyStorage *util.KeyStorage keyStorage *util.KeyStorage
clientCache ClientConstructor clientCache ClientConstructor
} }
// RemoteHeadPrm groups remote header operation parameters. // RemoteRequestPrm groups remote operation parameters.
type RemoteHeadPrm struct { type RemoteRequestPrm struct {
addr oid.Address addr oid.Address
raw bool raw bool
node netmap.NodeInfo node netmap.NodeInfo
@ -37,16 +37,16 @@ const remoteOpTTL = 1
var ErrNotFound = errors.New("object header not found") var ErrNotFound = errors.New("object header not found")
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance. // NewRemoteReader creates, initializes and returns new RemoteHeader instance.
func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteHeader { func NewRemoteReader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteReader {
return &RemoteHeader{ return &RemoteReader{
keyStorage: keyStorage, keyStorage: keyStorage,
clientCache: cache, clientCache: cache,
} }
} }
// WithNodeInfo sets information about the remote node. // WithNodeInfo sets information about the remote node.
func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm { func (p *RemoteRequestPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteRequestPrm {
if p != nil { if p != nil {
p.node = v p.node = v
} }
@ -55,7 +55,7 @@ func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
} }
// WithObjectAddress sets object address. // WithObjectAddress sets object address.
func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm { func (p *RemoteRequestPrm) WithObjectAddress(v oid.Address) *RemoteRequestPrm {
if p != nil { if p != nil {
p.addr = v p.addr = v
} }
@ -63,7 +63,7 @@ func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm {
return p return p
} }
func (p *RemoteHeadPrm) WithRaw(v bool) *RemoteHeadPrm { func (p *RemoteRequestPrm) WithRaw(v bool) *RemoteRequestPrm {
if p != nil { if p != nil {
p.raw = v p.raw = v
} }
@ -71,7 +71,7 @@ func (p *RemoteHeadPrm) WithRaw(v bool) *RemoteHeadPrm {
} }
// Head requests object header from the remote node. // Head requests object header from the remote node.
func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK.Object, error) { func (h *RemoteReader) Head(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
key, err := h.keyStorage.GetKey(nil) key, err := h.keyStorage.GetKey(nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err) return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
@ -106,3 +106,39 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
return res.Header(), nil return res.Header(), nil
} }
func (h *RemoteReader) Get(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
key, err := h.keyStorage.GetKey(nil)
if err != nil {
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
}
var info clientcore.NodeInfo
err = clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(prm.node))
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := h.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err)
}
var getPrm internalclient.GetObjectPrm
getPrm.SetClient(c)
getPrm.SetPrivateKey(key)
getPrm.SetAddress(prm.addr)
getPrm.SetTTL(remoteOpTTL)
if prm.raw {
getPrm.SetRawFlag()
}
res, err := internalclient.GetObject(ctx, getPrm)
if err != nil {
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err)
}
return res.Object(), nil
}

View file

@ -2,6 +2,7 @@ package policer
import ( import (
"context" "context"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
@ -11,8 +12,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
var errNoECinfoReturnded = errors.New("no EC info returned") var errNoECinfoReturnded = errors.New("no EC info returned")
@ -84,7 +87,7 @@ func (p *Policer) processECContainerECObject(ctx context.Context, objInfo object
// drop local chunk only if all required chunks are in place // drop local chunk only if all required chunks are in place
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0]) res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0])
} }
p.adjustECPlacement(ctx, objInfo, nn[0]) p.adjustECPlacement(ctx, objInfo, nn[0], policy)
if res.removeLocal { if res.removeLocal {
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address)) p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
@ -224,7 +227,7 @@ func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.A
} }
if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID { if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID {
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed), p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed),
zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress)) zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
return false return false
} }
indexToObjectID[ch.Index] = chunkID indexToObjectID[ch.Index] = chunkID
@ -242,26 +245,146 @@ func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.A
return true return true
} }
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) { func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) {
var parentAddress oid.Address var parentAddress oid.Address
parentAddress.SetContainer(objInfo.Address.Container()) parentAddress.SetContainer(objInfo.Address.Container())
parentAddress.SetObject(objInfo.ECInfo.ParentID) parentAddress.SetObject(objInfo.ECInfo.ParentID)
var eiErr *objectSDK.ECInfoError var eiErr *objectSDK.ECInfoError
resolved := make(map[uint32][]netmap.NodeInfo)
chunkIDs := make(map[uint32]oid.ID)
restore := true // do not restore EC chunks if some node returned error
for idx, n := range nodes { for idx, n := range nodes {
if uint32(idx) == objInfo.ECInfo.Total { if uint32(idx) >= objInfo.ECInfo.Total && uint32(len(resolved)) == objInfo.ECInfo.Total {
return return
} }
var err error
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) { if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
continue _, err = p.localHeader(ctx, parentAddress)
} else {
_, err = p.remoteHeader(ctx, n, parentAddress, true)
} }
_, err := p.remoteHeader(ctx, n, parentAddress, true)
if errors.As(err, &eiErr) { if errors.As(err, &eiErr) {
continue for _, ch := range eiErr.ECInfo().Chunks {
resolved[ch.Index] = append(resolved[ch.Index], n)
var ecInfoChunkID oid.ID
if err := ecInfoChunkID.ReadFromV2(ch.ID); err != nil {
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
return
} }
if chunkID, exist := chunkIDs[ch.Index]; exist && chunkID != ecInfoChunkID {
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", chunkID),
zap.Stringer("second", ecInfoChunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
return
}
chunkIDs[ch.Index] = ecInfoChunkID
}
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
p.log.Warn(logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
p.replicator.HandleReplicationTask(ctx, replicator.Task{ p.replicator.HandleReplicationTask(ctx, replicator.Task{
NumCopies: 1, NumCopies: 1,
Addr: objInfo.Address, Addr: objInfo.Address,
Nodes: []netmap.NodeInfo{n}, Nodes: []netmap.NodeInfo{n},
}, newNodeCache()) }, newNodeCache())
restore = false
}
}
if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total {
return
}
if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() {
var found []uint32
for i := range resolved {
found = append(found, i)
}
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
return
}
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy)
}
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) {
c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount()))
if err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
parts := p.collectExistedChunks(ctx, objInfo, existedChunks, parentAddress, chunkIDs)
if parts == nil {
return
}
key, err := p.keyStorage.GetKey(nil)
if err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
required := make([]bool, len(parts))
for i, p := range parts {
if p == nil {
required[i] = true
}
}
if err := c.ReconstructParts(parts, required, key); err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
for idx, part := range parts {
if _, exists := existedChunks[uint32(idx)]; exists {
continue
}
var addr oid.Address
addr.SetContainer(parentAddress.Container())
pID, _ := part.ID()
addr.SetObject(pID)
targetNode := nodes[idx%len(nodes)]
if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) {
p.replicator.HandleLocalPutTask(ctx, replicator.Task{
Addr: addr,
Obj: part,
})
} else {
p.replicator.HandleReplicationTask(ctx, replicator.Task{
NumCopies: 1,
Addr: addr,
Nodes: []netmap.NodeInfo{targetNode},
Obj: part,
}, newNodeCache())
}
} }
} }
func (p *Policer) collectExistedChunks(ctx context.Context, objInfo objectcore.Info, existedChunks map[uint32][]netmap.NodeInfo, parentAddress oid.Address, chunkIDs map[uint32]oid.ID) []*objectSDK.Object {
parts := make([]*objectSDK.Object, objInfo.ECInfo.Total)
errGroup, egCtx := errgroup.WithContext(ctx)
for idx, nodes := range existedChunks {
idx := idx
nodes := nodes
errGroup.Go(func() error {
var objID oid.Address
objID.SetContainer(parentAddress.Container())
objID.SetObject(chunkIDs[idx])
var obj *objectSDK.Object
var err error
for _, node := range nodes {
if p.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
obj, err = p.localObject(egCtx, objID)
} else {
obj, err = p.remoteObject(egCtx, node, objID)
}
if err == nil {
break
}
p.log.Warn(logs.PolicerCouldNotGetChunk, zap.Stringer("object", parentAddress), zap.Stringer("chunkID", objID), zap.Error(err), zap.String("node", hex.EncodeToString(node.PublicKey())))
}
if obj != nil {
parts[idx] = obj
}
return nil
})
}
if err := errGroup.Wait(); err != nil {
p.log.Error(logs.PolicerCouldNotGetChunks, zap.Stringer("object", parentAddress), zap.Error(err))
return nil
}
return parts
}

View file

@ -3,6 +3,7 @@ package policer
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/rand"
"errors" "errors"
"fmt" "fmt"
"sync/atomic" "sync/atomic"
@ -10,13 +11,16 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -52,7 +56,7 @@ func TestECChunkHasValidPlacement(t *testing.T) {
return nil, errors.New("unexpected placement build") return nil, errors.New("unexpected placement build")
} }
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
require.True(t, raw, "remote header for parent object must be called with raw flag") require.True(t, raw, "remote header for parent object must be called with raw flag")
index := int(ni.PublicKey()[0]) index := int(ni.PublicKey()[0])
require.True(t, index == 1 || index == 2, "invalid node to get parent header") require.True(t, index == 1 || index == 2, "invalid node to get parent header")
@ -66,13 +70,25 @@ func TestECChunkHasValidPlacement(t *testing.T) {
return nil, objectSDK.NewECInfoError(ei) return nil, objectSDK.NewECInfoError(ei)
} }
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
p := New( p := New(
WithContainerSource(containerSrc), WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)), WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey()) return bytes.Equal(k, nodes[0].PublicKey())
})), })),
WithRemoteObjectHeaderFunc(headFn), WithRemoteObjectHeaderFunc(remoteHeadFn),
WithLocalObjectHeaderFunc(localHeadFn),
WithPool(testPool(t)), WithPool(testPool(t)),
) )
@ -209,11 +225,13 @@ func TestECChunkHasInvalidPlacement(t *testing.T) {
})), })),
WithRemoteObjectHeaderFunc(headFn), WithRemoteObjectHeaderFunc(headFn),
WithLocalObjectHeaderFunc(localHeadF), WithLocalObjectHeaderFunc(localHeadF),
WithReplicator(pullFunc(func(ctx context.Context, r replicator.Task) { WithReplicator(&testReplicator{
handlePullTask: (func(ctx context.Context, r replicator.Task) {
require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID && require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID &&
len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task") len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task")
pullCounter.Add(1) pullCounter.Add(1)
})), }),
}),
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) { WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
require.True(t, allowDrop, "invalid redundent copy call") require.True(t, allowDrop, "invalid redundent copy call")
dropped = append(dropped, a) dropped = append(dropped, a)
@ -367,9 +385,11 @@ func TestECChunkHasInvalidPlacement(t *testing.T) {
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) { WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
dropped = append(dropped, a) dropped = append(dropped, a)
}), }),
WithReplicator(replicatorFunc(func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) { WithReplicator(&testReplicator{
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
replicated = append(replicated, t) replicated = append(replicated, t)
})), },
}),
WithPool(testPool(t)), WithPool(testPool(t)),
) )
@ -391,12 +411,155 @@ func TestECChunkHasInvalidPlacement(t *testing.T) {
}) })
} }
type pullFunc func(context.Context, replicator.Task) func TestECChunkRestore(t *testing.T) {
// node0 has chunk0, node1 has chunk1
// policer should replicate chunk0 to node2 on the first run
// then restore EC object and replicate chunk2 to node2 on the second run
t.Parallel()
func (f pullFunc) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { payload := make([]byte, 64)
panic("not implemented") rand.Read(payload)
} parentAddress := oidtest.Address()
parentObject := objectSDK.New()
parentObject.SetContainerID(parentAddress.Container())
parentObject.SetPayload(payload)
parentObject.SetPayloadSize(64)
objectSDK.CalculateAndSetPayloadChecksum(parentObject)
err := objectSDK.CalculateAndSetID(parentObject)
require.NoError(t, err)
id, _ := parentObject.ID()
parentAddress.SetObject(id)
func (f pullFunc) HandlePullTask(ctx context.Context, task replicator.Task) { chunkIDs := make([]oid.ID, 3)
f(ctx, task) c, err := erasurecode.NewConstructor(2, 1)
require.NoError(t, err)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
chunks, err := c.Split(parentObject, &key.PrivateKey)
require.NoError(t, err)
for i, ch := range chunks {
chunkIDs[i], _ = ch.ID()
}
var policy netmapSDK.PlacementPolicy
require.NoError(t, policy.DecodeString("EC 2.1"))
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(parentAddress.Container()) {
return cnr, nil
}
return nil, new(apistatus.ContainerNotFound)
},
}
nodes := make([]netmapSDK.NodeInfo, 4)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
if cnr.Equals(parentAddress.Container()) && obj.Equals(parentAddress.Object()) {
return [][]netmapSDK.NodeInfo{nodes}, nil
}
return nil, errors.New("unexpected placement build")
}
var secondRun bool
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
require.True(t, raw, "remote header for parent object must be called with raw flag")
index := int(ni.PublicKey()[0])
require.True(t, index == 1 || index == 2 || index == 3, "invalid node to get parent header")
require.True(t, a == parentAddress, "invalid address to get remote header")
if index == 1 {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[1])
ch.Index = uint32(1)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if index == 2 && secondRun {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[0])
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
return nil, new(apistatus.ObjectNotFound)
}
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a == parentAddress, "invalid address to get remote header")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[0])
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var replicatedObj []*objectSDK.Object
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(remoteHeadFn),
WithLocalObjectHeaderFunc(localHeadFn),
WithReplicator(&testReplicator{
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
if t.Obj != nil {
replicatedObj = append(replicatedObj, t.Obj)
}
},
}),
WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a.Container() == parentAddress.Container() && a.Object() == chunkIDs[0], "invalid local object request")
return chunks[0], nil
}),
WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
index := ni.PublicKey()[0]
if index == 2 {
return nil, new(apistatus.ObjectNotFound)
}
return chunks[index], nil
}),
WithPool(testPool(t)),
WithKeyStorage(util.NewKeyStorage(&key.PrivateKey, nil, nil)),
)
var chunkAddress oid.Address
chunkAddress.SetContainer(parentAddress.Container())
chunkAddress.SetObject(chunkIDs[0])
objInfo := objectcore.Info{
Address: chunkAddress,
Type: objectSDK.TypeRegular,
ECInfo: &objectcore.ECInfo{
ParentID: parentAddress.Object(),
Index: 0,
Total: 3,
},
}
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
secondRun = true
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 1, len(replicatedObj), "invalid replicated objects count")
chunks[2].SetSignature(nil)
expectedData, err := chunks[2].MarshalJSON()
require.NoError(t, err)
replicatedObj[0].SetSignature(nil)
actualData, err := replicatedObj[0].MarshalJSON()
require.NoError(t, err)
require.EqualValues(t, string(expectedData), string(actualData), "invalid restored objects")
} }

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -37,6 +38,7 @@ type BuryFunc func(context.Context, oid.Address) error
type Replicator interface { type Replicator interface {
HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
HandlePullTask(ctx context.Context, task replicator.Task) HandlePullTask(ctx context.Context, task replicator.Task)
HandleLocalPutTask(ctx context.Context, task replicator.Task)
} }
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
@ -45,6 +47,10 @@ type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Addres
// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node. // LocalObjectHeaderFunc is the function to obtain HEAD info from the current node.
type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error) type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
type cfg struct { type cfg struct {
headTimeout time.Duration headTimeout time.Duration
@ -75,6 +81,12 @@ type cfg struct {
evictDuration, sleepDuration time.Duration evictDuration, sleepDuration time.Duration
metrics MetricsRegister metrics MetricsRegister
remoteObject RemoteObjectGetFunc
localObject LocalObjectGetFunc
keyStorage *util.KeyStorage
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -145,6 +157,18 @@ func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option {
} }
} }
func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option {
return func(c *cfg) {
c.remoteObject = v
}
}
func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option {
return func(c *cfg) {
c.localObject = v
}
}
// WithNetmapKeys returns option to set tool to work with announced public keys. // WithNetmapKeys returns option to set tool to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option { func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
return func(c *cfg) { return func(c *cfg) {
@ -182,3 +206,9 @@ func WithMetrics(m MetricsRegister) Option {
c.metrics = m c.metrics = m
} }
} }
func WithKeyStorage(ks *util.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = ks
}
}

View file

@ -249,12 +249,14 @@ func TestProcessObject(t *testing.T) {
require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a) require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
gotRemoveRedundant = true gotRemoveRedundant = true
}), }),
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) { WithReplicator(&testReplicator{
handleReplicationTask: func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task) require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
for _, node := range task.Nodes { for _, node := range task.Nodes {
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0])) gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
} }
})), },
}),
WithPool(testPool(t)), WithPool(testPool(t)),
) )
@ -440,13 +442,20 @@ type announcedKeysFunc func([]byte) bool
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
// replicatorFunc is a Replicator backed by a function. type testReplicator struct {
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult) handleReplicationTask func(ctx context.Context, task replicator.Task, res replicator.TaskResult)
handleLocalPutTask func(ctx context.Context, task replicator.Task)
func (f replicatorFunc) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { handlePullTask func(ctx context.Context, task replicator.Task)
f(ctx, task, res)
} }
func (f replicatorFunc) HandlePullTask(ctx context.Context, task replicator.Task) { func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
panic("not implemented") r.handleReplicationTask(ctx, task, res)
}
func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) {
r.handleLocalPutTask(ctx, task)
}
func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) {
r.handlePullTask(ctx, task)
} }

View file

@ -0,0 +1,47 @@
package replicator
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
var errObjectNotDefined = errors.New("object is not defined")
func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) {
p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest()
defer func() {
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
}()
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleLocalPutTask",
trace.WithAttributes(
attribute.Stringer("address", task.Addr),
attribute.Int("nodes_count", len(task.Nodes)),
))
defer span.End()
if task.Obj == nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(errObjectNotDefined),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}
err := engine.Put(ctx, p.localStorage, task.Obj)
if err != nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(err),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}