[#1412] engine: Add IsIndexedContainer
flag
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
899cd55c27
commit
1b520f7973
19 changed files with 182 additions and 50 deletions
|
@ -535,6 +535,6 @@ func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address
|
||||||
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
|
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object) error {
|
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexableContainer bool) error {
|
||||||
return engine.Put(ctx, e.engine, o)
|
return engine.Put(ctx, e.engine, o, indexableContainer)
|
||||||
}
|
}
|
||||||
|
|
103
pkg/core/container/info.go
Normal file
103
pkg/core/container/info.go
Normal file
|
@ -0,0 +1,103 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Info struct {
|
||||||
|
Indexed bool
|
||||||
|
Removed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type infoValue struct {
|
||||||
|
info Info
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type InfoProvider interface {
|
||||||
|
Info(id cid.ID) (Info, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type infoProvider struct {
|
||||||
|
mtx *sync.RWMutex
|
||||||
|
cache map[cid.ID]infoValue
|
||||||
|
kl *utilSync.KeyLocker[cid.ID]
|
||||||
|
|
||||||
|
source Source
|
||||||
|
sourceErr error
|
||||||
|
sourceOnce *sync.Once
|
||||||
|
sourceFactory func() (Source, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewInfoProvider(sourceFactory func() (Source, error)) InfoProvider {
|
||||||
|
return &infoProvider{
|
||||||
|
mtx: &sync.RWMutex{},
|
||||||
|
cache: make(map[cid.ID]infoValue),
|
||||||
|
sourceOnce: &sync.Once{},
|
||||||
|
kl: utilSync.NewKeyLocker[cid.ID](),
|
||||||
|
sourceFactory: sourceFactory,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *infoProvider) Info(id cid.ID) (Info, error) {
|
||||||
|
v, found := r.tryGetFromCache(id)
|
||||||
|
if found {
|
||||||
|
return v.info, v.err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.getFromSource(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *infoProvider) tryGetFromCache(id cid.ID) (infoValue, bool) {
|
||||||
|
r.mtx.RLock()
|
||||||
|
defer r.mtx.RUnlock()
|
||||||
|
|
||||||
|
value, found := r.cache[id]
|
||||||
|
return value, found
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *infoProvider) getFromSource(id cid.ID) (Info, error) {
|
||||||
|
r.kl.Lock(id)
|
||||||
|
defer r.kl.Unlock(id)
|
||||||
|
|
||||||
|
if v, ok := r.tryGetFromCache(id); ok {
|
||||||
|
return v.info, v.err
|
||||||
|
}
|
||||||
|
|
||||||
|
r.sourceOnce.Do(func() {
|
||||||
|
r.source, r.sourceErr = r.sourceFactory()
|
||||||
|
})
|
||||||
|
if r.sourceErr != nil {
|
||||||
|
return Info{}, r.sourceErr
|
||||||
|
}
|
||||||
|
|
||||||
|
cnr, err := r.source.Get(id)
|
||||||
|
var civ infoValue
|
||||||
|
if err != nil {
|
||||||
|
if client.IsErrContainerNotFound(err) {
|
||||||
|
removed, err := WasRemoved(r.source, id)
|
||||||
|
if err != nil {
|
||||||
|
civ.err = err
|
||||||
|
} else {
|
||||||
|
civ.info.Removed = removed
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
civ.err = err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
civ.info.Indexed = IsIndexedContainer(cnr.Value)
|
||||||
|
}
|
||||||
|
r.putToCache(id, civ)
|
||||||
|
return civ.info, civ.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *infoProvider) putToCache(id cid.ID, ct infoValue) {
|
||||||
|
r.mtx.Lock()
|
||||||
|
defer r.mtx.Unlock()
|
||||||
|
|
||||||
|
r.cache[id] = ct
|
||||||
|
}
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -20,3 +21,14 @@ func WasRemoved(s Source, cid cid.ID) (bool, error) {
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsIndexedContainer returns True if container attributes should be indexed.
|
||||||
|
func IsIndexedContainer(cnr containerSDK.Container) bool {
|
||||||
|
var isS3Container bool
|
||||||
|
cnr.IterateAttributes(func(key, _ string) {
|
||||||
|
if key == ".s3-location-constraint" {
|
||||||
|
isS3Container = true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return !isS3Container
|
||||||
|
}
|
||||||
|
|
|
@ -171,7 +171,7 @@ func TestExecBlocks(t *testing.T) {
|
||||||
|
|
||||||
addr := object.AddressOf(obj)
|
addr := object.AddressOf(obj)
|
||||||
|
|
||||||
require.NoError(t, Put(context.Background(), e, obj))
|
require.NoError(t, Put(context.Background(), e, obj, false))
|
||||||
|
|
||||||
// block executions
|
// block executions
|
||||||
errBlock := errors.New("block exec err")
|
errBlock := errors.New("block exec err")
|
||||||
|
|
|
@ -58,9 +58,9 @@ func TestDeleteBigObject(t *testing.T) {
|
||||||
defer e.Close(context.Background())
|
defer e.Close(context.Background())
|
||||||
|
|
||||||
for i := range children {
|
for i := range children {
|
||||||
require.NoError(t, Put(context.Background(), e, children[i]))
|
require.NoError(t, Put(context.Background(), e, children[i], false))
|
||||||
}
|
}
|
||||||
require.NoError(t, Put(context.Background(), e, link))
|
require.NoError(t, Put(context.Background(), e, link, false))
|
||||||
|
|
||||||
addrParent := object.AddressOf(parent)
|
addrParent := object.AddressOf(parent)
|
||||||
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)
|
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)
|
||||||
|
@ -126,9 +126,9 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
|
||||||
defer e.Close(context.Background())
|
defer e.Close(context.Background())
|
||||||
|
|
||||||
for i := range children {
|
for i := range children {
|
||||||
require.NoError(t, Put(context.Background(), e, children[i]))
|
require.NoError(t, Put(context.Background(), e, children[i], false))
|
||||||
}
|
}
|
||||||
require.NoError(t, Put(context.Background(), e, link))
|
require.NoError(t, Put(context.Background(), e, link, false))
|
||||||
|
|
||||||
addrParent := object.AddressOf(parent)
|
addrParent := object.AddressOf(parent)
|
||||||
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)
|
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)
|
||||||
|
|
|
@ -54,7 +54,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
||||||
addr := oidtest.Address()
|
addr := oidtest.Address()
|
||||||
for range 100 {
|
for range 100 {
|
||||||
obj := testutil.GenerateObjectWithCID(cidtest.ID())
|
obj := testutil.GenerateObjectWithCID(cidtest.ID())
|
||||||
err := Put(context.Background(), e, obj)
|
err := Put(context.Background(), e, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
e := testNewEngine(t).setShardsNum(t, 1).engine
|
e := testNewEngine(t).setShardsNum(t, 1).engine
|
||||||
defer e.Close(context.Background())
|
defer e.Close(context.Background())
|
||||||
|
|
||||||
err := Put(context.Background(), e, parent)
|
err := Put(context.Background(), e, parent, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
|
|
|
@ -97,7 +97,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
objAddr.SetObject(id)
|
objAddr.SetObject(id)
|
||||||
|
|
||||||
err = Put(context.Background(), e, obj)
|
err = Put(context.Background(), e, obj, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// 2.
|
// 2.
|
||||||
|
@ -105,7 +105,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
locker.WriteMembers([]oid.ID{id})
|
locker.WriteMembers([]oid.ID{id})
|
||||||
objectSDK.WriteLock(lockerObj, locker)
|
objectSDK.WriteLock(lockerObj, locker)
|
||||||
|
|
||||||
err = Put(context.Background(), e, lockerObj)
|
err = Put(context.Background(), e, lockerObj, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id})
|
err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id})
|
||||||
|
@ -124,7 +124,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
tombObj.SetID(tombForLockID)
|
tombObj.SetID(tombForLockID)
|
||||||
tombObj.SetAttributes(a)
|
tombObj.SetAttributes(a)
|
||||||
|
|
||||||
err = Put(context.Background(), e, tombObj)
|
err = Put(context.Background(), e, tombObj, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
|
inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
|
||||||
|
@ -177,7 +177,7 @@ func TestLockExpiration(t *testing.T) {
|
||||||
// 1.
|
// 1.
|
||||||
obj := testutil.GenerateObjectWithCID(cnr)
|
obj := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
err = Put(context.Background(), e, obj)
|
err = Put(context.Background(), e, obj, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// 2.
|
// 2.
|
||||||
|
@ -189,7 +189,7 @@ func TestLockExpiration(t *testing.T) {
|
||||||
lock.SetType(objectSDK.TypeLock)
|
lock.SetType(objectSDK.TypeLock)
|
||||||
lock.SetAttributes(a)
|
lock.SetAttributes(a)
|
||||||
|
|
||||||
err = Put(context.Background(), e, lock)
|
err = Put(context.Background(), e, lock, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
|
@ -254,14 +254,14 @@ func TestLockForceRemoval(t *testing.T) {
|
||||||
// 1.
|
// 1.
|
||||||
obj := testutil.GenerateObjectWithCID(cnr)
|
obj := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
err = Put(context.Background(), e, obj)
|
err = Put(context.Background(), e, obj, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// 2.
|
// 2.
|
||||||
lock := testutil.GenerateObjectWithCID(cnr)
|
lock := testutil.GenerateObjectWithCID(cnr)
|
||||||
lock.SetType(objectSDK.TypeLock)
|
lock.SetType(objectSDK.TypeLock)
|
||||||
|
|
||||||
err = Put(context.Background(), e, lock)
|
err = Put(context.Background(), e, lock, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
|
|
|
@ -22,7 +22,8 @@ import (
|
||||||
|
|
||||||
// PutPrm groups the parameters of Put operation.
|
// PutPrm groups the parameters of Put operation.
|
||||||
type PutPrm struct {
|
type PutPrm struct {
|
||||||
Object *objectSDK.Object
|
Object *objectSDK.Object
|
||||||
|
IsIndexedContainer bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var errPutShard = errors.New("could not put object to any shard")
|
var errPutShard = errors.New("could not put object to any shard")
|
||||||
|
@ -194,6 +195,6 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put writes provided object to local storage.
|
// Put writes provided object to local storage.
|
||||||
func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object) error {
|
func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object, indexedContainer bool) error {
|
||||||
return storage.Put(ctx, PutPrm{Object: obj})
|
return storage.Put(ctx, PutPrm{Object: obj, IsIndexedContainer: indexedContainer})
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||||
for i := range objCount {
|
for i := range objCount {
|
||||||
obj := testutil.GenerateObjectWithCID(cid)
|
obj := testutil.GenerateObjectWithCID(cid)
|
||||||
testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
|
testutil.AddAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i))
|
||||||
err := Put(context.Background(), te.ng, obj)
|
err := Put(context.Background(), te.ng, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -310,7 +310,8 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n
|
||||||
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
var err error
|
var err error
|
||||||
localTarget := LocalTarget{
|
localTarget := LocalTarget{
|
||||||
Storage: e.Config.LocalStore,
|
Storage: e.Config.LocalStore,
|
||||||
|
Container: e.Container,
|
||||||
}
|
}
|
||||||
completed := make(chan interface{})
|
completed := make(chan interface{})
|
||||||
if poolErr := e.Config.LocalPool.Submit(func() {
|
if poolErr := e.Config.LocalPool.Submit(func() {
|
||||||
|
|
|
@ -4,7 +4,9 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
@ -13,7 +15,7 @@ import (
|
||||||
type ObjectStorage interface {
|
type ObjectStorage interface {
|
||||||
// Put must save passed object
|
// Put must save passed object
|
||||||
// and return any appeared error.
|
// and return any appeared error.
|
||||||
Put(context.Context, *objectSDK.Object) error
|
Put(context.Context, *objectSDK.Object, bool) error
|
||||||
// Delete must delete passed objects
|
// Delete must delete passed objects
|
||||||
// and return any appeared error.
|
// and return any appeared error.
|
||||||
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error
|
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error
|
||||||
|
@ -25,7 +27,8 @@ type ObjectStorage interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type LocalTarget struct {
|
type LocalTarget struct {
|
||||||
Storage ObjectStorage
|
Storage ObjectStorage
|
||||||
|
Container containerSDK.Container
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
|
func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
|
||||||
|
@ -44,7 +47,7 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
|
||||||
// objects that do not change meta storage
|
// objects that do not change meta storage
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.Storage.Put(ctx, obj); err != nil {
|
if err := t.Storage.Put(ctx, obj, containerCore.IsIndexedContainer(t.Container)); err != nil {
|
||||||
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -150,7 +150,8 @@ func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.Object
|
||||||
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
|
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
|
||||||
if node.Local {
|
if node.Local {
|
||||||
return LocalTarget{
|
return LocalTarget{
|
||||||
Storage: prm.Config.LocalStore,
|
Storage: prm.Config.LocalStore,
|
||||||
|
Container: prm.Container,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -177,7 +177,7 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac
|
||||||
}
|
}
|
||||||
|
|
||||||
return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error {
|
return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error {
|
||||||
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
|
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta, placement.container)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,10 +263,10 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object,
|
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object,
|
||||||
signer *putSingleRequestSigner, meta object.ContentMeta,
|
signer *putSingleRequestSigner, meta object.ContentMeta, container containerSDK.Container,
|
||||||
) error {
|
) error {
|
||||||
if nodeDesc.Local {
|
if nodeDesc.Local {
|
||||||
return s.saveLocal(ctx, obj, meta)
|
return s.saveLocal(ctx, obj, meta, container)
|
||||||
}
|
}
|
||||||
|
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
|
@ -281,9 +281,10 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwrite
|
||||||
return s.redirectPutSingleRequest(ctx, signer, obj, info, c)
|
return s.redirectPutSingleRequest(ctx, signer, obj, info, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
|
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta, container containerSDK.Container) error {
|
||||||
localTarget := &objectwriter.LocalTarget{
|
localTarget := &objectwriter.LocalTarget{
|
||||||
Storage: s.Config.LocalStore,
|
Storage: s.Config.LocalStore,
|
||||||
|
Container: container,
|
||||||
}
|
}
|
||||||
return localTarget.WriteObject(ctx, obj, meta)
|
return localTarget.WriteObject(ctx, obj, meta)
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) er
|
||||||
policy := cnr.Value.PlacementPolicy()
|
policy := cnr.Value.PlacementPolicy()
|
||||||
|
|
||||||
if policycore.IsECPlacement(policy) {
|
if policycore.IsECPlacement(policy) {
|
||||||
return p.processECContainerObject(ctx, objInfo, policy)
|
return p.processECContainerObject(ctx, objInfo, cnr.Value)
|
||||||
}
|
}
|
||||||
return p.processRepContainerObject(ctx, objInfo, policy)
|
return p.processRepContainerObject(ctx, objInfo, policy)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
@ -27,11 +28,11 @@ type ecChunkProcessResult struct {
|
||||||
|
|
||||||
var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector with at least one node")
|
var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector with at least one node")
|
||||||
|
|
||||||
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, cnr containerSDK.Container) error {
|
||||||
if objInfo.ECInfo == nil {
|
if objInfo.ECInfo == nil {
|
||||||
return p.processECContainerRepObject(ctx, objInfo, policy)
|
return p.processECContainerRepObject(ctx, objInfo, cnr.PlacementPolicy())
|
||||||
}
|
}
|
||||||
return p.processECContainerECObject(ctx, objInfo, policy)
|
return p.processECContainerECObject(ctx, objInfo, cnr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects.
|
// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects.
|
||||||
|
@ -67,8 +68,8 @@ func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objec
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, cnr containerSDK.Container) error {
|
||||||
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy)
|
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, cnr.PlacementPolicy())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||||
}
|
}
|
||||||
|
@ -85,9 +86,9 @@ func (p *Policer) processECContainerECObject(ctx context.Context, objInfo object
|
||||||
res := p.processECChunk(ctx, objInfo, nn[0])
|
res := p.processECChunk(ctx, objInfo, nn[0])
|
||||||
if !res.validPlacement {
|
if !res.validPlacement {
|
||||||
// drop local chunk only if all required chunks are in place
|
// drop local chunk only if all required chunks are in place
|
||||||
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0])
|
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0], cnr)
|
||||||
}
|
}
|
||||||
p.adjustECPlacement(ctx, objInfo, nn[0], policy)
|
p.adjustECPlacement(ctx, objInfo, nn[0], cnr)
|
||||||
|
|
||||||
if res.removeLocal {
|
if res.removeLocal {
|
||||||
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
|
||||||
|
@ -138,7 +139,7 @@ func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
|
func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, cnr containerSDK.Container) bool {
|
||||||
var parentAddress oid.Address
|
var parentAddress oid.Address
|
||||||
parentAddress.SetContainer(objInfo.Address.Container())
|
parentAddress.SetContainer(objInfo.Address.Container())
|
||||||
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
||||||
|
@ -169,8 +170,9 @@ func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.I
|
||||||
addr.SetContainer(objInfo.Address.Container())
|
addr.SetContainer(objInfo.Address.Container())
|
||||||
addr.SetObject(indexToObjectID[index])
|
addr.SetObject(indexToObjectID[index])
|
||||||
p.replicator.HandlePullTask(ctx, replicator.Task{
|
p.replicator.HandlePullTask(ctx, replicator.Task{
|
||||||
Addr: addr,
|
Addr: addr,
|
||||||
Nodes: candidates,
|
Nodes: candidates,
|
||||||
|
Container: cnr,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// there was some missing chunks, it's not ok
|
// there was some missing chunks, it's not ok
|
||||||
|
@ -245,7 +247,7 @@ func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.A
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) {
|
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, cnr containerSDK.Container) {
|
||||||
var parentAddress oid.Address
|
var parentAddress oid.Address
|
||||||
parentAddress.SetContainer(objInfo.Address.Container())
|
parentAddress.SetContainer(objInfo.Address.Container())
|
||||||
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
||||||
|
@ -292,7 +294,7 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
|
||||||
if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total {
|
if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() {
|
if objInfo.ECInfo.Total-uint32(len(resolved)) > cnr.PlacementPolicy().ReplicaDescriptor(0).GetECParityCount() {
|
||||||
var found []uint32
|
var found []uint32
|
||||||
for i := range resolved {
|
for i := range resolved {
|
||||||
found = append(found, i)
|
found = append(found, i)
|
||||||
|
@ -300,11 +302,13 @@ func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info
|
||||||
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
|
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy)
|
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, cnr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) {
|
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID,
|
||||||
c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount()))
|
cnr containerSDK.Container,
|
||||||
|
) {
|
||||||
|
c, err := erasurecode.NewConstructor(int(cnr.PlacementPolicy().ReplicaDescriptor(0).GetECDataCount()), int(cnr.PlacementPolicy().ReplicaDescriptor(0).GetECParityCount()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
return
|
return
|
||||||
|
@ -339,8 +343,9 @@ func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info,
|
||||||
targetNode := nodes[idx%len(nodes)]
|
targetNode := nodes[idx%len(nodes)]
|
||||||
if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) {
|
if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) {
|
||||||
p.replicator.HandleLocalPutTask(ctx, replicator.Task{
|
p.replicator.HandleLocalPutTask(ctx, replicator.Task{
|
||||||
Addr: addr,
|
Addr: addr,
|
||||||
Obj: part,
|
Obj: part,
|
||||||
|
Container: cnr,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
|
@ -62,7 +63,7 @@ func (p *Replicator) HandlePullTask(ctx context.Context, task Task) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := engine.Put(ctx, p.localStorage, obj)
|
err := engine.Put(ctx, p.localStorage, obj, containerCore.IsIndexedContainer(task.Container))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||||
zap.Stringer("object", task.Addr),
|
zap.Stringer("object", task.Addr),
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -37,7 +38,7 @@ func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := engine.Put(ctx, p.localStorage, task.Obj)
|
err := engine.Put(ctx, p.localStorage, task.Obj, containerCore.IsIndexedContainer(task.Container))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||||
zap.Stringer("object", task.Addr),
|
zap.Stringer("object", task.Addr),
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package replicator
|
package replicator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -16,4 +17,6 @@ type Task struct {
|
||||||
Obj *objectSDK.Object
|
Obj *objectSDK.Object
|
||||||
// Nodes is a list of potential object holders.
|
// Nodes is a list of potential object holders.
|
||||||
Nodes []netmap.NodeInfo
|
Nodes []netmap.NodeInfo
|
||||||
|
|
||||||
|
Container containerSDK.Container
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue