From 287958d78f255ef2a2e11f42e5fcbfd30c79a992 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 1 Oct 2024 15:27:06 +0300 Subject: [PATCH] [#9999] engine: Add `IsS3Container` flag Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/object.go | 4 +-- pkg/core/container/util.go | 12 +++++++++ .../engine/control_test.go | 2 +- .../engine/delete_test.go | 8 +++--- .../engine/engine_test.go | 2 +- .../engine/inhume_test.go | 2 +- pkg/local_object_storage/engine/lock_test.go | 14 +++++------ pkg/local_object_storage/engine/put.go | 7 +++--- pkg/local_object_storage/engine/tree_test.go | 2 +- pkg/services/object/common/writer/ec.go | 3 ++- pkg/services/object/common/writer/local.go | 9 ++++--- pkg/services/object/common/writer/writer.go | 3 ++- pkg/services/object/put/single.go | 11 ++++---- pkg/services/policer/check.go | 2 +- pkg/services/policer/ec.go | 25 +++++++++++-------- pkg/services/policer/option.go | 5 ++-- pkg/services/policer/policer_test.go | 5 ++-- pkg/services/replicator/pull.go | 6 +++-- pkg/services/replicator/put.go | 6 +++-- 19 files changed, 78 insertions(+), 50 deletions(-) diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 47649c88b..ba7473217 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -535,6 +535,6 @@ func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock) } -func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object) error { - return engine.Put(ctx, e.engine, o) +func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, isS3Container bool) error { + return engine.Put(ctx, e.engine, o, isS3Container) } diff --git a/pkg/core/container/util.go b/pkg/core/container/util.go index 98919284e..33820eb7e 100644 --- a/pkg/core/container/util.go +++ b/pkg/core/container/util.go @@ -4,6 +4,7 @@ import ( "errors" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" ) @@ -20,3 +21,14 @@ func WasRemoved(s Source, cid cid.ID) (bool, error) { } return false, err } + +// IsS3Container returns True if container is S3 bucket related container. +func IsS3Container(cnr containerSDK.Container) bool { + var isS3Container bool + cnr.IterateAttributes(func(key, _ string) { + if key == ".s3-location-constraint" { + isS3Container = true + } + }) + return isS3Container +} diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 2de92ae84..83babeca3 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -171,7 +171,7 @@ func TestExecBlocks(t *testing.T) { addr := object.AddressOf(obj) - require.NoError(t, Put(context.Background(), e, obj)) + require.NoError(t, Put(context.Background(), e, obj, false)) // block executions errBlock := errors.New("block exec err") diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index 4a6758012..0904c9820 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -58,9 +58,9 @@ func TestDeleteBigObject(t *testing.T) { defer e.Close(context.Background()) for i := range children { - require.NoError(t, Put(context.Background(), e, children[i])) + require.NoError(t, Put(context.Background(), e, children[i], false)) } - require.NoError(t, Put(context.Background(), e, link)) + require.NoError(t, Put(context.Background(), e, link, false)) addrParent := object.AddressOf(parent) checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true) @@ -126,9 +126,9 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) { defer e.Close(context.Background()) for i := range children { - require.NoError(t, Put(context.Background(), e, children[i])) + require.NoError(t, Put(context.Background(), e, children[i], false)) } - require.NoError(t, Put(context.Background(), e, link)) + require.NoError(t, Put(context.Background(), e, link, false)) addrParent := object.AddressOf(parent) checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true) diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 525e17f34..88c523b76 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -54,7 +54,7 @@ func benchmarkExists(b *testing.B, shardNum int) { addr := oidtest.Address() for range 100 { obj := testutil.GenerateObjectWithCID(cidtest.ID()) - err := Put(context.Background(), e, obj) + err := Put(context.Background(), e, obj, false) if err != nil { b.Fatal(err) } diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 9daa113f8..f87679253 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -40,7 +40,7 @@ func TestStorageEngine_Inhume(t *testing.T) { e := testNewEngine(t).setShardsNum(t, 1).engine defer e.Close(context.Background()) - err := Put(context.Background(), e, parent) + err := Put(context.Background(), e, parent, false) require.NoError(t, err) var inhumePrm InhumePrm diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index 9e6758fb4..3702f567f 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -97,7 +97,7 @@ func TestLockUserScenario(t *testing.T) { id, _ := obj.ID() objAddr.SetObject(id) - err = Put(context.Background(), e, obj) + err = Put(context.Background(), e, obj, false) require.NoError(t, err) // 2. @@ -105,7 +105,7 @@ func TestLockUserScenario(t *testing.T) { locker.WriteMembers([]oid.ID{id}) objectSDK.WriteLock(lockerObj, locker) - err = Put(context.Background(), e, lockerObj) + err = Put(context.Background(), e, lockerObj, false) require.NoError(t, err) err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id}) @@ -124,7 +124,7 @@ func TestLockUserScenario(t *testing.T) { tombObj.SetID(tombForLockID) tombObj.SetAttributes(a) - err = Put(context.Background(), e, tombObj) + err = Put(context.Background(), e, tombObj, false) require.NoError(t, err) inhumePrm.WithTarget(tombForLockAddr, lockerAddr) @@ -177,7 +177,7 @@ func TestLockExpiration(t *testing.T) { // 1. obj := testutil.GenerateObjectWithCID(cnr) - err = Put(context.Background(), e, obj) + err = Put(context.Background(), e, obj, false) require.NoError(t, err) // 2. @@ -189,7 +189,7 @@ func TestLockExpiration(t *testing.T) { lock.SetType(objectSDK.TypeLock) lock.SetAttributes(a) - err = Put(context.Background(), e, lock) + err = Put(context.Background(), e, lock, false) require.NoError(t, err) id, _ := obj.ID() @@ -254,14 +254,14 @@ func TestLockForceRemoval(t *testing.T) { // 1. obj := testutil.GenerateObjectWithCID(cnr) - err = Put(context.Background(), e, obj) + err = Put(context.Background(), e, obj, false) require.NoError(t, err) // 2. lock := testutil.GenerateObjectWithCID(cnr) lock.SetType(objectSDK.TypeLock) - err = Put(context.Background(), e, lock) + err = Put(context.Background(), e, lock, false) require.NoError(t, err) id, _ := obj.ID() diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 50565b8e2..f16f8811e 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -22,7 +22,8 @@ import ( // PutPrm groups the parameters of Put operation. type PutPrm struct { - Object *objectSDK.Object + Object *objectSDK.Object + IsS3Container bool } var errPutShard = errors.New("could not put object to any shard") @@ -196,6 +197,6 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti } // Put writes provided object to local storage. -func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object) error { - return storage.Put(ctx, PutPrm{Object: obj}) +func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object, isS3Container bool) error { + return storage.Put(ctx, PutPrm{Object: obj, IsS3Container: isS3Container}) } diff --git a/pkg/local_object_storage/engine/tree_test.go b/pkg/local_object_storage/engine/tree_test.go index 6f694f082..21fcce415 100644 --- a/pkg/local_object_storage/engine/tree_test.go +++ b/pkg/local_object_storage/engine/tree_test.go @@ -37,7 +37,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) { for i := range objCount { obj := testutil.GenerateObjectWithCID(cid) testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i)) - err := Put(context.Background(), te.ng, obj) + err := Put(context.Background(), te.ng, obj, false) if err != nil { b.Fatal(err) } diff --git a/pkg/services/object/common/writer/ec.go b/pkg/services/object/common/writer/ec.go index fb0a8e4e5..6b6a14cc0 100644 --- a/pkg/services/object/common/writer/ec.go +++ b/pkg/services/object/common/writer/ec.go @@ -310,7 +310,8 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { var err error localTarget := LocalTarget{ - Storage: e.Config.LocalStore, + Storage: e.Config.LocalStore, + Container: e.Container, } completed := make(chan interface{}) if poolErr := e.Config.LocalPool.Submit(func() { diff --git a/pkg/services/object/common/writer/local.go b/pkg/services/object/common/writer/local.go index 02fd25b9e..bc4a64f68 100644 --- a/pkg/services/object/common/writer/local.go +++ b/pkg/services/object/common/writer/local.go @@ -4,7 +4,9 @@ import ( "context" "fmt" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) @@ -13,7 +15,7 @@ import ( type ObjectStorage interface { // Put must save passed object // and return any appeared error. - Put(context.Context, *objectSDK.Object) error + Put(context.Context, *objectSDK.Object, bool) error // Delete must delete passed objects // and return any appeared error. Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error @@ -25,7 +27,8 @@ type ObjectStorage interface { } type LocalTarget struct { - Storage ObjectStorage + Storage ObjectStorage + Container containerSDK.Container } func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error { @@ -44,7 +47,7 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met // objects that do not change meta storage } - if err := t.Storage.Put(ctx, obj); err != nil { + if err := t.Storage.Put(ctx, obj, containerCore.IsS3Container(t.Container)); err != nil { return fmt.Errorf("(%T) could not put object to local storage: %w", t, err) } return nil diff --git a/pkg/services/object/common/writer/writer.go b/pkg/services/object/common/writer/writer.go index 3d50da988..0e4c4d9c6 100644 --- a/pkg/services/object/common/writer/writer.go +++ b/pkg/services/object/common/writer/writer.go @@ -150,7 +150,8 @@ func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.Object nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget { if node.Local { return LocalTarget{ - Storage: prm.Config.LocalStore, + Storage: prm.Config.LocalStore, + Container: prm.Container, } } diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go index 9b4163268..5f9b5d110 100644 --- a/pkg/services/object/put/single.go +++ b/pkg/services/object/put/single.go @@ -177,7 +177,7 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac } return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error { - return s.saveToPlacementNode(ctx, &nd, obj, signer, meta) + return s.saveToPlacementNode(ctx, &nd, obj, signer, meta, placement.container) }) } @@ -263,10 +263,10 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb } func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object, - signer *putSingleRequestSigner, meta object.ContentMeta, + signer *putSingleRequestSigner, meta object.ContentMeta, container containerSDK.Container, ) error { if nodeDesc.Local { - return s.saveLocal(ctx, obj, meta) + return s.saveLocal(ctx, obj, meta, container) } var info client.NodeInfo @@ -281,9 +281,10 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwrite return s.redirectPutSingleRequest(ctx, signer, obj, info, c) } -func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error { +func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta, container containerSDK.Container) error { localTarget := &objectwriter.LocalTarget{ - Storage: s.Config.LocalStore, + Storage: s.Config.LocalStore, + Container: container, } return localTarget.WriteObject(ctx, obj, meta) } diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 06282bd8d..89ba4d317 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -37,7 +37,7 @@ func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) er policy := cnr.Value.PlacementPolicy() if policycore.IsECPlacement(policy) { - return p.processECContainerObject(ctx, objInfo, policy) + return p.processECContainerObject(ctx, objInfo, policy, cnr.Value) } return p.processRepContainerObject(ctx, objInfo, policy) } diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go index e822d1c09..17d19770e 100644 --- a/pkg/services/policer/ec.go +++ b/pkg/services/policer/ec.go @@ -10,6 +10,7 @@ import ( objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" @@ -27,11 +28,11 @@ type ecChunkProcessResult struct { var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector with at least one node") -func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { +func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy, cnr containerSDK.Container) error { if objInfo.ECInfo == nil { return p.processECContainerRepObject(ctx, objInfo, policy) } - return p.processECContainerECObject(ctx, objInfo, policy) + return p.processECContainerECObject(ctx, objInfo, policy, cnr) } // processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects. @@ -67,7 +68,7 @@ func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objec return nil } -func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { +func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy, cnr containerSDK.Container) error { nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy) if err != nil { return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err) @@ -85,9 +86,9 @@ func (p *Policer) processECContainerECObject(ctx context.Context, objInfo object res := p.processECChunk(ctx, objInfo, nn[0]) if !res.validPlacement { // drop local chunk only if all required chunks are in place - res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0]) + res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0], cnr) } - p.adjustECPlacement(ctx, objInfo, nn[0], policy) + p.adjustECPlacement(ctx, objInfo, nn[0], policy, cnr) if res.removeLocal { p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address)) @@ -138,7 +139,7 @@ func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, n } } -func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool { +func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, cnr containerSDK.Container) bool { var parentAddress oid.Address parentAddress.SetContainer(objInfo.Address.Container()) parentAddress.SetObject(objInfo.ECInfo.ParentID) @@ -171,7 +172,7 @@ func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.I p.replicator.HandlePullTask(ctx, replicator.Task{ Addr: addr, Nodes: candidates, - }) + }, cnr) } // there was some missing chunks, it's not ok return false @@ -245,7 +246,7 @@ func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.A return true } -func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) { +func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy, cnr containerSDK.Container) { var parentAddress oid.Address parentAddress.SetContainer(objInfo.Address.Container()) parentAddress.SetObject(objInfo.ECInfo.ParentID) @@ -300,10 +301,12 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found)) return } - p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy) + p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy, cnr) } -func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) { +func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, + policy netmap.PlacementPolicy, cnr containerSDK.Container, +) { c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount())) if err != nil { p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err)) @@ -341,7 +344,7 @@ func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, p.replicator.HandleLocalPutTask(ctx, replicator.Task{ Addr: addr, Obj: part, - }) + }, cnr) } else { p.replicator.HandleReplicationTask(ctx, replicator.Task{ NumCopies: 1, diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index 9dbfd8b9f..d1d100e99 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -37,8 +38,8 @@ type BuryFunc func(context.Context, oid.Address) error // Replicator is the interface to a consumer of replication tasks. type Replicator interface { HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) - HandlePullTask(ctx context.Context, task replicator.Task) - HandleLocalPutTask(ctx context.Context, task replicator.Task) + HandlePullTask(ctx context.Context, task replicator.Task, cnr containerSDK.Container) + HandleLocalPutTask(ctx context.Context, task replicator.Task, cnr containerSDK.Container) } // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 4e17e98a8..5374478f7 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -14,6 +14,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -452,10 +453,10 @@ func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replica r.handleReplicationTask(ctx, task, res) } -func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) { +func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task, cnr containerSDK.Container) { r.handleLocalPutTask(ctx, task) } -func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) { +func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task, cnr containerSDK.Container) { r.handlePullTask(ctx, task) } diff --git a/pkg/services/replicator/pull.go b/pkg/services/replicator/pull.go index d178700f6..8d3668081 100644 --- a/pkg/services/replicator/pull.go +++ b/pkg/services/replicator/pull.go @@ -5,10 +5,12 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -17,7 +19,7 @@ import ( var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node") -func (p *Replicator) HandlePullTask(ctx context.Context, task Task) { +func (p *Replicator) HandlePullTask(ctx context.Context, task Task, cnr containerSDK.Container) { p.metrics.IncInFlightRequest() defer p.metrics.DecInFlightRequest() defer func() { @@ -62,7 +64,7 @@ func (p *Replicator) HandlePullTask(ctx context.Context, task Task) { return } - err := engine.Put(ctx, p.localStorage, obj) + err := engine.Put(ctx, p.localStorage, obj, containerCore.IsS3Container(cnr)) if err != nil { p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, zap.Stringer("object", task.Addr), diff --git a/pkg/services/replicator/put.go b/pkg/services/replicator/put.go index c06ec3f65..deddc1311 100644 --- a/pkg/services/replicator/put.go +++ b/pkg/services/replicator/put.go @@ -5,9 +5,11 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -15,7 +17,7 @@ import ( var errObjectNotDefined = errors.New("object is not defined") -func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) { +func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task, cnr containerSDK.Container) { p.metrics.IncInFlightRequest() defer p.metrics.DecInFlightRequest() defer func() { @@ -37,7 +39,7 @@ func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) { return } - err := engine.Put(ctx, p.localStorage, task.Obj) + err := engine.Put(ctx, p.localStorage, task.Obj, containerCore.IsS3Container(cnr)) if err != nil { p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, zap.Stringer("object", task.Addr),