From d734ddafad8e2d6f6fb219c753250398fc24d3a5 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 2 Oct 2024 16:46:39 +0300 Subject: [PATCH] [#9999] shard: Resolve container type (S3 or not) on metabase resync Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 24 ++++++- internal/logs/logs.go | 1 + pkg/core/container/type_resolver.go | 84 +++++++++++++++++++++++ pkg/local_object_storage/metabase/put.go | 6 ++ pkg/local_object_storage/shard/control.go | 21 ++++++ pkg/local_object_storage/shard/shard.go | 13 ++++ 6 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 pkg/core/container/type_resolver.go diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 4ad9ec6c6..0e7a3e07e 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -47,6 +47,7 @@ import ( shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" + cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" @@ -1058,7 +1059,9 @@ func initLocalStorage(ctx context.Context, c *cfg) { var shardsAttached int for _, optsWithMeta := range c.shardOpts(ctx) { - id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...) + id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, + shard.WithTombstoneSource(c.createTombstoneSource()), + shard.WithContainerTypeResolver(c.createContainerTypeResolver(ctx)))...) if err != nil { c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err)) } else { @@ -1313,7 +1316,10 @@ func (c *cfg) reloadConfig(ctx context.Context) { var rcfg engine.ReConfiguration for _, optsWithID := range c.shardOpts(ctx) { - rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))) + rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, + shard.WithTombstoneSource(c.createTombstoneSource()), + shard.WithContainerTypeResolver(c.createContainerTypeResolver(ctx)), + )) } err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg) @@ -1414,6 +1420,20 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker { return tombstoneSource } +func (c *cfg) createContainerTypeResolver(ctx context.Context) shard.ContainerTypeResolver { + return container.NewContainerTypeResolver(func() (container.Source, error) { + // threadsafe: called on init or on sighup when cnrSource initialized + if c.cfgMorph.client == nil { + initMorphComponents(ctx, c) + } + cc, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) + if err != nil { + return nil, err + } + return containerClient.AsContainerSource(cc), nil + }) +} + func (c *cfg) shutdown() { old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN) if old == control.HealthStatus_SHUTTING_DOWN { diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 7aef6873e..1ae4f0d3f 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -249,6 +249,7 @@ const ( ShardMetabaseFailureSwitchingMode = "metabase failure, switching mode" ShardCantMoveShardToReadonlySwitchMode = "can't move shard to readonly, switch mode" ShardCouldNotUnmarshalObject = "could not unmarshal object" + ShardSkipObjectFromResyncContainerDeleted = "object skipped from metabase resync: container deleted" ShardCouldNotCloseShardComponent = "could not close shard component" ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" diff --git a/pkg/core/container/type_resolver.go b/pkg/core/container/type_resolver.go new file mode 100644 index 000000000..0b0677ddf --- /dev/null +++ b/pkg/core/container/type_resolver.go @@ -0,0 +1,84 @@ +package container + +import ( + "sync" + + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" +) + +type contType struct { + isS3 bool + err error +} + +type TypeResolver struct { + mtx *sync.RWMutex + cache map[cid.ID]contType + kl *utilSync.KeyLocker[cid.ID] + + source Source + sourceErr error + sourceOnce *sync.Once + sourceFactory func() (Source, error) +} + +func NewContainerTypeResolver(sourceFactory func() (Source, error)) *TypeResolver { + return &TypeResolver{ + mtx: &sync.RWMutex{}, + cache: make(map[cid.ID]contType), + sourceOnce: &sync.Once{}, + kl: utilSync.NewKeyLocker[cid.ID](), + sourceFactory: sourceFactory, + } +} + +func (r *TypeResolver) IsS3Container(id cid.ID) (bool, error) { + v, found := r.tryGetFromCache(id) + if found { + return v.isS3, v.err + } + + return r.getFromSource(id) +} + +func (r *TypeResolver) tryGetFromCache(id cid.ID) (contType, bool) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + value, found := r.cache[id] + return value, found +} + +func (r *TypeResolver) getFromSource(id cid.ID) (bool, error) { + r.kl.Lock(id) + defer r.kl.Unlock(id) + + if v, ok := r.tryGetFromCache(id); ok { + return v.isS3, v.err + } + + r.sourceOnce.Do(func() { + r.source, r.sourceErr = r.sourceFactory() + }) + if r.sourceErr != nil { + return false, r.sourceErr + } + + cnr, err := r.source.Get(id) + var ct contType + if err != nil { + ct.err = err + } else { + ct.isS3 = IsS3Container(cnr.Value) + } + r.putToCache(id, ct) + return ct.isS3, ct.err +} + +func (r *TypeResolver) putToCache(id cid.ID, ct contType) { + r.mtx.Lock() + defer r.mtx.Unlock() + + r.cache[id] = ct +} diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 188d6fca8..13eedeae2 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -400,6 +400,12 @@ var indexedAttributes = map[string]struct{}{ objectSDK.AttributeFilePath: {}, } +// IsAtrributeIndexable returns True if attribute is indexable by metabase. +func IsAtrributeIndexable(attr string) bool { + _, found := indexedAttributes[attr] + return found +} + func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { id, _ := obj.ID() cnr, _ := obj.ContainerID() diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index de881654a..b9e814b15 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -275,7 +275,27 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, return nil } + var hasIndexableAttribute bool + for _, attr := range obj.Attributes() { + if meta.IsAtrributeIndexable(attr.Key()) { + hasIndexableAttribute = true + break + } + } + + var isS3Container bool var err error + if hasIndexableAttribute { + isS3Container, err = s.cnrTypeResolver.IsS3Container(addr.Container()) + if err != nil { + if client.IsErrContainerNotFound(err) { + s.log.Debug(logs.ShardSkipObjectFromResyncContainerDeleted, zap.Stringer("address", addr)) + return nil + } + return err + } + } + switch obj.Type() { case objectSDK.TypeTombstone: err = s.refillTombstoneObject(ctx, obj) @@ -290,6 +310,7 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, var mPrm meta.PutPrm mPrm.SetObject(obj) mPrm.SetStorageID(descriptor) + mPrm.SetIndexAttributes(!isS3Container) _, err = s.metaBase.Put(ctx, mPrm) if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 7496fc352..457320bfb 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -101,6 +101,10 @@ type MetricsWriter interface { SetEvacuationInProgress(value bool) } +type ContainerTypeResolver interface { + IsS3Container(cnrID cid.ID) (bool, error) +} + type cfg struct { m sync.RWMutex @@ -139,6 +143,8 @@ type cfg struct { metricsWriter MetricsWriter reportErrorFunc func(selfID string, message string, err error) + + cnrTypeResolver ContainerTypeResolver } func defaultCfg() *cfg { @@ -401,6 +407,13 @@ func WithZeroCountCallback(cb EmptyContainersCallback) Option { } } +// WithContainerTypeResolver returns option to set container type resolver. +func WithContainerTypeResolver(ctr ContainerTypeResolver) Option { + return func(c *cfg) { + c.cnrTypeResolver = ctr + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()