From cbe9757490a44436683a406f7917222190ab7dd6 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 14 May 2024 14:43:21 +0300 Subject: [PATCH] [#1129] policer: Pull required EC chunks Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/object.go | 16 +- internal/logs/logs.go | 8 +- pkg/services/object/get/remote_getter.go | 55 ++++ pkg/services/object/head/prm.go | 17 - pkg/services/object/head/remote.go | 18 +- pkg/services/policer/check.go | 2 +- pkg/services/policer/ec.go | 166 +++++++++- pkg/services/policer/ec_test.go | 402 +++++++++++++++++++++++ pkg/services/policer/option.go | 17 +- pkg/services/policer/policer_test.go | 31 +- pkg/services/replicator/process.go | 2 +- pkg/services/replicator/pull.go | 72 ++++ pkg/services/replicator/replicator.go | 9 + 13 files changed, 744 insertions(+), 71 deletions(-) create mode 100644 pkg/services/object/get/remote_getter.go delete mode 100644 pkg/services/object/head/prm.go create mode 100644 pkg/services/policer/ec_test.go create mode 100644 pkg/services/replicator/pull.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 833daf628..af185a8da 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -242,11 +242,20 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl placement.NewNetworkMapSourceBuilder(c.netMapSource), ), policer.WithRemoteObjectHeaderFunc( - func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) { - prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a) + func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { + prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw) return remoteHeader.Head(ctx, prm) }, ), + policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) { + var prm engine.HeadPrm + prm.WithAddress(a) + res, err := c.cfgObject.cfgLocalStorage.localStorage.Head(ctx, prm) + if err != nil { + return nil, err + } + return res.Header(), nil + }), policer.WithNetmapKeys(c), policer.WithHeadTimeout( policerconfig.HeadTimeout(c.appCfg), @@ -297,6 +306,9 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa replicator.WithRemoteSender( putsvc.NewRemoteSender(keyStorage, cache), ), + replicator.WithRemoteGetter( + getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage), + ), replicator.WithMetrics(c.metricsCollector.Replicator()), ) } diff --git a/internal/logs/logs.go b/internal/logs/logs.go index f51c72e73..2e89e4304 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -529,5 +529,11 @@ const ( EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available" ECFailedToSendToContainerNode = "failed to send EC object to container node" ECFailedToSaveECPart = "failed to save EC part" - PolicerNodeIsNotContainerNodeForECObject = "current node is not container node for EC object" + PolicerNodeIsNotECObjectNode = "current node is not EC object node" + PolicerFailedToGetLocalECChunks = "failed to get local EC chunks" + PolicerMissingECChunk = "failed to find EC chunk on any of the nodes" + PolicerFailedToDecodeECChunkID = "failed to decode EC chunk ID" + PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk" + ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage" + 
ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage" ) diff --git a/pkg/services/object/get/remote_getter.go b/pkg/services/object/get/remote_getter.go new file mode 100644 index 000000000..0df67dec9 --- /dev/null +++ b/pkg/services/object/get/remote_getter.go @@ -0,0 +1,55 @@ +package getsvc + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type RemoteGetPrm struct { + Address oid.Address + Node netmapSDK.NodeInfo +} + +type RemoteGetter struct { + s remoteStorageConstructor + es epochSource + ks keyStorage +} + +func (g *RemoteGetter) Get(ctx context.Context, prm RemoteGetPrm) (*objectSDK.Object, error) { + var nodeInfo client.NodeInfo + if err := client.NodeInfoFromRawNetmapElement(&nodeInfo, netmapCore.Node(prm.Node)); err != nil { + return nil, err + } + rs, err := g.s.Get(nodeInfo) + if err != nil { + return nil, err + } + epoch, err := g.es.Epoch() + if err != nil { + return nil, err + } + key, err := g.ks.GetKey(nil) + if err != nil { + return nil, err + } + r := RemoteRequestParams{ + Epoch: epoch, + TTL: 1, + PrivateKey: key, + } + return rs.Get(ctx, prm.Address, r) +} + +func NewRemoteGetter(cc clientConstructor, es epochSource, ks keyStorage) *RemoteGetter { + return &RemoteGetter{ + s: &multiclientRemoteStorageConstructor{clientConstructor: cc}, + es: es, + ks: ks, + } +} diff --git a/pkg/services/object/head/prm.go b/pkg/services/object/head/prm.go deleted file mode 100644 index 5566e48fe..000000000 --- a/pkg/services/object/head/prm.go +++ /dev/null @@ -1,17 +0,0 @@ -package headsvc - -import ( - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" -) - -type Prm struct { - addr oid.Address -} - -func (p *Prm) WithAddress(v oid.Address) *Prm { - if p != nil { - p.addr = v - } - - return p -} diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index c9c17d4d8..6d47e37eb 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -28,8 +28,8 @@ type RemoteHeader struct { // RemoteHeadPrm groups remote header operation parameters. type RemoteHeadPrm struct { - commonHeadPrm *Prm - + addr oid.Address + raw bool node netmap.NodeInfo } @@ -57,12 +57,19 @@ func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm { // WithObjectAddress sets object address. func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm { if p != nil { - p.commonHeadPrm = new(Prm).WithAddress(v) + p.addr = v } return p } +func (p *RemoteHeadPrm) WithRaw(v bool) *RemoteHeadPrm { + if p != nil { + p.raw = v + } + return p +} + // Head requests object header from the remote node. 
 func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK.Object, error) {
 	key, err := h.keyStorage.GetKey(nil)
@@ -86,8 +93,11 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
 	headPrm.SetClient(c)
 	headPrm.SetPrivateKey(key)
-	headPrm.SetAddress(prm.commonHeadPrm.addr)
+	headPrm.SetAddress(prm.addr)
 	headPrm.SetTTL(remoteOpTTL)
+	if prm.raw {
+		headPrm.SetRawFlag()
+	}
 
 	res, err := internalclient.HeadObject(ctx, headPrm)
 	if err != nil {
diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go
index 2153275cc..2cdb72505 100644
--- a/pkg/services/policer/check.go
+++ b/pkg/services/policer/check.go
@@ -134,7 +134,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 
 		callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
 
-		_, err := p.remoteHeader(callCtx, nodes[i], addr)
+		_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
 
 		cancel()
 
diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go
index 194b0ddbe..6eca3bc00 100644
--- a/pkg/services/policer/ec.go
+++ b/pkg/services/policer/ec.go
@@ -10,9 +10,18 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
 )
 
+var errNoECinfoReturned = errors.New("no EC info returned")
+
+type ecChunkProcessResult struct {
+	validPlacement bool
+	removeLocal    bool
+}
+
 var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector")
 
 func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
@@ -72,30 +81,38 @@ func (p *Policer) processECContainerECObject(ctx context.Context, objInfo object
 	default:
 	}
 
-	validPlacement := p.processECChunk(ctx, objInfo, nn[0])
-	if !validPlacement {
-		p.pullRequiredECChunks(ctx, objInfo, nn[0])
+	res := p.processECChunk(ctx, objInfo, nn[0])
+	if !res.validPlacement {
+		// drop local chunk only if all required chunks are in place
+		res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0])
+	}
+	p.adjustECPlacement(ctx, objInfo, nn[0])
+
+	if res.removeLocal {
+		p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
+		p.cbRedundantCopy(ctx, objInfo.Address)
 	}
 	return nil
 }
 
 // processECChunk replicates EC chunk if needed.
-// Returns True if current chunk should be stored on current node.
-func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
+func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) ecChunkProcessResult {
 	var removeLocalChunk bool
 	requiredNode := nodes[int(objInfo.ECInfo.Index)%(len(nodes))]
 	if p.cfg.netmapKeys.IsLocalKey(requiredNode.PublicKey()) {
 		// current node is required node, we are happy
-		return true
+		return ecChunkProcessResult{
+			validPlacement: true,
+		}
 	}
 	if requiredNode.IsMaintenance() {
 		// consider maintenance mode has object, but do not drop local copy
 		p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
-		return false
+		return ecChunkProcessResult{}
 	}
 
 	callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
-	_, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address)
+	_, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address, false)
 	cancel()
 
 	if err == nil {
@@ -115,23 +132,138 @@ func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, n
 		p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", objInfo.Address), zap.String("error", err.Error()))
 	}
 
-	if removeLocalChunk {
-		p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
-		p.cbRedundantCopy(ctx, objInfo.Address)
+	return ecChunkProcessResult{
+		removeLocal: removeLocalChunk,
+	}
+}
+
+func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
+	var parentAddress oid.Address
+	parentAddress.SetContainer(objInfo.Address.Container())
+	parentAddress.SetObject(objInfo.ECInfo.ParentID)
+
+	requiredChunkIndexes := p.collectRequiredECChunks(nodes, objInfo)
+	if len(requiredChunkIndexes) == 0 {
+		p.log.Info(logs.PolicerNodeIsNotECObjectNode, zap.Stringer("object", objInfo.ECInfo.ParentID))
+		return true
 	}
+	err := p.resolveLocalECChunks(ctx, parentAddress, requiredChunkIndexes)
+	if err != nil {
+		p.log.Error(logs.PolicerFailedToGetLocalECChunks, zap.Error(err), zap.Stringer("object", parentAddress))
+		return false
+	}
+	if len(requiredChunkIndexes) == 0 {
+		return true
+	}
+
+	indexToObjectID := make(map[uint32]oid.ID)
+	success := p.resolveRemoteECChunks(ctx, parentAddress, nodes, requiredChunkIndexes, indexToObjectID)
+	if !success {
+		return false
+	}
+
+	for index, candidates := range requiredChunkIndexes {
+		var addr oid.Address
+		addr.SetContainer(objInfo.Address.Container())
+		addr.SetObject(indexToObjectID[index])
+		p.replicator.HandlePullTask(ctx, replicator.Task{
+			Addr:  addr,
+			Nodes: candidates,
+		})
+	}
+	// there were some missing chunks, it's not ok
 	return false
 }
 
-func (p *Policer) pullRequiredECChunks(_ context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) {
-	requiredChunkIndexes := make(map[uint32]struct{})
+func (p *Policer) collectRequiredECChunks(nodes []netmap.NodeInfo, objInfo objectcore.Info) map[uint32][]netmap.NodeInfo {
+	requiredChunkIndexes := make(map[uint32][]netmap.NodeInfo)
 	for i, n := range nodes {
+		if uint32(i) == objInfo.ECInfo.Total {
+			break
+		}
 		if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
-			requiredChunkIndexes[uint32(i)] = struct{}{}
+			requiredChunkIndexes[uint32(i)] = []netmap.NodeInfo{}
 		}
 	}
-	if len(requiredChunkIndexes) == 0 {
-		p.log.Info(logs.PolicerNodeIsNotContainerNodeForECObject, zap.Stringer("object", objInfo.ECInfo.ParentID))
-		return
+	return requiredChunkIndexes
+}
+
+func (p *Policer) resolveLocalECChunks(ctx context.Context, parentAddress oid.Address, required map[uint32][]netmap.NodeInfo) error {
+	_, err := p.localHeader(ctx, parentAddress)
+	var eiErr *objectSDK.ECInfoError
+	if err == nil { // should not happen
+		return errNoECinfoReturned
+	}
+	if !errors.As(err, &eiErr) {
+		return err
+	}
+	for _, ch := range eiErr.ECInfo().Chunks {
+		delete(required, ch.Index)
+	}
+	return nil
+}
+
+func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.Address, nodes []netmap.NodeInfo, required map[uint32][]netmap.NodeInfo, indexToObjectID map[uint32]oid.ID) bool {
+	var eiErr *objectSDK.ECInfoError
+	for _, n := range nodes {
+		if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
+			continue
+		}
+		_, err := p.remoteHeader(ctx, n, parentAddress, true)
+		if !errors.As(err, &eiErr) {
+			continue
+		}
+		for _, ch := range eiErr.ECInfo().Chunks {
+			if candidates, ok := required[ch.Index]; ok {
+				candidates = append(candidates, n)
+				required[ch.Index] = candidates
+
+				var chunkID oid.ID
+				if err := chunkID.ReadFromV2(ch.ID); err != nil {
+					p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
+					return false
+				}
+				if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID {
+					p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed),
+						zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress))
+					return false
+				}
+				indexToObjectID[ch.Index] = chunkID
+			}
+		}
+	}
+
+	for index, candidates := range required {
+		if len(candidates) == 0 {
+			p.log.Error(logs.PolicerMissingECChunk, zap.Stringer("object", parentAddress), zap.Uint32("index", index))
+			return false
+		}
+	}
+
+	return true
+}
+
+func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) {
+	var parentAddress oid.Address
+	parentAddress.SetContainer(objInfo.Address.Container())
+	parentAddress.SetObject(objInfo.ECInfo.ParentID)
+	var eiErr *objectSDK.ECInfoError
+	for idx, n := range nodes {
+		if uint32(idx) == objInfo.ECInfo.Total {
+			return
+		}
+		if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
+			continue
+		}
+		_, err := p.remoteHeader(ctx, n, parentAddress, true)
+		if errors.As(err, &eiErr) {
+			continue
+		}
+		p.replicator.HandleReplicationTask(ctx, replicator.Task{
+			NumCopies: 1,
+			Addr:      objInfo.Address,
+			Nodes:     []netmap.NodeInfo{n},
+		}, newNodeCache())
 	}
 }
diff --git a/pkg/services/policer/ec_test.go b/pkg/services/policer/ec_test.go
new file mode 100644
index 000000000..3957c112b
--- /dev/null
+++ b/pkg/services/policer/ec_test.go
@@ -0,0 +1,402 @@
+package policer
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"sync/atomic"
+	"testing"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
+	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
+	"github.com/stretchr/testify/require"
+)
+
+func TestECChunkHasValidPlacement(t *testing.T) {
+	t.Parallel()
+	chunkAddress := oidtest.Address()
+	parentID := oidtest.ID()
+
+	var policy
netmapSDK.PlacementPolicy + require.NoError(t, policy.DecodeString("EC 2.1")) + + cnr := &container.Container{} + cnr.Value.Init() + cnr.Value.SetPlacementPolicy(policy) + containerSrc := containerSrc{ + get: func(id cid.ID) (*container.Container, error) { + if id.Equals(chunkAddress.Container()) { + return cnr, nil + } + return nil, new(apistatus.ContainerNotFound) + }, + } + + nodes := make([]netmapSDK.NodeInfo, 4) + for i := range nodes { + nodes[i].SetPublicKey([]byte{byte(i)}) + } + + placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) { + if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) { + return [][]netmapSDK.NodeInfo{nodes}, nil + } + return nil, errors.New("unexpected placement build") + } + + headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { + require.True(t, raw, "remote header for parent object must be called with raw flag") + index := int(ni.PublicKey()[0]) + require.True(t, index == 1 || index == 2, "invalid node to get parent header") + require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header") + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(oidtest.ID()) + ch.Index = uint32(index) + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + + p := New( + WithContainerSource(containerSrc), + WithPlacementBuilder(placementBuilderFunc(placementBuilder)), + WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { + return bytes.Equal(k, nodes[0].PublicKey()) + })), + WithRemoteObjectHeaderFunc(headFn), + WithPool(testPool(t)), + ) + + objInfo := objectcore.Info{ + Address: chunkAddress, + Type: objectSDK.TypeRegular, + ECInfo: &objectcore.ECInfo{ + ParentID: parentID, + Index: 0, + Total: 3, + }, + } + err := p.processObject(context.Background(), objInfo) + require.NoError(t, err) +} + +func TestECChunkHasInvalidPlacement(t *testing.T) { + t.Parallel() + chunkAddress := oidtest.Address() + parentID := oidtest.ID() + chunkObject := objectSDK.New() + chunkObject.SetContainerID(chunkAddress.Container()) + chunkObject.SetID(chunkAddress.Object()) + chunkObject.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + chunkObject.SetPayloadSize(uint64(10)) + chunkObject.SetECHeader(objectSDK.NewECHeader(parentID, 1, 3, []byte{}, 0)) + + var policy netmapSDK.PlacementPolicy + require.NoError(t, policy.DecodeString("EC 2.1")) + + cnr := &container.Container{} + cnr.Value.Init() + cnr.Value.SetPlacementPolicy(policy) + containerSrc := containerSrc{ + get: func(id cid.ID) (*container.Container, error) { + if id.Equals(chunkAddress.Container()) { + return cnr, nil + } + return nil, new(apistatus.ContainerNotFound) + }, + } + + nodes := make([]netmapSDK.NodeInfo, 4) + for i := range nodes { + nodes[i].SetPublicKey([]byte{byte(i)}) + } + + placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) { + if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) { + return [][]netmapSDK.NodeInfo{nodes}, nil + } + return nil, errors.New("unexpected placement build") + } + + objInfo := objectcore.Info{ + Address: chunkAddress, + Type: objectSDK.TypeRegular, + ECInfo: &objectcore.ECInfo{ + ParentID: parentID, + Index: 1, + Total: 3, + }, + } + + t.Run("node0 has chunk1, node1 has chunk0 and chunk1", func(t *testing.T) { + // policer should pull chunk0 on first run and drop chunk1 on second run + var allowDrop 
bool + requiredChunkID := oidtest.ID() + headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { + if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw { + return chunkObject, nil + } + if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(oidtest.ID()) + ch.Index = 1 + ch.Total = 3 + ei.AddChunk(ch) + ch.Index = 0 + ch.SetID(requiredChunkID) + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(oidtest.ID()) + ch.Index = 2 + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + return nil, new(apistatus.ObjectNotFound) + } + require.Fail(t, "unexpected remote HEAD") + return nil, fmt.Errorf("unexpected remote HEAD") + } + + localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) { + require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD") + if allowDrop { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(oidtest.ID()) + ch.Index = 1 + ch.Total = 3 + ei.AddChunk(ch) + ch.SetID(requiredChunkID) + ch.Index = 0 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(oidtest.ID()) + ch.Index = 1 + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + + var pullCounter atomic.Int64 + var dropped []oid.Address + p := New( + WithContainerSource(containerSrc), + WithPlacementBuilder(placementBuilderFunc(placementBuilder)), + WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { + return bytes.Equal(k, nodes[0].PublicKey()) + })), + WithRemoteObjectHeaderFunc(headFn), + WithLocalObjectHeaderFunc(localHeadF), + WithReplicator(pullFunc(func(ctx context.Context, r replicator.Task) { + require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID && + len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task") + pullCounter.Add(1) + })), + WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) { + require.True(t, allowDrop, "invalid redundent copy call") + dropped = append(dropped, a) + }), + WithPool(testPool(t)), + ) + + err := p.processObject(context.Background(), objInfo) + require.NoError(t, err) + require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count") + require.Equal(t, 0, len(dropped), "invalid dropped count") + allowDrop = true + err = p.processObject(context.Background(), objInfo) + require.NoError(t, err) + require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count") + require.Equal(t, 1, len(dropped), "invalid dropped count") + require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object") + }) + + t.Run("node0 has chunk0 and chunk1, node1 has chunk1", func(t *testing.T) { + // policer should drop chunk1 + headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { + if bytes.Equal(ni.PublicKey(), 
nodes[1].PublicKey()) && a == chunkAddress && !raw { + return chunkObject, nil + } + if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(chunkAddress.Object()) + ch.Index = 1 + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(oidtest.ID()) + ch.Index = 2 + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + return nil, new(apistatus.ObjectNotFound) + } + require.Fail(t, "unexpected remote HEAD") + return nil, fmt.Errorf("unexpected remote HEAD") + } + + localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) { + require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD") + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(chunkAddress.Object()) + ch.Index = 1 + ch.Total = 3 + ei.AddChunk(ch) + ch.SetID(oidtest.ID()) + ch.Index = 0 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + + var dropped []oid.Address + p := New( + WithContainerSource(containerSrc), + WithPlacementBuilder(placementBuilderFunc(placementBuilder)), + WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { + return bytes.Equal(k, nodes[0].PublicKey()) + })), + WithRemoteObjectHeaderFunc(headFn), + WithLocalObjectHeaderFunc(localHeadF), + WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) { + dropped = append(dropped, a) + }), + WithPool(testPool(t)), + ) + + err := p.processObject(context.Background(), objInfo) + require.NoError(t, err) + require.Equal(t, 1, len(dropped), "invalid dropped count") + require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object") + }) + + t.Run("node0 has chunk0 and chunk1, node1 has no chunks", func(t *testing.T) { + // policer should replicate chunk1 to node1 on first run and drop chunk1 on node0 on second run + var secondRun bool + headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { + if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw { + if !secondRun { + return nil, new(apistatus.ObjectNotFound) + } + return chunkObject, nil + } + if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(chunkAddress.Object()) + ch.Index = 1 + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(oidtest.ID()) + ch.Index = 2 + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() && + a.Object() == parentID && raw { + return nil, new(apistatus.ObjectNotFound) + } + require.Fail(t, "unexpected remote HEAD") + return nil, fmt.Errorf("unexpected 
remote HEAD") + } + + localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) { + require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD") + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(chunkAddress.Object()) + ch.Index = 1 + ch.Total = 3 + ei.AddChunk(ch) + ch.SetID(oidtest.ID()) + ch.Index = 0 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + + var dropped []oid.Address + var replicated []replicator.Task + p := New( + WithContainerSource(containerSrc), + WithPlacementBuilder(placementBuilderFunc(placementBuilder)), + WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { + return bytes.Equal(k, nodes[0].PublicKey()) + })), + WithRemoteObjectHeaderFunc(headFn), + WithLocalObjectHeaderFunc(localHeadF), + WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) { + dropped = append(dropped, a) + }), + WithReplicator(replicatorFunc(func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) { + replicated = append(replicated, t) + })), + WithPool(testPool(t)), + ) + + err := p.processObject(context.Background(), objInfo) + require.NoError(t, err) + require.Equal(t, 0, len(dropped), "invalid dropped count") + require.Equal(t, 1, len(replicated), "invalid replicated count") + require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object") + require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target") + + secondRun = true + err = p.processObject(context.Background(), objInfo) + require.NoError(t, err) + require.Equal(t, 1, len(replicated), "invalid replicated count") + require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object") + require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target") + require.Equal(t, 1, len(dropped), "invalid dropped count") + require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object") + }) +} + +type pullFunc func(context.Context, replicator.Task) + +func (f pullFunc) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { + panic("not implemented") +} + +func (f pullFunc) HandlePullTask(ctx context.Context, task replicator.Task) { + f(ctx, task) +} diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index 849e5ed8b..9a646eea6 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -36,10 +36,14 @@ type BuryFunc func(context.Context, oid.Address) error // Replicator is the interface to a consumer of replication tasks. type Replicator interface { HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) + HandlePullTask(ctx context.Context, task replicator.Task) } // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. -type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error) +type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address, bool) (*objectSDK.Object, error) + +// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node. 
+type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error) type cfg struct { headTimeout time.Duration @@ -56,6 +60,8 @@ type cfg struct { remoteHeader RemoteObjectHeaderFunc + localHeader LocalObjectHeaderFunc + netmapKeys netmap.AnnouncedKeys replicator Replicator @@ -125,13 +131,20 @@ func WithPlacementBuilder(v placement.Builder) Option { } } -// WithRemoteObjectHeader returns option to set object header receiver of Policer. +// WithRemoteObjectHeader returns option to set remote object header receiver of Policer. func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option { return func(c *cfg) { c.remoteHeader = v } } +// WithLocalObjectHeaderFunc returns option to set local object header receiver of Policer. +func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option { + return func(c *cfg) { + c.localHeader = v + } +} + // WithNetmapKeys returns option to set tool to work with announced public keys. func WithNetmapKeys(v netmap.AnnouncedKeys) Option { return func(c *cfg) { diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 2c70b1c04..072f21a6f 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -161,31 +161,6 @@ func TestProcessObject(t *testing.T) { placement: [][]int{{0, 1, 2}}, wantReplicateTo: []int{1, 2}, }, - { - desc: "EC chunk stored valid on current node", - objType: objectSDK.TypeRegular, - nodeCount: 2, - policy: `EC 1.1`, - placement: [][]int{{0}}, - ecInfo: &objectcore.ECInfo{ - ParentID: oidtest.ID(), - Index: 1, - Total: 2, - }, - }, - { - desc: "EC chunk must be replicated to other EC node", - objType: objectSDK.TypeRegular, - nodeCount: 2, - policy: `EC 1.1`, - placement: [][]int{{1}}, - wantReplicateTo: []int{1}, - ecInfo: &objectcore.ECInfo{ - ParentID: oidtest.ID(), - Index: 1, - Total: 2, - }, - }, } for i := range tests { @@ -223,7 +198,7 @@ func TestProcessObject(t *testing.T) { } // Object remote header - headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) { + headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { index := int(ni.PublicKey()[0]) if a != addr || index < 1 || index >= ti.nodeCount { t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a) @@ -471,3 +446,7 @@ type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult func (f replicatorFunc) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { f(ctx, task, res) } + +func (f replicatorFunc) HandlePullTask(ctx context.Context, task replicator.Task) { + panic("not implemented") +} diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 06d41b74e..3d04b7084 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -32,7 +32,7 @@ func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res T ) }() - ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleTask", + ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleReplicateTask", trace.WithAttributes( attribute.Stringer("address", task.Addr), attribute.Int64("number_of_copies", int64(task.NumCopies)), diff --git a/pkg/services/replicator/pull.go b/pkg/services/replicator/pull.go new file mode 100644 index 000000000..d178700f6 --- /dev/null +++ b/pkg/services/replicator/pull.go @@ -0,0 +1,72 @@ +package replicator + +import ( + "context" + "errors" + + 
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" + tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node") + +func (p *Replicator) HandlePullTask(ctx context.Context, task Task) { + p.metrics.IncInFlightRequest() + defer p.metrics.DecInFlightRequest() + defer func() { + p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull")) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandlePullTask", + trace.WithAttributes( + attribute.Stringer("address", task.Addr), + attribute.Int("nodes_count", len(task.Nodes)), + )) + defer span.End() + + var obj *objectSDK.Object + + for _, node := range task.Nodes { + var err error + obj, err = p.remoteGetter.Get(ctx, getsvc.RemoteGetPrm{ + Address: task.Addr, + Node: node, + }) + if err == nil { + break + } + var endpoints []string + node.IterateNetworkEndpoints(func(s string) bool { + endpoints = append(endpoints, s) + return false + }) + p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage, + zap.Stringer("object", task.Addr), + zap.Error(err), + zap.Strings("endpoints", endpoints), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } + + if obj == nil { + p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage, + zap.Stringer("object", task.Addr), + zap.Error(errFailedToGetObjectFromAnyNode), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return + } + + err := engine.Put(ctx, p.localStorage, obj) + if err != nil { + p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, + zap.Stringer("object", task.Addr), + zap.Error(err), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } +} diff --git a/pkg/services/replicator/replicator.go b/pkg/services/replicator/replicator.go index bb817cb32..a67f2e766 100644 --- a/pkg/services/replicator/replicator.go +++ b/pkg/services/replicator/replicator.go @@ -4,6 +4,7 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" @@ -25,6 +26,8 @@ type cfg struct { remoteSender *putsvc.RemoteSender + remoteGetter *getsvc.RemoteGetter + localStorage *engine.StorageEngine metrics MetricsRegister @@ -70,6 +73,12 @@ func WithRemoteSender(v *putsvc.RemoteSender) Option { } } +func WithRemoteGetter(v *getsvc.RemoteGetter) Option { + return func(c *cfg) { + c.remoteGetter = v + } +} + // WithLocalStorage returns option to set local object storage of Replicator. func WithLocalStorage(v *engine.StorageEngine) Option { return func(c *cfg) {