frostfs-node/pkg/local_object_storage/engine/shards.go
Dmitrii Stepanov 3a441f072f
[#1709] shard: Check if context canceled for shard iteration
If context has already been canceled, then there is no need to check other shards.
At the same time, it is necessary to avoid handling context cancellation
in each handler. Therefore, the context check has been moved to the shard
iteration method, which now returns an error.

Change-Id: I70030ace36593ce7d2b8376bee39fe82e9dbf88f
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-21 15:20:50 +03:00

482 lines
12 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/google/uuid"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// errShardNotFound is returned when an operation refers to a shard ID that is
// not present in the engine's shard map.
var errShardNotFound = logicerr.New("shard not found")
// hashedShard is a shard attached to the engine together with the HRW hash of
// its string identifier; the hash is what sortShards orders shards by.
type hashedShard struct {
shardWrapper
// hash is hrw.StringHash of the shard ID string (set by addShard in this file).
hash uint64
}
// metricsWithID adapts the engine-wide MetricRegister to a single shard: every
// per-shard call is forwarded with the stored shard ID as the first argument.
type metricsWithID struct {
	id string
	mw MetricRegister
}

// SetShardID records the shard ID used to tag forwarded metrics.
func (w *metricsWithID) SetShardID(id string) {
	// Concurrent updates are not expected, so the field is written without
	// mutex protection.
	w.id = id
}

// SetObjectCounter sets the shard's counter for objectType to v.
func (w *metricsWithID) SetObjectCounter(objectType string, v uint64) {
	w.mw.SetObjectCounter(w.id, objectType, v)
}

// AddToObjectCounter adds delta to the shard's counter for objectType.
func (w *metricsWithID) AddToObjectCounter(objectType string, delta int) {
	w.mw.AddToObjectCounter(w.id, objectType, delta)
}

// IncObjectCounter increments the shard's counter for objectType by one.
func (w *metricsWithID) IncObjectCounter(objectType string) {
	w.mw.AddToObjectCounter(w.id, objectType, +1)
}

// SetMode reports the shard's current mode.
func (w *metricsWithID) SetMode(m mode.Mode) {
	w.mw.SetMode(w.id, m)
}

// AddToContainerSize adds size to the container's size metric. Unlike the
// other methods, this one is not tagged with the shard ID.
func (w *metricsWithID) AddToContainerSize(cnr string, size int64) {
	w.mw.AddToContainerSize(cnr, size)
}

// AddToPayloadSize adds size to the shard's payload counter.
func (w *metricsWithID) AddToPayloadSize(size int64) {
	w.mw.AddToPayloadCounter(w.id, size)
}

// IncErrorCounter increments the shard's error counter.
func (w *metricsWithID) IncErrorCounter() {
	w.mw.IncErrorCounter(w.id)
}

// ClearErrorCounter resets the shard's error counter.
func (w *metricsWithID) ClearErrorCounter() {
	w.mw.ClearErrorCounter(w.id)
}

// DeleteShardMetrics drops all metrics registered for the shard.
func (w *metricsWithID) DeleteShardMetrics() {
	w.mw.DeleteShardMetrics(w.id)
}

// SetContainerObjectsCount sets the per-container object counter of the shard.
func (w *metricsWithID) SetContainerObjectsCount(cnrID, objectType string, value uint64) {
	w.mw.SetContainerObjectCounter(w.id, cnrID, objectType, value)
}

// IncContainerObjectsCount increments the per-container object counter of the shard.
func (w *metricsWithID) IncContainerObjectsCount(cnrID, objectType string) {
	w.mw.IncContainerObjectCounter(w.id, cnrID, objectType)
}

// SubContainerObjectsCount subtracts value from the per-container object counter of the shard.
func (w *metricsWithID) SubContainerObjectsCount(cnrID, objectType string, value uint64) {
	w.mw.SubContainerObjectCounter(w.id, cnrID, objectType, value)
}

// IncRefillObjectsCount records one refilled object of the given size for path.
func (w *metricsWithID) IncRefillObjectsCount(path string, size int, success bool) {
	w.mw.IncRefillObjectsCount(w.id, path, size, success)
}

// SetRefillPercent reports refill progress for path.
func (w *metricsWithID) SetRefillPercent(path string, percent uint32) {
	w.mw.SetRefillPercent(w.id, path, percent)
}

// SetRefillStatus reports the refill status string for path.
func (w *metricsWithID) SetRefillStatus(path, status string) {
	w.mw.SetRefillStatus(w.id, path, status)
}

// SetEvacuationInProgress reports whether evacuation of the shard is running.
func (w *metricsWithID) SetEvacuationInProgress(value bool) {
	w.mw.SetEvacuationInProgress(w.id, value)
}
// AddShard creates a shard from opts and attaches it to the storage engine.
//
// On success it reports the shard's mode to the engine metrics and returns the
// ID of the added shard. Any error that prevented creating or attaching the
// shard is returned wrapped.
func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
	created, err := e.createShard(ctx, opts)
	if err != nil {
		return nil, fmt.Errorf("create a shard: %w", err)
	}

	id := created.ID()
	if err = e.addShard(created); err != nil {
		return nil, fmt.Errorf("add %s shard: %w", id.String(), err)
	}

	e.metrics.SetMode(id.String(), created.GetMode())
	return id, nil
}
// createShard builds a new shard (not yet attached to the engine) with a
// freshly generated ID, the engine's metrics writers and the engine's
// callbacks appended to opts.
//
// A failure of sh.UpdateID is only logged as a warning and does not fail
// creation; the only returned error comes from ID generation.
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
	id, err := generateShardID()
	if err != nil {
		return nil, fmt.Errorf("generate shard ID: %w", err)
	}

	opts = e.appendMetrics(id, opts)
	opts = append(opts,
		shard.WithID(id),
		shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
		shard.WithExpiredLocksCallback(e.processExpiredLocks),
		shard.WithDeletedLockCallback(e.processDeletedLocks),
		shard.WithReportErrorFunc(e.reportShardErrorByID),
		shard.WithZeroSizeCallback(e.processZeroSizeContainers),
		shard.WithZeroCountCallback(e.processZeroCountContainers),
	)
	sh := shard.New(opts...)

	if updErr := sh.UpdateID(ctx); updErr != nil {
		e.log.Warn(ctx, logs.FailedToUpdateShardID,
			zap.Stringer("shard_id", sh.ID()),
			zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path),
			zap.Error(updErr),
		)
	}
	return sh, nil
}
// appendMetrics returns opts extended with the shard-level metrics writers
// (object metrics, write-cache metrics and GC metrics), each bound to the
// string form of id. The engine metrics are read under the engine read lock.
func (e *StorageEngine) appendMetrics(id *shard.ID, opts []shard.Option) []shard.Option {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	shardID := id.String()
	objectMetrics := &metricsWithID{id: shardID, mw: e.metrics}
	wcMetrics := &writeCacheMetrics{shardID: shardID, metrics: e.metrics.WriteCache()}
	gcm := &gcMetrics{storage: e.metrics.GC(), shardID: shardID}

	return append(opts,
		shard.WithMetricsWriter(objectMetrics),
		shard.WithWriteCacheMetrics(wcMetrics),
		shard.WithGCMetrics(gcm),
	)
}
// addShard registers sh in the engine's shard map under its string ID,
// wrapping it with a fresh error counter and precomputing its HRW hash.
// It fails if a shard with the same ID is already registered.
func (e *StorageEngine) addShard(sh *shard.Shard) error {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	key := sh.ID().String()
	if _, exists := e.shards[key]; exists {
		return fmt.Errorf("shard with id %s was already added", key)
	}

	wrapper := shardWrapper{
		Shard:      sh,
		errorCount: new(atomic.Uint32),
	}
	e.shards[key] = hashedShard{
		shardWrapper: wrapper,
		hash:         hrw.StringHash(key),
	}
	return nil
}
// removeShards detaches the shards with the given IDs from the engine, then
// switches each of them to mode.Disabled and closes it.
//
// Unknown IDs are skipped. Failures to disable or close a removed shard are
// only logged; nothing is reported to the caller.
func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
	if len(ids) == 0 {
		return
	}

	// Detach under the engine lock; disabling and closing happen afterwards
	// without holding it.
	detached := func() []hashedShard {
		e.mtx.Lock()
		defer e.mtx.Unlock()

		out := make([]hashedShard, 0, len(ids))
		for _, id := range ids {
			sh, ok := e.shards[id]
			if !ok {
				continue
			}
			e.metrics.DeleteShardMetrics(id)
			out = append(out, sh)
			delete(e.shards, id)
			e.log.Info(ctx, logs.EngineShardHasBeenRemoved, zap.String("id", id))
		}
		return out
	}()

	for _, sh := range detached {
		if err := sh.SetMode(ctx, mode.Disabled); err != nil {
			e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
				zap.Stringer("id", sh.ID()),
				zap.Error(err),
			)
		}
		if err := sh.Close(ctx); err != nil {
			e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
				zap.Stringer("id", sh.ID()),
				zap.Error(err),
			)
		}
	}
}
// generateShardID returns a new shard identifier built from the binary form
// of a random UUID.
func generateShardID() (*shard.ID, error) {
	u, genErr := uuid.NewRandom()
	if genErr != nil {
		return nil, genErr
	}

	raw, encErr := u.MarshalBinary()
	if encErr != nil {
		return nil, encErr
	}
	return shard.NewIDFromBytes(raw), nil
}
// sortShards returns a snapshot of the engine's shards ordered by HRW
// (rendezvous) weight relative to the hash of objAddr's string encoding.
func (e *StorageEngine) sortShards(objAddr interface{ EncodeToString() string }) []hashedShard {
	// unsortedShards copies the shard set under the read lock; the copy is
	// local, so it can be sorted without holding the lock.
	snapshot := e.unsortedShards()
	hrw.SortHasherSliceByValue(snapshot, hrw.StringHash(objAddr.EncodeToString()))
	return snapshot
}
// unsortedShards returns a copy of the engine's shards taken under the read
// lock, in map iteration (i.e. unspecified) order.
func (e *StorageEngine) unsortedShards() []hashedShard {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	out := make([]hashedShard, 0, len(e.shards))
	for _, hs := range e.shards {
		out = append(out, hs)
	}
	return out
}
// iterateOverSortedShards calls handler for each shard in HRW order for addr,
// passing the shard's position in that order. Iteration ends when handler
// returns true.
//
// Before each handler call the context is checked; if it is already done,
// iteration stops and ctx.Err() is returned, so handlers need not check it
// themselves. Otherwise nil is returned.
func (e *StorageEngine) iterateOverSortedShards(ctx context.Context, addr oid.Address, handler func(int, hashedShard) (stop bool)) error {
	for idx, hs := range e.sortShards(addr) {
		if err := ctx.Err(); err != nil {
			return err
		}
		if handler(idx, hs) {
			return nil
		}
	}
	return nil
}
// iterateOverUnsortedShards calls handler for each shard in unspecified order
// until handler returns true.
//
// Before each handler call the context is checked; if it is already done,
// iteration stops and ctx.Err() is returned. Otherwise nil is returned.
func (e *StorageEngine) iterateOverUnsortedShards(ctx context.Context, handler func(hashedShard) (stop bool)) error {
	for _, hs := range e.unsortedShards() {
		if err := ctx.Err(); err != nil {
			return err
		}
		if handler(hs) {
			return nil
		}
	}
	return nil
}
// SetShardMode sets mode of the shard with provided identifier.
//
// If resetErrorCounter is true, the shard's error counter and the matching
// engine metric are cleared before the mode is applied.
//
// Returns errShardNotFound if id is nil or no shard with that identifier is
// attached to the engine; otherwise returns the result of the shard's SetMode.
func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
	if id == nil {
		return errShardNotFound
	}
	shID := id.String()

	e.mtx.RLock()
	defer e.mtx.RUnlock()

	// Shards are keyed by their string ID, so a direct lookup replaces a scan
	// of the whole map.
	sh, ok := e.shards[shID]
	if !ok {
		return errShardNotFound
	}
	if resetErrorCounter {
		sh.errorCount.Store(0)
		e.metrics.ClearErrorCounter(shID)
	}
	return sh.SetMode(ctx, m)
}
// HandleNewEpoch notifies every shard about NewEpoch event.
//
// The epoch is offered to each shard's notification channel without blocking.
// If a channel cannot accept it right away, a debug message is logged and the
// shard is skipped. Iteration stops as soon as ctx is done.
func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	for _, hs := range e.shards {
		select {
		case <-ctx.Done():
			return
		case hs.NotificationChannel() <- epoch:
			// Delivered.
		default:
			// NOTE(review): presumably the shard is still processing an earlier
			// event; the notification for this epoch is dropped for it.
			e.log.Debug(ctx, logs.ShardEventProcessingInProgress,
				zap.Uint64("epoch", epoch), zap.Stringer("shard", hs.ID()))
		}
	}
}
// DetachShards removes the shards with the given identifiers from the engine,
// then disables and closes them.
//
// It fails with a logic error when ids is empty. It also returns any error
// from removing the shards from the engine, or the joined errors from
// disabling and closing them.
func (e *StorageEngine) DetachShards(ctx context.Context, ids []*shard.ID) error {
	if len(ids) == 0 {
		return logicerr.New("ids must be non-empty")
	}

	removed, err := e.deleteShards(ctx, ids)
	if err == nil {
		err = e.closeShards(ctx, removed)
	}
	return err
}
// closeShards switches every given shard to mode.Disabled and closes it,
// handling each shard in its own goroutine. A failure of one step does not
// stop the others, and every shard is attempted.
//
// Each failure is logged and collected; the collected failures are returned
// as a single error built with errors.Join (nil if everything succeeded).
func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedShard) error {
	var (
		mu       sync.Mutex
		combined error
		eg       errgroup.Group
	)
	// record appends err to the joined result; goroutines call it concurrently.
	record := func(err error) {
		mu.Lock()
		combined = errors.Join(combined, err)
		mu.Unlock()
	}

	for _, sh := range deletedShards {
		eg.Go(func() error {
			if modeErr := sh.SetMode(ctx, mode.Disabled); modeErr != nil {
				e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
					zap.Stringer("id", sh.ID()),
					zap.Error(modeErr),
				)
				record(fmt.Errorf("change shard (id:%s) mode to disabled: %w", sh.ID(), modeErr))
			}
			if closeErr := sh.Close(ctx); closeErr != nil {
				e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
					zap.Stringer("id", sh.ID()),
					zap.Error(closeErr),
				)
				record(fmt.Errorf("close removed shard (id:%s): %w", sh.ID(), closeErr))
			}
			// Errors are aggregated via record, so the group never fails.
			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		return err
	}
	return combined
}
// deleteShards deletes shards with specified ids from engine shard list
// and releases all engine resources associated with shards.
//
// Duplicate identifiers in ids are collapsed. Each shard is therefore counted
// once against the "not all shards" guard and returned at most once; returning
// it twice would make callers close it twice.
//
// Returns the deleted shards, or an error if some shard could not be deleted.
// The error is errShardNotFound if any id is unknown, or a logic error if the
// request would remove every shard of the engine. In both error cases the
// engine's shard list is left untouched.
func (e *StorageEngine) deleteShards(ctx context.Context, ids []*shard.ID) ([]hashedShard, error) {
	ss := make([]hashedShard, 0, len(ids))
	seen := make(map[string]struct{}, len(ids))

	e.mtx.Lock()
	defer e.mtx.Unlock()

	// Validate everything first so that nothing is removed on error.
	for _, id := range ids {
		idStr := id.String()
		sh, found := e.shards[idStr]
		if !found {
			return nil, errShardNotFound
		}
		if _, dup := seen[idStr]; dup {
			continue
		}
		seen[idStr] = struct{}{}
		ss = append(ss, sh)
	}

	// ss now holds distinct attached shards, so equality means "all of them".
	if len(ss) == len(e.shards) {
		return nil, logicerr.New("could not delete all the shards")
	}

	for _, sh := range ss {
		idStr := sh.ID().String()
		e.metrics.DeleteShardMetrics(idStr)
		delete(e.shards, idStr)
		e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
			zap.String("id", idStr))
	}
	return ss, nil
}
// Hash returns the precomputed HRW hash of the shard identifier, which lets
// a []hashedShard be ordered by hrw.SortHasherSliceByValue.
func (s hashedShard) Hash() uint64 { return s.hash }
// ListShardsForObject returns info about every shard that holds obj.
//
// A shard qualifies if its Exists check reports the object, or fails with a
// SplitInfoError or ECInfoError (the object is virtual). Iteration stops early
// in two cases:
//   - the object is reported as already removed: ObjectAlreadyRemoved is returned;
//   - the object is reported as expired: that error is returned.
//
// In both cases the result also carries the shard info collected so far.
// Other errors, except "object not found", are reported as shard errors and
// the shard is skipped. A context cancellation observed between shards is
// returned with a nil result.
func (e *StorageEngine) ListShardsForObject(ctx context.Context, obj oid.Address) ([]shard.Info, error) {
	var (
		result []shard.Info
		outErr error
		siErr  *objectSDK.SplitInfoError
		ecErr  *objectSDK.ECInfoError
	)
	prm := shard.ExistsPrm{Address: obj}

	itErr := e.iterateOverUnsortedShards(ctx, func(hs hashedShard) (stop bool) {
		res, exErr := hs.Exists(ctx, prm)
		switch {
		case exErr == nil:
			if res.Exists() {
				result = append(result, hs.DumpInfo())
			}
			return false
		case client.IsErrObjectAlreadyRemoved(exErr):
			outErr = new(apistatus.ObjectAlreadyRemoved)
			return true
		case errors.As(exErr, &siErr) || errors.As(exErr, &ecErr):
			// Split/EC info means the object is virtual but present here.
			result = append(result, hs.DumpInfo())
			return false
		case shard.IsErrObjectExpired(exErr):
			outErr = exErr
			return true
		case !client.IsErrObjectNotFound(exErr):
			e.reportShardError(ctx, hs, "could not check existence of object in shard", exErr, zap.Stringer("address", prm.Address))
		}
		return false
	})
	if itErr != nil {
		return nil, itErr
	}
	return result, outErr
}