[#1412] shard: Resolve container is indexed on metabase resync

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-02 16:46:39 +03:00
parent 4572fa4874
commit 3da168f8cf
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
4 changed files with 49 additions and 2 deletions

View file

@ -1058,7 +1058,9 @@ func initLocalStorage(ctx context.Context, c *cfg) {
var shardsAttached int var shardsAttached int
for _, optsWithMeta := range c.shardOpts(ctx) { for _, optsWithMeta := range c.shardOpts(ctx) {
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...) id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts,
shard.WithTombstoneSource(c.createTombstoneSource()),
shard.WithContainerInfoProvider(c.createContainerInfoProvider(ctx)))...)
if err != nil { if err != nil {
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err)) c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
} else { } else {
@ -1313,7 +1315,10 @@ func (c *cfg) reloadConfig(ctx context.Context) {
var rcfg engine.ReConfiguration var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts(ctx) { for _, optsWithID := range c.shardOpts(ctx) {
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))) rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts,
shard.WithTombstoneSource(c.createTombstoneSource()),
shard.WithContainerInfoProvider(c.createContainerInfoProvider(ctx)),
))
} }
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg) err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
@ -1414,6 +1419,20 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
return tombstoneSource return tombstoneSource
} }
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
return container.NewInfoProvider(func() (container.Source, error) {
// threadsafe: called on init or on sighup when morph initialized
if c.cfgMorph.client == nil {
initMorphComponents(ctx, c)
}
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
if err != nil {
return nil, err
}
return containerClient.AsContainerSource(cc), nil
})
}
func (c *cfg) shutdown() { func (c *cfg) shutdown() {
old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN) old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN)
if old == control.HealthStatus_SHUTTING_DOWN { if old == control.HealthStatus_SHUTTING_DOWN {

View file

@ -249,6 +249,7 @@ const (
ShardMetabaseFailureSwitchingMode = "metabase failure, switching mode" ShardMetabaseFailureSwitchingMode = "metabase failure, switching mode"
ShardCantMoveShardToReadonlySwitchMode = "can't move shard to readonly, switch mode" ShardCantMoveShardToReadonlySwitchMode = "can't move shard to readonly, switch mode"
ShardCouldNotUnmarshalObject = "could not unmarshal object" ShardCouldNotUnmarshalObject = "could not unmarshal object"
ShardSkipObjectFromResyncContainerDeleted = "object skipped from metabase resync: container deleted"
ShardCouldNotCloseShardComponent = "could not close shard component" ShardCouldNotCloseShardComponent = "could not close shard component"
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"slices"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -275,6 +276,21 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
return nil return nil
} }
hasIndexedAttribute := slices.IndexFunc(obj.Attributes(), func(attr objectSDK.Attribute) bool { return meta.IsAtrributeIndexed(attr.Key()) }) > 0
var isIndexedContainer bool
if hasIndexedAttribute {
info, err := s.containerInfo.Info(addr.Container())
if err != nil {
return err
}
if info.Removed {
s.log.Debug(logs.ShardSkipObjectFromResyncContainerDeleted, zap.Stringer("address", addr))
return nil
}
isIndexedContainer = info.Indexed
}
var err error var err error
switch obj.Type() { switch obj.Type() {
case objectSDK.TypeTombstone: case objectSDK.TypeTombstone:
@ -290,6 +306,7 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
var mPrm meta.PutPrm var mPrm meta.PutPrm
mPrm.SetObject(obj) mPrm.SetObject(obj)
mPrm.SetStorageID(descriptor) mPrm.SetStorageID(descriptor)
mPrm.SetIndexAttributes(hasIndexedAttribute && isIndexedContainer)
_, err = s.metaBase.Put(ctx, mPrm) _, err = s.metaBase.Put(ctx, mPrm)
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {

View file

@ -7,6 +7,7 @@ import (
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
@ -95,6 +96,8 @@ type cfg struct {
metricsWriter MetricsWriter metricsWriter MetricsWriter
reportErrorFunc func(selfID string, message string, err error) reportErrorFunc func(selfID string, message string, err error)
containerInfo container.InfoProvider
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -358,6 +361,13 @@ func WithZeroCountCallback(cb EmptyContainersCallback) Option {
} }
} }
// WithContainerInfoProvider returns option to set container info provider.
func WithContainerInfoProvider(containerInfo container.InfoProvider) Option {
return func(c *cfg) {
c.containerInfo = containerInfo
}
}
func (s *Shard) fillInfo() { func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()