From 1b520f79733e3628af5d47b597b5baff60f3f36a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 1 Oct 2024 15:27:06 +0300 Subject: [PATCH] [#1412] engine: Add `IsIndexedContainer` flag Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/object.go | 4 +- pkg/core/container/info.go | 103 ++++++++++++++++++ pkg/core/container/util.go | 12 ++ .../engine/control_test.go | 2 +- .../engine/delete_test.go | 8 +- .../engine/engine_test.go | 2 +- .../engine/inhume_test.go | 2 +- pkg/local_object_storage/engine/lock_test.go | 14 +-- pkg/local_object_storage/engine/put.go | 7 +- pkg/local_object_storage/engine/tree_test.go | 2 +- pkg/services/object/common/writer/ec.go | 3 +- pkg/services/object/common/writer/local.go | 9 +- pkg/services/object/common/writer/writer.go | 3 +- pkg/services/object/put/single.go | 11 +- pkg/services/policer/check.go | 2 +- pkg/services/policer/ec.go | 39 ++++--- pkg/services/replicator/pull.go | 3 +- pkg/services/replicator/put.go | 3 +- pkg/services/replicator/task.go | 3 + 19 files changed, 182 insertions(+), 50 deletions(-) create mode 100644 pkg/core/container/info.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 47649c88..5c322886 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -535,6 +535,6 @@ func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock) } -func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object) error { - return engine.Put(ctx, e.engine, o) +func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexableContainer bool) error { + return engine.Put(ctx, e.engine, o, indexableContainer) } diff --git a/pkg/core/container/info.go b/pkg/core/container/info.go new file mode 100644 index 00000000..62cc2155 --- /dev/null +++ b/pkg/core/container/info.go @@ -0,0 +1,103 @@ +package container + +import ( + "sync" + + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" +) + +type Info struct { + Indexed bool + Removed bool +} + +type infoValue struct { + info Info + err error +} + +type InfoProvider interface { + Info(id cid.ID) (Info, error) +} + +type infoProvider struct { + mtx *sync.RWMutex + cache map[cid.ID]infoValue + kl *utilSync.KeyLocker[cid.ID] + + source Source + sourceErr error + sourceOnce *sync.Once + sourceFactory func() (Source, error) +} + +func NewInfoProvider(sourceFactory func() (Source, error)) InfoProvider { + return &infoProvider{ + mtx: &sync.RWMutex{}, + cache: make(map[cid.ID]infoValue), + sourceOnce: &sync.Once{}, + kl: utilSync.NewKeyLocker[cid.ID](), + sourceFactory: sourceFactory, + } +} + +func (r *infoProvider) Info(id cid.ID) (Info, error) { + v, found := r.tryGetFromCache(id) + if found { + return v.info, v.err + } + + return r.getFromSource(id) +} + +func (r *infoProvider) tryGetFromCache(id cid.ID) (infoValue, bool) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + value, found := r.cache[id] + return value, found +} + +func (r *infoProvider) getFromSource(id cid.ID) (Info, error) { + r.kl.Lock(id) + defer r.kl.Unlock(id) + + if v, ok := r.tryGetFromCache(id); ok { + return v.info, v.err + } + + r.sourceOnce.Do(func() { + r.source, r.sourceErr = r.sourceFactory() + }) + if r.sourceErr != nil { + return Info{}, r.sourceErr + } + + cnr, err := r.source.Get(id) + var civ infoValue + if err != nil { + if client.IsErrContainerNotFound(err) { + removed, err := WasRemoved(r.source, id) + if err != nil { + civ.err = err + } else { + civ.info.Removed = removed + } + } else { + civ.err = err + } + } else { + civ.info.Indexed = IsIndexedContainer(cnr.Value) + } + r.putToCache(id, civ) + return civ.info, civ.err +} + +func (r *infoProvider) putToCache(id cid.ID, ct infoValue) { + r.mtx.Lock() + defer r.mtx.Unlock() + + r.cache[id] = ct +} diff --git a/pkg/core/container/util.go b/pkg/core/container/util.go index 98919284..d2755680 100644 --- a/pkg/core/container/util.go +++ b/pkg/core/container/util.go @@ -4,6 +4,7 @@ import ( "errors" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" ) @@ -20,3 +21,14 @@ func WasRemoved(s Source, cid cid.ID) (bool, error) { } return false, err } + +// IsIndexedContainer returns True if container attributes should be indexed. +func IsIndexedContainer(cnr containerSDK.Container) bool { + var isS3Container bool + cnr.IterateAttributes(func(key, _ string) { + if key == ".s3-location-constraint" { + isS3Container = true + } + }) + return !isS3Container +} diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 2de92ae8..83babeca 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -171,7 +171,7 @@ func TestExecBlocks(t *testing.T) { addr := object.AddressOf(obj) - require.NoError(t, Put(context.Background(), e, obj)) + require.NoError(t, Put(context.Background(), e, obj, false)) // block executions errBlock := errors.New("block exec err") diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index 4a675801..0904c982 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -58,9 +58,9 @@ func TestDeleteBigObject(t *testing.T) { defer e.Close(context.Background()) for i := range children { - require.NoError(t, Put(context.Background(), e, children[i])) + require.NoError(t, Put(context.Background(), e, children[i], false)) } - require.NoError(t, Put(context.Background(), e, link)) + require.NoError(t, Put(context.Background(), e, link, false)) addrParent := object.AddressOf(parent) checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true) @@ -126,9 +126,9 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) { defer e.Close(context.Background()) for i := range children { - require.NoError(t, Put(context.Background(), e, children[i])) + require.NoError(t, Put(context.Background(), e, children[i], false)) } - require.NoError(t, Put(context.Background(), e, link)) + require.NoError(t, Put(context.Background(), e, link, false)) addrParent := object.AddressOf(parent) checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true) diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 525e17f3..88c523b7 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -54,7 +54,7 @@ func benchmarkExists(b *testing.B, shardNum int) { addr := oidtest.Address() for range 100 { obj := testutil.GenerateObjectWithCID(cidtest.ID()) - err := Put(context.Background(), e, obj) + err := Put(context.Background(), e, obj, false) if err != nil { b.Fatal(err) } diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 9daa113f..f8767925 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -40,7 +40,7 @@ func TestStorageEngine_Inhume(t *testing.T) { e := testNewEngine(t).setShardsNum(t, 1).engine defer e.Close(context.Background()) - err := Put(context.Background(), e, parent) + err := Put(context.Background(), e, parent, false) require.NoError(t, err) var inhumePrm InhumePrm diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index 9e6758fb..3702f567 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -97,7 +97,7 @@ func TestLockUserScenario(t *testing.T) { id, _ := obj.ID() objAddr.SetObject(id) - err = Put(context.Background(), e, obj) + err = Put(context.Background(), e, obj, false) require.NoError(t, err) // 2. @@ -105,7 +105,7 @@ func TestLockUserScenario(t *testing.T) { locker.WriteMembers([]oid.ID{id}) objectSDK.WriteLock(lockerObj, locker) - err = Put(context.Background(), e, lockerObj) + err = Put(context.Background(), e, lockerObj, false) require.NoError(t, err) err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id}) @@ -124,7 +124,7 @@ func TestLockUserScenario(t *testing.T) { tombObj.SetID(tombForLockID) tombObj.SetAttributes(a) - err = Put(context.Background(), e, tombObj) + err = Put(context.Background(), e, tombObj, false) require.NoError(t, err) inhumePrm.WithTarget(tombForLockAddr, lockerAddr) @@ -177,7 +177,7 @@ func TestLockExpiration(t *testing.T) { // 1. obj := testutil.GenerateObjectWithCID(cnr) - err = Put(context.Background(), e, obj) + err = Put(context.Background(), e, obj, false) require.NoError(t, err) // 2. @@ -189,7 +189,7 @@ func TestLockExpiration(t *testing.T) { lock.SetType(objectSDK.TypeLock) lock.SetAttributes(a) - err = Put(context.Background(), e, lock) + err = Put(context.Background(), e, lock, false) require.NoError(t, err) id, _ := obj.ID() @@ -254,14 +254,14 @@ func TestLockForceRemoval(t *testing.T) { // 1. obj := testutil.GenerateObjectWithCID(cnr) - err = Put(context.Background(), e, obj) + err = Put(context.Background(), e, obj, false) require.NoError(t, err) // 2. lock := testutil.GenerateObjectWithCID(cnr) lock.SetType(objectSDK.TypeLock) - err = Put(context.Background(), e, lock) + err = Put(context.Background(), e, lock, false) require.NoError(t, err) id, _ := obj.ID() diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 9ce31e79..41e56656 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -22,7 +22,8 @@ import ( // PutPrm groups the parameters of Put operation. type PutPrm struct { - Object *objectSDK.Object + Object *objectSDK.Object + IsIndexedContainer bool } var errPutShard = errors.New("could not put object to any shard") @@ -194,6 +195,6 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti } // Put writes provided object to local storage. -func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object) error { - return storage.Put(ctx, PutPrm{Object: obj}) +func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object, indexedContainer bool) error { + return storage.Put(ctx, PutPrm{Object: obj, IsIndexedContainer: indexedContainer}) } diff --git a/pkg/local_object_storage/engine/tree_test.go b/pkg/local_object_storage/engine/tree_test.go index 6f694f08..21fcce41 100644 --- a/pkg/local_object_storage/engine/tree_test.go +++ b/pkg/local_object_storage/engine/tree_test.go @@ -37,7 +37,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) { for i := range objCount { obj := testutil.GenerateObjectWithCID(cid) testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i)) - err := Put(context.Background(), te.ng, obj) + err := Put(context.Background(), te.ng, obj, false) if err != nil { b.Fatal(err) } diff --git a/pkg/services/object/common/writer/ec.go b/pkg/services/object/common/writer/ec.go index fb0a8e4e..6b6a14cc 100644 --- a/pkg/services/object/common/writer/ec.go +++ b/pkg/services/object/common/writer/ec.go @@ -310,7 +310,8 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { var err error localTarget := LocalTarget{ - Storage: e.Config.LocalStore, + Storage: e.Config.LocalStore, + Container: e.Container, } completed := make(chan interface{}) if poolErr := e.Config.LocalPool.Submit(func() { diff --git a/pkg/services/object/common/writer/local.go b/pkg/services/object/common/writer/local.go index 02fd25b9..e219b44d 100644 --- a/pkg/services/object/common/writer/local.go +++ b/pkg/services/object/common/writer/local.go @@ -4,7 +4,9 @@ import ( "context" "fmt" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) @@ -13,7 +15,7 @@ import ( type ObjectStorage interface { // Put must save passed object // and return any appeared error. - Put(context.Context, *objectSDK.Object) error + Put(context.Context, *objectSDK.Object, bool) error // Delete must delete passed objects // and return any appeared error. Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error @@ -25,7 +27,8 @@ type ObjectStorage interface { } type LocalTarget struct { - Storage ObjectStorage + Storage ObjectStorage + Container containerSDK.Container } func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error { @@ -44,7 +47,7 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met // objects that do not change meta storage } - if err := t.Storage.Put(ctx, obj); err != nil { + if err := t.Storage.Put(ctx, obj, containerCore.IsIndexedContainer(t.Container)); err != nil { return fmt.Errorf("(%T) could not put object to local storage: %w", t, err) } return nil diff --git a/pkg/services/object/common/writer/writer.go b/pkg/services/object/common/writer/writer.go index 3d50da98..0e4c4d9c 100644 --- a/pkg/services/object/common/writer/writer.go +++ b/pkg/services/object/common/writer/writer.go @@ -150,7 +150,8 @@ func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.Object nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget { if node.Local { return LocalTarget{ - Storage: prm.Config.LocalStore, + Storage: prm.Config.LocalStore, + Container: prm.Container, } } diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go index 9b416326..5f9b5d11 100644 --- a/pkg/services/object/put/single.go +++ b/pkg/services/object/put/single.go @@ -177,7 +177,7 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac } return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error { - return s.saveToPlacementNode(ctx, &nd, obj, signer, meta) + return s.saveToPlacementNode(ctx, &nd, obj, signer, meta, placement.container) }) } @@ -263,10 +263,10 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb } func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object, - signer *putSingleRequestSigner, meta object.ContentMeta, + signer *putSingleRequestSigner, meta object.ContentMeta, container containerSDK.Container, ) error { if nodeDesc.Local { - return s.saveLocal(ctx, obj, meta) + return s.saveLocal(ctx, obj, meta, container) } var info client.NodeInfo @@ -281,9 +281,10 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwrite return s.redirectPutSingleRequest(ctx, signer, obj, info, c) } -func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error { +func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta, container containerSDK.Container) error { localTarget := &objectwriter.LocalTarget{ - Storage: s.Config.LocalStore, + Storage: s.Config.LocalStore, + Container: container, } return localTarget.WriteObject(ctx, obj, meta) } diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 06282bd8..401977f6 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -37,7 +37,7 @@ func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) er policy := cnr.Value.PlacementPolicy() if policycore.IsECPlacement(policy) { - return p.processECContainerObject(ctx, objInfo, policy) + return p.processECContainerObject(ctx, objInfo, cnr.Value) } return p.processRepContainerObject(ctx, objInfo, policy) } diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go index e822d1c0..6d2c153c 100644 --- a/pkg/services/policer/ec.go +++ b/pkg/services/policer/ec.go @@ -10,6 +10,7 @@ import ( objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" @@ -27,11 +28,11 @@ type ecChunkProcessResult struct { var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector with at least one node") -func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { +func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, cnr containerSDK.Container) error { if objInfo.ECInfo == nil { - return p.processECContainerRepObject(ctx, objInfo, policy) + return p.processECContainerRepObject(ctx, objInfo, cnr.PlacementPolicy()) } - return p.processECContainerECObject(ctx, objInfo, policy) + return p.processECContainerECObject(ctx, objInfo, cnr) } // processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects. @@ -67,8 +68,8 @@ func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objec return nil } -func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { - nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy) +func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, cnr containerSDK.Container) error { + nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, cnr.PlacementPolicy()) if err != nil { return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err) } @@ -85,9 +86,9 @@ func (p *Policer) processECContainerECObject(ctx context.Context, objInfo object res := p.processECChunk(ctx, objInfo, nn[0]) if !res.validPlacement { // drop local chunk only if all required chunks are in place - res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0]) + res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0], cnr) } - p.adjustECPlacement(ctx, objInfo, nn[0], policy) + p.adjustECPlacement(ctx, objInfo, nn[0], cnr) if res.removeLocal { p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address)) @@ -138,7 +139,7 @@ func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, n } } -func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool { +func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, cnr containerSDK.Container) bool { var parentAddress oid.Address parentAddress.SetContainer(objInfo.Address.Container()) parentAddress.SetObject(objInfo.ECInfo.ParentID) @@ -169,8 +170,9 @@ func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.I addr.SetContainer(objInfo.Address.Container()) addr.SetObject(indexToObjectID[index]) p.replicator.HandlePullTask(ctx, replicator.Task{ - Addr: addr, - Nodes: candidates, + Addr: addr, + Nodes: candidates, + Container: cnr, }) } // there was some missing chunks, it's not ok @@ -245,7 +247,7 @@ func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.A return true } -func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) { +func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, cnr containerSDK.Container) { var parentAddress oid.Address parentAddress.SetContainer(objInfo.Address.Container()) parentAddress.SetObject(objInfo.ECInfo.ParentID) @@ -292,7 +294,7 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total { return } - if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() { + if objInfo.ECInfo.Total-uint32(len(resolved)) > cnr.PlacementPolicy().ReplicaDescriptor(0).GetECParityCount() { var found []uint32 for i := range resolved { found = append(found, i) @@ -300,11 +302,13 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found)) return } - p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy) + p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, cnr) } -func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) { - c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount())) +func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, + cnr containerSDK.Container, +) { + c, err := erasurecode.NewConstructor(int(cnr.PlacementPolicy().ReplicaDescriptor(0).GetECDataCount()), int(cnr.PlacementPolicy().ReplicaDescriptor(0).GetECParityCount())) if err != nil { p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err)) return @@ -339,8 +343,9 @@ func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, targetNode := nodes[idx%len(nodes)] if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) { p.replicator.HandleLocalPutTask(ctx, replicator.Task{ - Addr: addr, - Obj: part, + Addr: addr, + Obj: part, + Container: cnr, }) } else { p.replicator.HandleReplicationTask(ctx, replicator.Task{ diff --git a/pkg/services/replicator/pull.go b/pkg/services/replicator/pull.go index d178700f..7e709023 100644 --- a/pkg/services/replicator/pull.go +++ b/pkg/services/replicator/pull.go @@ -5,6 +5,7 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" @@ -62,7 +63,7 @@ func (p *Replicator) HandlePullTask(ctx context.Context, task Task) { return } - err := engine.Put(ctx, p.localStorage, obj) + err := engine.Put(ctx, p.localStorage, obj, containerCore.IsIndexedContainer(task.Container)) if err != nil { p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, zap.Stringer("object", task.Addr), diff --git a/pkg/services/replicator/put.go b/pkg/services/replicator/put.go index c06ec3f6..53783351 100644 --- a/pkg/services/replicator/put.go +++ b/pkg/services/replicator/put.go @@ -5,6 +5,7 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" @@ -37,7 +38,7 @@ func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) { return } - err := engine.Put(ctx, p.localStorage, task.Obj) + err := engine.Put(ctx, p.localStorage, task.Obj, containerCore.IsIndexedContainer(task.Container)) if err != nil { p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, zap.Stringer("object", task.Addr), diff --git a/pkg/services/replicator/task.go b/pkg/services/replicator/task.go index d2b5b250..a03f8dca 100644 --- a/pkg/services/replicator/task.go +++ b/pkg/services/replicator/task.go @@ -1,6 +1,7 @@ package replicator import ( + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -16,4 +17,6 @@ type Task struct { Obj *objectSDK.Object // Nodes is a list of potential object holders. Nodes []netmap.NodeInfo + + Container containerSDK.Container }