diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index af185a8da..9f34896bc 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -29,7 +29,6 @@ import ( deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2" - headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2" searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search" @@ -231,7 +230,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl return err } - remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor) + remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor) pol := policer.New( policer.WithLogger(c.log), @@ -243,8 +242,8 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl ), policer.WithRemoteObjectHeaderFunc( func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { - prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw) - return remoteHeader.Head(ctx, prm) + prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw) + return remoteReader.Head(ctx, prm) }, ), policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) { @@ -256,6 +255,19 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl } return res.Header(), nil }), + policer.WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) { + prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a) + return remoteReader.Get(ctx, prm) + }), + policer.WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) { + var prm engine.GetPrm + prm.WithAddress(a) + res, err := c.cfgObject.cfgLocalStorage.localStorage.Get(ctx, prm) + if err != nil { + return nil, err + } + return res.Object(), nil + }), policer.WithNetmapKeys(c), policer.WithHeadTimeout( policerconfig.HeadTimeout(c.appCfg), @@ -274,6 +286,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl }), policer.WithPool(c.cfgObject.pool.replication), policer.WithMetrics(c.metricsCollector.PolicerMetrics()), + policer.WithKeyStorage(keyStorage), ) c.workers = append(c.workers, worker{ diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 2e89e4304..43fd77624 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -536,4 +536,9 @@ const ( PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk" ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage" ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage" + PolicerCouldNotGetObjectFromNodeMoving = "could not get EC object from the node, moving current chunk to the node" + PolicerCouldNotRestoreObjectNotEnoughChunks = "could not restore EC object: not enough chunks" + PolicerFailedToRestoreObject = "failed to restore EC object" + PolicerCouldNotGetChunk = "could not get EC chunk" + PolicerCouldNotGetChunks = "could not get EC chunks" ) diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/remote_reader.go similarity index 55% rename from pkg/services/object/head/remote.go rename to pkg/services/object/remote_reader.go index 6d47e37eb..18b6107cf 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/remote_reader.go @@ -1,4 +1,4 @@ -package headsvc +package object import ( "context" @@ -18,16 +18,16 @@ type ClientConstructor interface { Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error) } -// RemoteHeader represents utility for getting -// the object header from a remote host. -type RemoteHeader struct { +// RemoteReader represents utility for getting +// the object from a remote host. +type RemoteReader struct { keyStorage *util.KeyStorage clientCache ClientConstructor } -// RemoteHeadPrm groups remote header operation parameters. -type RemoteHeadPrm struct { +// RemoteRequestPrm groups remote operation parameters. +type RemoteRequestPrm struct { addr oid.Address raw bool node netmap.NodeInfo @@ -37,16 +37,16 @@ const remoteOpTTL = 1 var ErrNotFound = errors.New("object header not found") -// NewRemoteHeader creates, initializes and returns new RemoteHeader instance. -func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteHeader { - return &RemoteHeader{ +// NewRemoteReader creates, initializes and returns new RemoteHeader instance. +func NewRemoteReader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteReader { + return &RemoteReader{ keyStorage: keyStorage, clientCache: cache, } } // WithNodeInfo sets information about the remote node. -func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm { +func (p *RemoteRequestPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteRequestPrm { if p != nil { p.node = v } @@ -55,7 +55,7 @@ func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm { } // WithObjectAddress sets object address. -func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm { +func (p *RemoteRequestPrm) WithObjectAddress(v oid.Address) *RemoteRequestPrm { if p != nil { p.addr = v } @@ -63,7 +63,7 @@ func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm { return p } -func (p *RemoteHeadPrm) WithRaw(v bool) *RemoteHeadPrm { +func (p *RemoteRequestPrm) WithRaw(v bool) *RemoteRequestPrm { if p != nil { p.raw = v } @@ -71,7 +71,7 @@ func (p *RemoteHeadPrm) WithRaw(v bool) *RemoteHeadPrm { } // Head requests object header from the remote node. -func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK.Object, error) { +func (h *RemoteReader) Head(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) { key, err := h.keyStorage.GetKey(nil) if err != nil { return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err) @@ -106,3 +106,39 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK return res.Header(), nil } + +func (h *RemoteReader) Get(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) { + key, err := h.keyStorage.GetKey(nil) + if err != nil { + return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err) + } + + var info clientcore.NodeInfo + + err = clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(prm.node)) + if err != nil { + return nil, fmt.Errorf("parse client node info: %w", err) + } + + c, err := h.clientCache.Get(info) + if err != nil { + return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err) + } + + var getPrm internalclient.GetObjectPrm + + getPrm.SetClient(c) + getPrm.SetPrivateKey(key) + getPrm.SetAddress(prm.addr) + getPrm.SetTTL(remoteOpTTL) + if prm.raw { + getPrm.SetRawFlag() + } + + res, err := internalclient.GetObject(ctx, getPrm) + if err != nil { + return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err) + } + + return res.Object(), nil +} diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go index fbd389d71..b15efdd15 100644 --- a/pkg/services/policer/ec.go +++ b/pkg/services/policer/ec.go @@ -2,6 +2,7 @@ package policer import ( "context" + "encoding/hex" "errors" "fmt" @@ -11,8 +12,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var errNoECinfoReturnded = errors.New("no EC info returned") @@ -84,7 +87,7 @@ func (p *Policer) processECContainerECObject(ctx context.Context, objInfo object // drop local chunk only if all required chunks are in place res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0]) } - p.adjustECPlacement(ctx, objInfo, nn[0]) + p.adjustECPlacement(ctx, objInfo, nn[0], policy) if res.removeLocal { p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address)) @@ -224,7 +227,7 @@ func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.A } if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID { p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed), - zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress)) + zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index)) return false } indexToObjectID[ch.Index] = chunkID @@ -242,26 +245,146 @@ func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.A return true } -func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) { +func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) { var parentAddress oid.Address parentAddress.SetContainer(objInfo.Address.Container()) parentAddress.SetObject(objInfo.ECInfo.ParentID) var eiErr *objectSDK.ECInfoError + resolved := make(map[uint32][]netmap.NodeInfo) + chunkIDs := make(map[uint32]oid.ID) + restore := true // do not restore EC chunks if some node returned error for idx, n := range nodes { - if uint32(idx) == objInfo.ECInfo.Total { + if uint32(idx) >= objInfo.ECInfo.Total && uint32(len(resolved)) == objInfo.ECInfo.Total { return } + var err error if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) { - continue + _, err = p.localHeader(ctx, parentAddress) + } else { + _, err = p.remoteHeader(ctx, n, parentAddress, true) } - _, err := p.remoteHeader(ctx, n, parentAddress, true) + if errors.As(err, &eiErr) { + for _, ch := range eiErr.ECInfo().Chunks { + resolved[ch.Index] = append(resolved[ch.Index], n) + var ecInfoChunkID oid.ID + if err := ecInfoChunkID.ReadFromV2(ch.ID); err != nil { + p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress)) + return + } + if chunkID, exist := chunkIDs[ch.Index]; exist && chunkID != ecInfoChunkID { + p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", chunkID), + zap.Stringer("second", ecInfoChunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index)) + return + } + chunkIDs[ch.Index] = ecInfoChunkID + } + } else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total { + p.log.Warn(logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err)) + p.replicator.HandleReplicationTask(ctx, replicator.Task{ + NumCopies: 1, + Addr: objInfo.Address, + Nodes: []netmap.NodeInfo{n}, + }, newNodeCache()) + restore = false + } + } + if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total { + return + } + if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() { + var found []uint32 + for i := range resolved { + found = append(found, i) + } + p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found)) + return + } + p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy) +} + +func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) { + c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount())) + if err != nil { + p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err)) + return + } + parts := p.collectExistedChunks(ctx, objInfo, existedChunks, parentAddress, chunkIDs) + if parts == nil { + return + } + key, err := p.keyStorage.GetKey(nil) + if err != nil { + p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err)) + return + } + required := make([]bool, len(parts)) + for i, p := range parts { + if p == nil { + required[i] = true + } + } + if err := c.ReconstructParts(parts, required, key); err != nil { + p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err)) + return + } + for idx, part := range parts { + if _, exists := existedChunks[uint32(idx)]; exists { continue } - p.replicator.HandleReplicationTask(ctx, replicator.Task{ - NumCopies: 1, - Addr: objInfo.Address, - Nodes: []netmap.NodeInfo{n}, - }, newNodeCache()) + var addr oid.Address + addr.SetContainer(parentAddress.Container()) + pID, _ := part.ID() + addr.SetObject(pID) + targetNode := nodes[idx%len(nodes)] + if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) { + p.replicator.HandleLocalPutTask(ctx, replicator.Task{ + Addr: addr, + Obj: part, + }) + } else { + p.replicator.HandleReplicationTask(ctx, replicator.Task{ + NumCopies: 1, + Addr: addr, + Nodes: []netmap.NodeInfo{targetNode}, + Obj: part, + }, newNodeCache()) + } } } + +func (p *Policer) collectExistedChunks(ctx context.Context, objInfo objectcore.Info, existedChunks map[uint32][]netmap.NodeInfo, parentAddress oid.Address, chunkIDs map[uint32]oid.ID) []*objectSDK.Object { + parts := make([]*objectSDK.Object, objInfo.ECInfo.Total) + errGroup, egCtx := errgroup.WithContext(ctx) + for idx, nodes := range existedChunks { + idx := idx + nodes := nodes + errGroup.Go(func() error { + var objID oid.Address + objID.SetContainer(parentAddress.Container()) + objID.SetObject(chunkIDs[idx]) + var obj *objectSDK.Object + var err error + for _, node := range nodes { + if p.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { + obj, err = p.localObject(egCtx, objID) + } else { + obj, err = p.remoteObject(egCtx, node, objID) + } + if err == nil { + break + } + p.log.Warn(logs.PolicerCouldNotGetChunk, zap.Stringer("object", parentAddress), zap.Stringer("chunkID", objID), zap.Error(err), zap.String("node", hex.EncodeToString(node.PublicKey()))) + } + if obj != nil { + parts[idx] = obj + } + return nil + }) + } + if err := errGroup.Wait(); err != nil { + p.log.Error(logs.PolicerCouldNotGetChunks, zap.Stringer("object", parentAddress), zap.Error(err)) + return nil + } + return parts +} diff --git a/pkg/services/policer/ec_test.go b/pkg/services/policer/ec_test.go index 3957c112b..3ac9699f0 100644 --- a/pkg/services/policer/ec_test.go +++ b/pkg/services/policer/ec_test.go @@ -3,6 +3,7 @@ package policer import ( "bytes" "context" + "crypto/rand" "errors" "fmt" "sync/atomic" @@ -10,13 +11,16 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" ) @@ -52,7 +56,7 @@ func TestECChunkHasValidPlacement(t *testing.T) { return nil, errors.New("unexpected placement build") } - headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { + remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { require.True(t, raw, "remote header for parent object must be called with raw flag") index := int(ni.PublicKey()[0]) require.True(t, index == 1 || index == 2, "invalid node to get parent header") @@ -66,13 +70,25 @@ func TestECChunkHasValidPlacement(t *testing.T) { return nil, objectSDK.NewECInfoError(ei) } + localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) { + require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header") + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(oidtest.ID()) + ch.Index = uint32(0) + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + p := New( WithContainerSource(containerSrc), WithPlacementBuilder(placementBuilderFunc(placementBuilder)), WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { return bytes.Equal(k, nodes[0].PublicKey()) })), - WithRemoteObjectHeaderFunc(headFn), + WithRemoteObjectHeaderFunc(remoteHeadFn), + WithLocalObjectHeaderFunc(localHeadFn), WithPool(testPool(t)), ) @@ -209,11 +225,13 @@ func TestECChunkHasInvalidPlacement(t *testing.T) { })), WithRemoteObjectHeaderFunc(headFn), WithLocalObjectHeaderFunc(localHeadF), - WithReplicator(pullFunc(func(ctx context.Context, r replicator.Task) { - require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID && - len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task") - pullCounter.Add(1) - })), + WithReplicator(&testReplicator{ + handlePullTask: (func(ctx context.Context, r replicator.Task) { + require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID && + len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task") + pullCounter.Add(1) + }), + }), WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) { require.True(t, allowDrop, "invalid redundent copy call") dropped = append(dropped, a) @@ -367,9 +385,11 @@ func TestECChunkHasInvalidPlacement(t *testing.T) { WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) { dropped = append(dropped, a) }), - WithReplicator(replicatorFunc(func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) { - replicated = append(replicated, t) - })), + WithReplicator(&testReplicator{ + handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) { + replicated = append(replicated, t) + }, + }), WithPool(testPool(t)), ) @@ -391,12 +411,155 @@ func TestECChunkHasInvalidPlacement(t *testing.T) { }) } -type pullFunc func(context.Context, replicator.Task) +func TestECChunkRestore(t *testing.T) { + // node0 has chunk0, node1 has chunk1 + // policer should replicate chunk0 to node2 on the first run + // then restore EC object and replicate chunk2 to node2 on the second run + t.Parallel() -func (f pullFunc) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { - panic("not implemented") -} + payload := make([]byte, 64) + rand.Read(payload) + parentAddress := oidtest.Address() + parentObject := objectSDK.New() + parentObject.SetContainerID(parentAddress.Container()) + parentObject.SetPayload(payload) + parentObject.SetPayloadSize(64) + objectSDK.CalculateAndSetPayloadChecksum(parentObject) + err := objectSDK.CalculateAndSetID(parentObject) + require.NoError(t, err) + id, _ := parentObject.ID() + parentAddress.SetObject(id) -func (f pullFunc) HandlePullTask(ctx context.Context, task replicator.Task) { - f(ctx, task) + chunkIDs := make([]oid.ID, 3) + c, err := erasurecode.NewConstructor(2, 1) + require.NoError(t, err) + key, err := keys.NewPrivateKey() + require.NoError(t, err) + chunks, err := c.Split(parentObject, &key.PrivateKey) + require.NoError(t, err) + for i, ch := range chunks { + chunkIDs[i], _ = ch.ID() + } + + var policy netmapSDK.PlacementPolicy + require.NoError(t, policy.DecodeString("EC 2.1")) + + cnr := &container.Container{} + cnr.Value.Init() + cnr.Value.SetPlacementPolicy(policy) + containerSrc := containerSrc{ + get: func(id cid.ID) (*container.Container, error) { + if id.Equals(parentAddress.Container()) { + return cnr, nil + } + return nil, new(apistatus.ContainerNotFound) + }, + } + + nodes := make([]netmapSDK.NodeInfo, 4) + for i := range nodes { + nodes[i].SetPublicKey([]byte{byte(i)}) + } + + placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) { + if cnr.Equals(parentAddress.Container()) && obj.Equals(parentAddress.Object()) { + return [][]netmapSDK.NodeInfo{nodes}, nil + } + return nil, errors.New("unexpected placement build") + } + var secondRun bool + remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { + require.True(t, raw, "remote header for parent object must be called with raw flag") + index := int(ni.PublicKey()[0]) + require.True(t, index == 1 || index == 2 || index == 3, "invalid node to get parent header") + require.True(t, a == parentAddress, "invalid address to get remote header") + if index == 1 { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(chunkIDs[1]) + ch.Index = uint32(1) + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + if index == 2 && secondRun { + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(chunkIDs[0]) + ch.Index = uint32(0) + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + return nil, new(apistatus.ObjectNotFound) + } + + localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) { + require.True(t, a == parentAddress, "invalid address to get remote header") + ei := objectSDK.NewECInfo() + var ch objectSDK.ECChunk + ch.SetID(chunkIDs[0]) + ch.Index = uint32(0) + ch.Total = 3 + ei.AddChunk(ch) + return nil, objectSDK.NewECInfoError(ei) + } + + var replicatedObj []*objectSDK.Object + p := New( + WithContainerSource(containerSrc), + WithPlacementBuilder(placementBuilderFunc(placementBuilder)), + WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { + return bytes.Equal(k, nodes[0].PublicKey()) + })), + WithRemoteObjectHeaderFunc(remoteHeadFn), + WithLocalObjectHeaderFunc(localHeadFn), + WithReplicator(&testReplicator{ + handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) { + if t.Obj != nil { + replicatedObj = append(replicatedObj, t.Obj) + } + }, + }), + WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) { + require.True(t, a.Container() == parentAddress.Container() && a.Object() == chunkIDs[0], "invalid local object request") + return chunks[0], nil + }), + WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) { + index := ni.PublicKey()[0] + if index == 2 { + return nil, new(apistatus.ObjectNotFound) + } + return chunks[index], nil + }), + WithPool(testPool(t)), + WithKeyStorage(util.NewKeyStorage(&key.PrivateKey, nil, nil)), + ) + + var chunkAddress oid.Address + chunkAddress.SetContainer(parentAddress.Container()) + chunkAddress.SetObject(chunkIDs[0]) + objInfo := objectcore.Info{ + Address: chunkAddress, + Type: objectSDK.TypeRegular, + ECInfo: &objectcore.ECInfo{ + ParentID: parentAddress.Object(), + Index: 0, + Total: 3, + }, + } + err = p.processObject(context.Background(), objInfo) + require.NoError(t, err) + secondRun = true + err = p.processObject(context.Background(), objInfo) + require.NoError(t, err) + + require.Equal(t, 1, len(replicatedObj), "invalid replicated objects count") + chunks[2].SetSignature(nil) + expectedData, err := chunks[2].MarshalJSON() + require.NoError(t, err) + replicatedObj[0].SetSignature(nil) + actualData, err := replicatedObj[0].MarshalJSON() + require.NoError(t, err) + require.EqualValues(t, string(expectedData), string(actualData), "invalid restored objects") } diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index 9a646eea6..9dbfd8b9f 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -37,6 +38,7 @@ type BuryFunc func(context.Context, oid.Address) error type Replicator interface { HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) HandlePullTask(ctx context.Context, task replicator.Task) + HandleLocalPutTask(ctx context.Context, task replicator.Task) } // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. @@ -45,6 +47,10 @@ type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Addres // LocalObjectHeaderFunc is the function to obtain HEAD info from the current node. type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error) +type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error) + +type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error) + type cfg struct { headTimeout time.Duration @@ -75,6 +81,12 @@ type cfg struct { evictDuration, sleepDuration time.Duration metrics MetricsRegister + + remoteObject RemoteObjectGetFunc + + localObject LocalObjectGetFunc + + keyStorage *util.KeyStorage } func defaultCfg() *cfg { @@ -145,6 +157,18 @@ func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option { } } +func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option { + return func(c *cfg) { + c.remoteObject = v + } +} + +func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option { + return func(c *cfg) { + c.localObject = v + } +} + // WithNetmapKeys returns option to set tool to work with announced public keys. func WithNetmapKeys(v netmap.AnnouncedKeys) Option { return func(c *cfg) { @@ -182,3 +206,9 @@ func WithMetrics(m MetricsRegister) Option { c.metrics = m } } + +func WithKeyStorage(ks *util.KeyStorage) Option { + return func(c *cfg) { + c.keyStorage = ks + } +} diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 072f21a6f..e353ea428 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -249,12 +249,14 @@ func TestProcessObject(t *testing.T) { require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a) gotRemoveRedundant = true }), - WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) { - require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task) - for _, node := range task.Nodes { - gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0])) - } - })), + WithReplicator(&testReplicator{ + handleReplicationTask: func(_ context.Context, task replicator.Task, res replicator.TaskResult) { + require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task) + for _, node := range task.Nodes { + gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0])) + } + }, + }), WithPool(testPool(t)), ) @@ -440,13 +442,20 @@ type announcedKeysFunc func([]byte) bool func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } -// replicatorFunc is a Replicator backed by a function. -type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult) - -func (f replicatorFunc) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { - f(ctx, task, res) +type testReplicator struct { + handleReplicationTask func(ctx context.Context, task replicator.Task, res replicator.TaskResult) + handleLocalPutTask func(ctx context.Context, task replicator.Task) + handlePullTask func(ctx context.Context, task replicator.Task) } -func (f replicatorFunc) HandlePullTask(ctx context.Context, task replicator.Task) { - panic("not implemented") +func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { + r.handleReplicationTask(ctx, task, res) +} + +func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) { + r.handleLocalPutTask(ctx, task) +} + +func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) { + r.handlePullTask(ctx, task) } diff --git a/pkg/services/replicator/put.go b/pkg/services/replicator/put.go new file mode 100644 index 000000000..c06ec3f65 --- /dev/null +++ b/pkg/services/replicator/put.go @@ -0,0 +1,47 @@ +package replicator + +import ( + "context" + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +var errObjectNotDefined = errors.New("object is not defined") + +func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) { + p.metrics.IncInFlightRequest() + defer p.metrics.DecInFlightRequest() + defer func() { + p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull")) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleLocalPutTask", + trace.WithAttributes( + attribute.Stringer("address", task.Addr), + attribute.Int("nodes_count", len(task.Nodes)), + )) + defer span.End() + + if task.Obj == nil { + p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, + zap.Stringer("object", task.Addr), + zap.Error(errObjectNotDefined), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return + } + + err := engine.Put(ctx, p.localStorage, task.Obj) + if err != nil { + p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, + zap.Stringer("object", task.Addr), + zap.Error(err), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } +}