[#9999] shard: Resolve container type (S3 or not) on metabase resync
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
72b92f50ae
commit
d734ddafad
6 changed files with 147 additions and 2 deletions
|
@ -47,6 +47,7 @@ import (
|
||||||
shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
|
@ -1058,7 +1059,9 @@ func initLocalStorage(ctx context.Context, c *cfg) {
|
||||||
|
|
||||||
var shardsAttached int
|
var shardsAttached int
|
||||||
for _, optsWithMeta := range c.shardOpts(ctx) {
|
for _, optsWithMeta := range c.shardOpts(ctx) {
|
||||||
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts,
|
||||||
|
shard.WithTombstoneSource(c.createTombstoneSource()),
|
||||||
|
shard.WithContainerTypeResolver(c.createContainerTypeResolver(ctx)))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
|
@ -1313,7 +1316,10 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
|
|
||||||
var rcfg engine.ReConfiguration
|
var rcfg engine.ReConfiguration
|
||||||
for _, optsWithID := range c.shardOpts(ctx) {
|
for _, optsWithID := range c.shardOpts(ctx) {
|
||||||
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
|
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts,
|
||||||
|
shard.WithTombstoneSource(c.createTombstoneSource()),
|
||||||
|
shard.WithContainerTypeResolver(c.createContainerTypeResolver(ctx)),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
|
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
|
||||||
|
@ -1414,6 +1420,20 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
||||||
return tombstoneSource
|
return tombstoneSource
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cfg) createContainerTypeResolver(ctx context.Context) shard.ContainerTypeResolver {
|
||||||
|
return container.NewContainerTypeResolver(func() (container.Source, error) {
|
||||||
|
// threadsafe: called on init or on sighup when cnrSource initialized
|
||||||
|
if c.cfgMorph.client == nil {
|
||||||
|
initMorphComponents(ctx, c)
|
||||||
|
}
|
||||||
|
cc, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return containerClient.AsContainerSource(cc), nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cfg) shutdown() {
|
func (c *cfg) shutdown() {
|
||||||
old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN)
|
old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN)
|
||||||
if old == control.HealthStatus_SHUTTING_DOWN {
|
if old == control.HealthStatus_SHUTTING_DOWN {
|
||||||
|
|
|
@ -249,6 +249,7 @@ const (
|
||||||
ShardMetabaseFailureSwitchingMode = "metabase failure, switching mode"
|
ShardMetabaseFailureSwitchingMode = "metabase failure, switching mode"
|
||||||
ShardCantMoveShardToReadonlySwitchMode = "can't move shard to readonly, switch mode"
|
ShardCantMoveShardToReadonlySwitchMode = "can't move shard to readonly, switch mode"
|
||||||
ShardCouldNotUnmarshalObject = "could not unmarshal object"
|
ShardCouldNotUnmarshalObject = "could not unmarshal object"
|
||||||
|
ShardSkipObjectFromResyncContainerDeleted = "object skipped from metabase resync: container deleted"
|
||||||
ShardCouldNotCloseShardComponent = "could not close shard component"
|
ShardCouldNotCloseShardComponent = "could not close shard component"
|
||||||
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
|
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
|
||||||
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
||||||
|
|
84
pkg/core/container/type_resolver.go
Normal file
84
pkg/core/container/type_resolver.go
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type contType struct {
|
||||||
|
isS3 bool
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type TypeResolver struct {
|
||||||
|
mtx *sync.RWMutex
|
||||||
|
cache map[cid.ID]contType
|
||||||
|
kl *utilSync.KeyLocker[cid.ID]
|
||||||
|
|
||||||
|
source Source
|
||||||
|
sourceErr error
|
||||||
|
sourceOnce *sync.Once
|
||||||
|
sourceFactory func() (Source, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewContainerTypeResolver(sourceFactory func() (Source, error)) *TypeResolver {
|
||||||
|
return &TypeResolver{
|
||||||
|
mtx: &sync.RWMutex{},
|
||||||
|
cache: make(map[cid.ID]contType),
|
||||||
|
sourceOnce: &sync.Once{},
|
||||||
|
kl: utilSync.NewKeyLocker[cid.ID](),
|
||||||
|
sourceFactory: sourceFactory,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TypeResolver) IsS3Container(id cid.ID) (bool, error) {
|
||||||
|
v, found := r.tryGetFromCache(id)
|
||||||
|
if found {
|
||||||
|
return v.isS3, v.err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.getFromSource(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TypeResolver) tryGetFromCache(id cid.ID) (contType, bool) {
|
||||||
|
r.mtx.RLock()
|
||||||
|
defer r.mtx.RUnlock()
|
||||||
|
|
||||||
|
value, found := r.cache[id]
|
||||||
|
return value, found
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TypeResolver) getFromSource(id cid.ID) (bool, error) {
|
||||||
|
r.kl.Lock(id)
|
||||||
|
defer r.kl.Unlock(id)
|
||||||
|
|
||||||
|
if v, ok := r.tryGetFromCache(id); ok {
|
||||||
|
return v.isS3, v.err
|
||||||
|
}
|
||||||
|
|
||||||
|
r.sourceOnce.Do(func() {
|
||||||
|
r.source, r.sourceErr = r.sourceFactory()
|
||||||
|
})
|
||||||
|
if r.sourceErr != nil {
|
||||||
|
return false, r.sourceErr
|
||||||
|
}
|
||||||
|
|
||||||
|
cnr, err := r.source.Get(id)
|
||||||
|
var ct contType
|
||||||
|
if err != nil {
|
||||||
|
ct.err = err
|
||||||
|
} else {
|
||||||
|
ct.isS3 = IsS3Container(cnr.Value)
|
||||||
|
}
|
||||||
|
r.putToCache(id, ct)
|
||||||
|
return ct.isS3, ct.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TypeResolver) putToCache(id cid.ID, ct contType) {
|
||||||
|
r.mtx.Lock()
|
||||||
|
defer r.mtx.Unlock()
|
||||||
|
|
||||||
|
r.cache[id] = ct
|
||||||
|
}
|
|
@ -400,6 +400,12 @@ var indexedAttributes = map[string]struct{}{
|
||||||
objectSDK.AttributeFilePath: {},
|
objectSDK.AttributeFilePath: {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsAtrributeIndexable returns True if attribute is indexable by metabase.
|
||||||
|
func IsAtrributeIndexable(attr string) bool {
|
||||||
|
_, found := indexedAttributes[attr]
|
||||||
|
return found
|
||||||
|
}
|
||||||
|
|
||||||
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
cnr, _ := obj.ContainerID()
|
cnr, _ := obj.ContainerID()
|
||||||
|
|
|
@ -275,7 +275,27 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var hasIndexableAttribute bool
|
||||||
|
for _, attr := range obj.Attributes() {
|
||||||
|
if meta.IsAtrributeIndexable(attr.Key()) {
|
||||||
|
hasIndexableAttribute = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var isS3Container bool
|
||||||
var err error
|
var err error
|
||||||
|
if hasIndexableAttribute {
|
||||||
|
isS3Container, err = s.cnrTypeResolver.IsS3Container(addr.Container())
|
||||||
|
if err != nil {
|
||||||
|
if client.IsErrContainerNotFound(err) {
|
||||||
|
s.log.Debug(logs.ShardSkipObjectFromResyncContainerDeleted, zap.Stringer("address", addr))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
switch obj.Type() {
|
switch obj.Type() {
|
||||||
case objectSDK.TypeTombstone:
|
case objectSDK.TypeTombstone:
|
||||||
err = s.refillTombstoneObject(ctx, obj)
|
err = s.refillTombstoneObject(ctx, obj)
|
||||||
|
@ -290,6 +310,7 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
|
||||||
var mPrm meta.PutPrm
|
var mPrm meta.PutPrm
|
||||||
mPrm.SetObject(obj)
|
mPrm.SetObject(obj)
|
||||||
mPrm.SetStorageID(descriptor)
|
mPrm.SetStorageID(descriptor)
|
||||||
|
mPrm.SetIndexAttributes(!isS3Container)
|
||||||
|
|
||||||
_, err = s.metaBase.Put(ctx, mPrm)
|
_, err = s.metaBase.Put(ctx, mPrm)
|
||||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||||
|
|
|
@ -101,6 +101,10 @@ type MetricsWriter interface {
|
||||||
SetEvacuationInProgress(value bool)
|
SetEvacuationInProgress(value bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ContainerTypeResolver interface {
|
||||||
|
IsS3Container(cnrID cid.ID) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
m sync.RWMutex
|
m sync.RWMutex
|
||||||
|
|
||||||
|
@ -139,6 +143,8 @@ type cfg struct {
|
||||||
metricsWriter MetricsWriter
|
metricsWriter MetricsWriter
|
||||||
|
|
||||||
reportErrorFunc func(selfID string, message string, err error)
|
reportErrorFunc func(selfID string, message string, err error)
|
||||||
|
|
||||||
|
cnrTypeResolver ContainerTypeResolver
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
@ -401,6 +407,13 @@ func WithZeroCountCallback(cb EmptyContainersCallback) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithContainerTypeResolver returns option to set container type resolver.
|
||||||
|
func WithContainerTypeResolver(ctr ContainerTypeResolver) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.cnrTypeResolver = ctr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Shard) fillInfo() {
|
func (s *Shard) fillInfo() {
|
||||||
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
||||||
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
||||||
|
|
Loading…
Reference in a new issue