[#9999] engine: Add IsS3Container flag

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-01 15:27:06 +03:00
parent 327b29348f
commit 287958d78f
19 changed files with 78 additions and 50 deletions

View file

@ -535,6 +535,6 @@ func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock) return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
} }
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object) error { func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, isS3Container bool) error {
return engine.Put(ctx, e.engine, o) return engine.Put(ctx, e.engine, o, isS3Container)
} }

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
) )
@ -20,3 +21,14 @@ func WasRemoved(s Source, cid cid.ID) (bool, error) {
} }
return false, err return false, err
} }
// IsS3Container returns True if container is S3 bucket related container.
func IsS3Container(cnr containerSDK.Container) bool {
var isS3Container bool
cnr.IterateAttributes(func(key, _ string) {
if key == ".s3-location-constraint" {
isS3Container = true
}
})
return isS3Container
}

View file

@ -171,7 +171,7 @@ func TestExecBlocks(t *testing.T) {
addr := object.AddressOf(obj) addr := object.AddressOf(obj)
require.NoError(t, Put(context.Background(), e, obj)) require.NoError(t, Put(context.Background(), e, obj, false))
// block executions // block executions
errBlock := errors.New("block exec err") errBlock := errors.New("block exec err")

View file

@ -58,9 +58,9 @@ func TestDeleteBigObject(t *testing.T) {
defer e.Close(context.Background()) defer e.Close(context.Background())
for i := range children { for i := range children {
require.NoError(t, Put(context.Background(), e, children[i])) require.NoError(t, Put(context.Background(), e, children[i], false))
} }
require.NoError(t, Put(context.Background(), e, link)) require.NoError(t, Put(context.Background(), e, link, false))
addrParent := object.AddressOf(parent) addrParent := object.AddressOf(parent)
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true) checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)
@ -126,9 +126,9 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
defer e.Close(context.Background()) defer e.Close(context.Background())
for i := range children { for i := range children {
require.NoError(t, Put(context.Background(), e, children[i])) require.NoError(t, Put(context.Background(), e, children[i], false))
} }
require.NoError(t, Put(context.Background(), e, link)) require.NoError(t, Put(context.Background(), e, link, false))
addrParent := object.AddressOf(parent) addrParent := object.AddressOf(parent)
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true) checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)

View file

@ -54,7 +54,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
addr := oidtest.Address() addr := oidtest.Address()
for range 100 { for range 100 {
obj := testutil.GenerateObjectWithCID(cidtest.ID()) obj := testutil.GenerateObjectWithCID(cidtest.ID())
err := Put(context.Background(), e, obj) err := Put(context.Background(), e, obj, false)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }

View file

@ -40,7 +40,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
e := testNewEngine(t).setShardsNum(t, 1).engine e := testNewEngine(t).setShardsNum(t, 1).engine
defer e.Close(context.Background()) defer e.Close(context.Background())
err := Put(context.Background(), e, parent) err := Put(context.Background(), e, parent, false)
require.NoError(t, err) require.NoError(t, err)
var inhumePrm InhumePrm var inhumePrm InhumePrm

View file

@ -97,7 +97,7 @@ func TestLockUserScenario(t *testing.T) {
id, _ := obj.ID() id, _ := obj.ID()
objAddr.SetObject(id) objAddr.SetObject(id)
err = Put(context.Background(), e, obj) err = Put(context.Background(), e, obj, false)
require.NoError(t, err) require.NoError(t, err)
// 2. // 2.
@ -105,7 +105,7 @@ func TestLockUserScenario(t *testing.T) {
locker.WriteMembers([]oid.ID{id}) locker.WriteMembers([]oid.ID{id})
objectSDK.WriteLock(lockerObj, locker) objectSDK.WriteLock(lockerObj, locker)
err = Put(context.Background(), e, lockerObj) err = Put(context.Background(), e, lockerObj, false)
require.NoError(t, err) require.NoError(t, err)
err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id}) err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id})
@ -124,7 +124,7 @@ func TestLockUserScenario(t *testing.T) {
tombObj.SetID(tombForLockID) tombObj.SetID(tombForLockID)
tombObj.SetAttributes(a) tombObj.SetAttributes(a)
err = Put(context.Background(), e, tombObj) err = Put(context.Background(), e, tombObj, false)
require.NoError(t, err) require.NoError(t, err)
inhumePrm.WithTarget(tombForLockAddr, lockerAddr) inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
@ -177,7 +177,7 @@ func TestLockExpiration(t *testing.T) {
// 1. // 1.
obj := testutil.GenerateObjectWithCID(cnr) obj := testutil.GenerateObjectWithCID(cnr)
err = Put(context.Background(), e, obj) err = Put(context.Background(), e, obj, false)
require.NoError(t, err) require.NoError(t, err)
// 2. // 2.
@ -189,7 +189,7 @@ func TestLockExpiration(t *testing.T) {
lock.SetType(objectSDK.TypeLock) lock.SetType(objectSDK.TypeLock)
lock.SetAttributes(a) lock.SetAttributes(a)
err = Put(context.Background(), e, lock) err = Put(context.Background(), e, lock, false)
require.NoError(t, err) require.NoError(t, err)
id, _ := obj.ID() id, _ := obj.ID()
@ -254,14 +254,14 @@ func TestLockForceRemoval(t *testing.T) {
// 1. // 1.
obj := testutil.GenerateObjectWithCID(cnr) obj := testutil.GenerateObjectWithCID(cnr)
err = Put(context.Background(), e, obj) err = Put(context.Background(), e, obj, false)
require.NoError(t, err) require.NoError(t, err)
// 2. // 2.
lock := testutil.GenerateObjectWithCID(cnr) lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock) lock.SetType(objectSDK.TypeLock)
err = Put(context.Background(), e, lock) err = Put(context.Background(), e, lock, false)
require.NoError(t, err) require.NoError(t, err)
id, _ := obj.ID() id, _ := obj.ID()

View file

@ -22,7 +22,8 @@ import (
// PutPrm groups the parameters of Put operation. // PutPrm groups the parameters of Put operation.
type PutPrm struct { type PutPrm struct {
Object *objectSDK.Object Object *objectSDK.Object
IsS3Container bool
} }
var errPutShard = errors.New("could not put object to any shard") var errPutShard = errors.New("could not put object to any shard")
@ -196,6 +197,6 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
} }
// Put writes provided object to local storage. // Put writes provided object to local storage.
func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object) error { func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object, isS3Container bool) error {
return storage.Put(ctx, PutPrm{Object: obj}) return storage.Put(ctx, PutPrm{Object: obj, IsS3Container: isS3Container})
} }

View file

@ -37,7 +37,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
for i := range objCount { for i := range objCount {
obj := testutil.GenerateObjectWithCID(cid) obj := testutil.GenerateObjectWithCID(cid)
testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i)) testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
err := Put(context.Background(), te.ng, obj) err := Put(context.Background(), te.ng, obj, false)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }

View file

@ -310,7 +310,8 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
var err error var err error
localTarget := LocalTarget{ localTarget := LocalTarget{
Storage: e.Config.LocalStore, Storage: e.Config.LocalStore,
Container: e.Container,
} }
completed := make(chan interface{}) completed := make(chan interface{})
if poolErr := e.Config.LocalPool.Submit(func() { if poolErr := e.Config.LocalPool.Submit(func() {

View file

@ -4,7 +4,9 @@ import (
"context" "context"
"fmt" "fmt"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
) )
@ -13,7 +15,7 @@ import (
type ObjectStorage interface { type ObjectStorage interface {
// Put must save passed object // Put must save passed object
// and return any appeared error. // and return any appeared error.
Put(context.Context, *objectSDK.Object) error Put(context.Context, *objectSDK.Object, bool) error
// Delete must delete passed objects // Delete must delete passed objects
// and return any appeared error. // and return any appeared error.
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error
@ -25,7 +27,8 @@ type ObjectStorage interface {
} }
type LocalTarget struct { type LocalTarget struct {
Storage ObjectStorage Storage ObjectStorage
Container containerSDK.Container
} }
func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error { func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
@ -44,7 +47,7 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
// objects that do not change meta storage // objects that do not change meta storage
} }
if err := t.Storage.Put(ctx, obj); err != nil { if err := t.Storage.Put(ctx, obj, containerCore.IsS3Container(t.Container)); err != nil {
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err) return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
} }
return nil return nil

View file

@ -150,7 +150,8 @@ func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.Object
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget { nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
if node.Local { if node.Local {
return LocalTarget{ return LocalTarget{
Storage: prm.Config.LocalStore, Storage: prm.Config.LocalStore,
Container: prm.Container,
} }
} }

View file

@ -177,7 +177,7 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac
} }
return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error { return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error {
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta) return s.saveToPlacementNode(ctx, &nd, obj, signer, meta, placement.container)
}) })
} }
@ -263,10 +263,10 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
} }
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object, func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object,
signer *putSingleRequestSigner, meta object.ContentMeta, signer *putSingleRequestSigner, meta object.ContentMeta, container containerSDK.Container,
) error { ) error {
if nodeDesc.Local { if nodeDesc.Local {
return s.saveLocal(ctx, obj, meta) return s.saveLocal(ctx, obj, meta, container)
} }
var info client.NodeInfo var info client.NodeInfo
@ -281,9 +281,10 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwrite
return s.redirectPutSingleRequest(ctx, signer, obj, info, c) return s.redirectPutSingleRequest(ctx, signer, obj, info, c)
} }
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error { func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta, container containerSDK.Container) error {
localTarget := &objectwriter.LocalTarget{ localTarget := &objectwriter.LocalTarget{
Storage: s.Config.LocalStore, Storage: s.Config.LocalStore,
Container: container,
} }
return localTarget.WriteObject(ctx, obj, meta) return localTarget.WriteObject(ctx, obj, meta)
} }

View file

@ -37,7 +37,7 @@ func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) er
policy := cnr.Value.PlacementPolicy() policy := cnr.Value.PlacementPolicy()
if policycore.IsECPlacement(policy) { if policycore.IsECPlacement(policy) {
return p.processECContainerObject(ctx, objInfo, policy) return p.processECContainerObject(ctx, objInfo, policy, cnr.Value)
} }
return p.processRepContainerObject(ctx, objInfo, policy) return p.processRepContainerObject(ctx, objInfo, policy)
} }

View file

@ -10,6 +10,7 @@ import (
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
@ -27,11 +28,11 @@ type ecChunkProcessResult struct {
var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector with at least one node") var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector with at least one node")
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy, cnr containerSDK.Container) error {
if objInfo.ECInfo == nil { if objInfo.ECInfo == nil {
return p.processECContainerRepObject(ctx, objInfo, policy) return p.processECContainerRepObject(ctx, objInfo, policy)
} }
return p.processECContainerECObject(ctx, objInfo, policy) return p.processECContainerECObject(ctx, objInfo, policy, cnr)
} }
// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects. // processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects.
@ -67,7 +68,7 @@ func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objec
return nil return nil
} }
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy, cnr containerSDK.Container) error {
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy) nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy)
if err != nil { if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err) return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
@ -85,9 +86,9 @@ func (p *Policer) processECContainerECObject(ctx context.Context, objInfo object
res := p.processECChunk(ctx, objInfo, nn[0]) res := p.processECChunk(ctx, objInfo, nn[0])
if !res.validPlacement { if !res.validPlacement {
// drop local chunk only if all required chunks are in place // drop local chunk only if all required chunks are in place
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0]) res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0], cnr)
} }
p.adjustECPlacement(ctx, objInfo, nn[0], policy) p.adjustECPlacement(ctx, objInfo, nn[0], policy, cnr)
if res.removeLocal { if res.removeLocal {
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address)) p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
@ -138,7 +139,7 @@ func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, n
} }
} }
func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool { func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, cnr containerSDK.Container) bool {
var parentAddress oid.Address var parentAddress oid.Address
parentAddress.SetContainer(objInfo.Address.Container()) parentAddress.SetContainer(objInfo.Address.Container())
parentAddress.SetObject(objInfo.ECInfo.ParentID) parentAddress.SetObject(objInfo.ECInfo.ParentID)
@ -171,7 +172,7 @@ func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.I
p.replicator.HandlePullTask(ctx, replicator.Task{ p.replicator.HandlePullTask(ctx, replicator.Task{
Addr: addr, Addr: addr,
Nodes: candidates, Nodes: candidates,
}) }, cnr)
} }
// there was some missing chunks, it's not ok // there was some missing chunks, it's not ok
return false return false
@ -245,7 +246,7 @@ func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.A
return true return true
} }
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) { func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy, cnr containerSDK.Container) {
var parentAddress oid.Address var parentAddress oid.Address
parentAddress.SetContainer(objInfo.Address.Container()) parentAddress.SetContainer(objInfo.Address.Container())
parentAddress.SetObject(objInfo.ECInfo.ParentID) parentAddress.SetObject(objInfo.ECInfo.ParentID)
@ -300,10 +301,12 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found)) p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
return return
} }
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy) p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy, cnr)
} }
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) { func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID,
policy netmap.PlacementPolicy, cnr containerSDK.Container,
) {
c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount())) c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount()))
if err != nil { if err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err)) p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
@ -341,7 +344,7 @@ func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info,
p.replicator.HandleLocalPutTask(ctx, replicator.Task{ p.replicator.HandleLocalPutTask(ctx, replicator.Task{
Addr: addr, Addr: addr,
Obj: part, Obj: part,
}) }, cnr)
} else { } else {
p.replicator.HandleReplicationTask(ctx, replicator.Task{ p.replicator.HandleReplicationTask(ctx, replicator.Task{
NumCopies: 1, NumCopies: 1,

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -37,8 +38,8 @@ type BuryFunc func(context.Context, oid.Address) error
// Replicator is the interface to a consumer of replication tasks. // Replicator is the interface to a consumer of replication tasks.
type Replicator interface { type Replicator interface {
HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
HandlePullTask(ctx context.Context, task replicator.Task) HandlePullTask(ctx context.Context, task replicator.Task, cnr containerSDK.Container)
HandleLocalPutTask(ctx context.Context, task replicator.Task) HandleLocalPutTask(ctx context.Context, task replicator.Task, cnr containerSDK.Container)
} }
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.

View file

@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -452,10 +453,10 @@ func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replica
r.handleReplicationTask(ctx, task, res) r.handleReplicationTask(ctx, task, res)
} }
func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) { func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task, cnr containerSDK.Container) {
r.handleLocalPutTask(ctx, task) r.handleLocalPutTask(ctx, task)
} }
func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) { func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task, cnr containerSDK.Container) {
r.handlePullTask(ctx, task) r.handlePullTask(ctx, task)
} }

View file

@ -5,10 +5,12 @@ import (
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@ -17,7 +19,7 @@ import (
var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node") var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node")
func (p *Replicator) HandlePullTask(ctx context.Context, task Task) { func (p *Replicator) HandlePullTask(ctx context.Context, task Task, cnr containerSDK.Container) {
p.metrics.IncInFlightRequest() p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest() defer p.metrics.DecInFlightRequest()
defer func() { defer func() {
@ -62,7 +64,7 @@ func (p *Replicator) HandlePullTask(ctx context.Context, task Task) {
return return
} }
err := engine.Put(ctx, p.localStorage, obj) err := engine.Put(ctx, p.localStorage, obj, containerCore.IsS3Container(cnr))
if err != nil { if err != nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr), zap.Stringer("object", task.Addr),

View file

@ -5,9 +5,11 @@ import (
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
@ -15,7 +17,7 @@ import (
var errObjectNotDefined = errors.New("object is not defined") var errObjectNotDefined = errors.New("object is not defined")
func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) { func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task, cnr containerSDK.Container) {
p.metrics.IncInFlightRequest() p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest() defer p.metrics.DecInFlightRequest()
defer func() { defer func() {
@ -37,7 +39,7 @@ func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) {
return return
} }
err := engine.Put(ctx, p.localStorage, task.Obj) err := engine.Put(ctx, p.localStorage, task.Obj, containerCore.IsS3Container(cnr))
if err != nil { if err != nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr), zap.Stringer("object", task.Addr),