diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 4ad9ec6c6..49a846e77 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -1058,7 +1058,9 @@ func initLocalStorage(ctx context.Context, c *cfg) { var shardsAttached int for _, optsWithMeta := range c.shardOpts(ctx) { - id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...) + id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, + shard.WithTombstoneSource(c.createTombstoneSource()), + shard.WithContainerInfoProvider(c.createContainerTypeResolver(ctx)))...) if err != nil { c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err)) } else { @@ -1313,7 +1315,10 @@ func (c *cfg) reloadConfig(ctx context.Context) { var rcfg engine.ReConfiguration for _, optsWithID := range c.shardOpts(ctx) { - rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))) + rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, + shard.WithTombstoneSource(c.createTombstoneSource()), + shard.WithContainerInfoProvider(c.createContainerTypeResolver(ctx)), + )) } err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg) @@ -1414,6 +1419,20 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker { return tombstoneSource } +func (c *cfg) createContainerTypeResolver(ctx context.Context) container.InfoProvider { + return container.NewInfoProvider(func() (container.Source, error) { + // threadsafe: called on init or on sighup when morph initialized + if c.cfgMorph.client == nil { + initMorphComponents(ctx, c) + } + cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary()) + if err != nil { + return nil, err + } + return containerClient.AsContainerSource(cc), nil + }) +} + func (c *cfg) shutdown() { old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN) if old == control.HealthStatus_SHUTTING_DOWN { diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 7aef6873e..1ae4f0d3f 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -249,6 +249,7 @@ const ( ShardMetabaseFailureSwitchingMode = "metabase failure, switching mode" ShardCantMoveShardToReadonlySwitchMode = "can't move shard to readonly, switch mode" ShardCouldNotUnmarshalObject = "could not unmarshal object" + ShardSkipObjectFromResyncContainerDeleted = "object skipped from metabase resync: container deleted" ShardCouldNotCloseShardComponent = "could not close shard component" ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" diff --git a/pkg/core/container/info.go b/pkg/core/container/info.go new file mode 100644 index 000000000..ffe6423fa --- /dev/null +++ b/pkg/core/container/info.go @@ -0,0 +1,103 @@ +package container + +import ( + "sync" + + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" +) + +type Info struct { + IsS3 bool + Removed bool +} + +type infoValue struct { + info Info + err error +} + +type InfoProvider interface { + Info(id cid.ID) (Info, error) +} + +type infoProvider struct { + mtx *sync.RWMutex + cache map[cid.ID]infoValue + kl *utilSync.KeyLocker[cid.ID] + + source Source + sourceErr error + sourceOnce *sync.Once + sourceFactory func() (Source, error) +} + +func NewInfoProvider(sourceFactory func() (Source, error)) InfoProvider { + return &infoProvider{ + mtx: &sync.RWMutex{}, + cache: make(map[cid.ID]infoValue), + sourceOnce: &sync.Once{}, + kl: utilSync.NewKeyLocker[cid.ID](), + sourceFactory: sourceFactory, + } +} + +func (r *infoProvider) Info(id cid.ID) (Info, error) { + v, found := r.tryGetFromCache(id) + if found { + return v.info, v.err + } + + return r.getFromSource(id) +} + +func (r *infoProvider) tryGetFromCache(id cid.ID) (infoValue, bool) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + value, found := r.cache[id] + return value, found +} + +func (r *infoProvider) getFromSource(id cid.ID) (Info, error) { + r.kl.Lock(id) + defer r.kl.Unlock(id) + + if v, ok := r.tryGetFromCache(id); ok { + return v.info, v.err + } + + r.sourceOnce.Do(func() { + r.source, r.sourceErr = r.sourceFactory() + }) + if r.sourceErr != nil { + return Info{}, r.sourceErr + } + + cnr, err := r.source.Get(id) + var civ infoValue + if err != nil { + if client.IsErrContainerNotFound(err) { + removed, err := WasRemoved(r.source, id) + if err != nil { + civ.err = err + } else { + civ.info.Removed = removed + } + } else { + civ.err = err + } + } else { + civ.info.IsS3 = IsS3Container(cnr.Value) + } + r.putToCache(id, civ) + return civ.info, civ.err +} + +func (r *infoProvider) putToCache(id cid.ID, ct infoValue) { + r.mtx.Lock() + defer r.mtx.Unlock() + + r.cache[id] = ct +} diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 188d6fca8..13eedeae2 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -400,6 +400,12 @@ var indexedAttributes = map[string]struct{}{ objectSDK.AttributeFilePath: {}, } +// IsAtrributeIndexable returns True if attribute is indexable by metabase. +func IsAtrributeIndexable(attr string) bool { + _, found := indexedAttributes[attr] + return found +} + func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { id, _ := obj.ID() cnr, _ := obj.ContainerID() diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index de881654a..a296973ea 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -275,6 +275,27 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, return nil } + var hasIndexableAttribute bool + for _, attr := range obj.Attributes() { + if meta.IsAtrributeIndexable(attr.Key()) { + hasIndexableAttribute = true + break + } + } + + var isS3Container bool + if hasIndexableAttribute { + info, err := s.containerInfo.Info(addr.Container()) + if err != nil { + return err + } + if info.Removed { + s.log.Debug(logs.ShardSkipObjectFromResyncContainerDeleted, zap.Stringer("address", addr)) + return nil + } + isS3Container = info.IsS3 + } + var err error switch obj.Type() { case objectSDK.TypeTombstone: @@ -290,6 +311,7 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, var mPrm meta.PutPrm mPrm.SetObject(obj) mPrm.SetStorageID(descriptor) + mPrm.SetIndexAttributes(hasIndexableAttribute && !isS3Container) _, err = s.metaBase.Put(ctx, mPrm) if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index a57b548be..cc6c2662b 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -7,6 +7,7 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" @@ -95,6 +96,8 @@ type cfg struct { metricsWriter MetricsWriter reportErrorFunc func(selfID string, message string, err error) + + containerInfo container.InfoProvider } func defaultCfg() *cfg { @@ -358,6 +361,13 @@ func WithZeroCountCallback(cb EmptyContainersCallback) Option { } } +// WithContainerInfoProvider returns option to set container info provider. +func WithContainerInfoProvider(containerInfo container.InfoProvider) Option { + return func(c *cfg) { + c.containerInfo = containerInfo + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()