[#9999] shard: Add limiter usage
All checks were successful
DCO action / DCO (pull_request) Successful in 28s
Vulncheck / Vulncheck (pull_request) Successful in 55s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m29s
Build / Build Components (pull_request) Successful in 1m36s
Tests and linters / Run gofumpt (pull_request) Successful in 2m52s
Tests and linters / Tests (pull_request) Successful in 3m2s
Tests and linters / gopls check (pull_request) Successful in 2m59s
Tests and linters / Staticcheck (pull_request) Successful in 3m24s
Tests and linters / Lint (pull_request) Successful in 3m29s
Tests and linters / Tests with -race (pull_request) Successful in 4m48s

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-02-05 15:57:27 +03:00
parent 2c61168d3e
commit f383ab1fe6
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
22 changed files with 391 additions and 67 deletions

View file

@ -1072,6 +1072,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
return pool
}),
shard.WithLimiter(shCfg.limiter),
}
return sh
}

View file

@ -78,6 +78,10 @@ var (
noopLimiterInstance = &noopLimiter{}
)
func NewNoopLimiter() Limiter {
return &noopLimiter{}
}
type noopLimiter struct{}
func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {

View file

@ -74,7 +74,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm)
var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(prm.cnr)
csRes, err := sh.Shard.ContainerSize(csPrm)
csRes, err := sh.Shard.ContainerSize(ctx, csPrm)
if err != nil {
e.reportShardError(ctx, sh, "can't get container size", err,
zap.Stringer("container_id", prm.cnr))

View file

@ -339,7 +339,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
var drop []cid.ID
for id := range idMap {
prm.SetContainerID(id)
s, err := sh.ContainerSize(prm)
s, err := sh.ContainerSize(ctx, prm)
if err != nil {
e.log.Warn(ctx, logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true

View file

@ -26,7 +26,7 @@ func (r ContainerSizeRes) Size() uint64 {
return r.size
}
func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
func (s *Shard) ContainerSize(ctx context.Context, prm ContainerSizePrm) (ContainerSizeRes, error) {
s.m.RLock()
defer s.m.RUnlock()
@ -34,6 +34,12 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
return ContainerSizeRes{}, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return ContainerSizeRes{}, err
}
defer release()
size, err := s.metaBase.ContainerSize(prm.cnr)
if err != nil {
return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)
@ -69,6 +75,12 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
return ContainerCountRes{}, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return ContainerCountRes{}, err
}
defer release()
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
if err != nil {
return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)
@ -100,6 +112,12 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.metaBase.DeleteContainerSize(ctx, id)
}
@ -122,5 +140,11 @@ func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.metaBase.DeleteContainerCount(ctx, id)
}

View file

@ -23,6 +23,12 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
return 0, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
cc, err := s.metaBase.ObjectCounters()
if err != nil {
return 0, err

View file

@ -54,6 +54,12 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
return DeleteRes{}, ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return DeleteRes{}, err
}
defer release()
result := DeleteRes{}
for _, addr := range prm.addr {
select {

View file

@ -53,10 +53,6 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
))
defer span.End()
var exists bool
var locked bool
var err error
s.m.RLock()
defer s.m.RUnlock()
@ -64,7 +60,18 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacuationInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
} else if s.info.Mode.NoMetabase() {
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return ExistsRes{}, err
}
defer release()
var exists bool
var locked bool
if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.Address

View file

@ -289,28 +289,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
s.log.Debug(ctx, logs.ShardGCRemoveGarbageStarted)
defer s.log.Debug(ctx, logs.ShardGCRemoveGarbageCompleted)
buf := make([]oid.Address, 0, s.rmBatchSize)
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}
return nil
})
// iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(ctx, iterPrm)
buf, err := s.getGarbage(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardIteratorOverMetabaseGraveyardFailed,
zap.Error(err),
@ -342,6 +321,39 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
return
}
func (s *Shard) getGarbage(ctx context.Context) ([]oid.Address, error) {
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
buf := make([]oid.Address, 0, s.rmBatchSize)
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}
return nil
})
if err := s.metaBase.IterateOverGarbage(ctx, iterPrm); err != nil {
return nil, err
}
return buf, nil
}
func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
workerCount = max(minExpiredWorkers, s.gc.gcCfg.expiredCollectorWorkerCount)
batchSize = max(minExpiredBatchSize, s.gc.gcCfg.expiredCollectorBatchSize)
@ -420,18 +432,9 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...)
inhumePrm.SetGCMark()
// inhume the collected objects
res, err := s.metaBase.Inhume(ctx, inhumePrm)
res, err := s.inhumeGC(ctx, expired)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err))
return
}
@ -449,6 +452,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
}
func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) ([]oid.Address, error) {
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(ctx, source)
if err != nil {
@ -462,6 +471,19 @@ func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address)
return result, nil
}
func (s *Shard) inhumeGC(ctx context.Context, addrs []oid.Address) (meta.InhumeRes, error) {
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return meta.InhumeRes{}, err
}
defer release()
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(addrs...)
inhumePrm.SetGCMark()
return s.metaBase.Inhume(ctx, inhumePrm)
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
var err error
startedAt := time.Now()
@ -503,11 +525,17 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
return
}
err = s.metaBase.IterateOverGraveyard(ctx, iterPrm)
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
return
}
err = s.metaBase.IterateOverGraveyard(ctx, iterPrm)
release()
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
return
}
@ -596,7 +624,13 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo
return ErrDegradedMode
}
err := s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
err = s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
select {
case <-ctx.Done():
return meta.ErrInterruptIterator
@ -619,6 +653,12 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
return nil, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.metaBase.FilterExpired(ctx, epoch, addresses)
}
@ -634,12 +674,15 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return
}
res, err := s.metaBase.InhumeTombstones(ctx, tss)
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err))
return
}
res, err := s.metaBase.InhumeTombstones(ctx, tss)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err))
return
}
@ -662,11 +705,16 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
if s.GetMode().NoMetabase() {
return
}
unlocked, err := s.metaBase.FreeLockedBy(lockers)
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
unlocked, err := s.metaBase.FreeLockedBy(lockers)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
@ -674,13 +722,15 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
var pInhume meta.InhumePrm
pInhume.SetAddresses(lockers...)
pInhume.SetForceGCMark()
res, err := s.metaBase.Inhume(ctx, pInhume)
release, err = s.limiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err))
return
}
res, err := s.metaBase.Inhume(ctx, pInhume)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err))
return
}
@ -719,12 +769,15 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
return
}
_, err := s.metaBase.FreeLockedBy(lockers)
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
)
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
_, err = s.metaBase.FreeLockedBy(lockers)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
}
@ -748,7 +801,13 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
}
func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
ids, err := s.metaBase.ZeroSizeContainers(ctx)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
@ -760,7 +819,13 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui
}
func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
ids, err := s.metaBase.ZeroCountContainers(ctx)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return

View file

@ -111,6 +111,12 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return c.Get(ctx, prm.addr)
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return GetRes{}, err
}
defer release()
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)

View file

@ -81,6 +81,12 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw)
release, limitErr := s.limiter.ReadRequest(ctx)
if limitErr != nil {
return HeadRes{}, limitErr
}
defer release()
var res meta.GetRes
res, err = s.metaBase.Get(ctx, headParams)
obj = res.Header()

View file

@ -32,6 +32,11 @@ func (s *Shard) ID() *ID {
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
func (s *Shard) UpdateID(ctx context.Context) (err error) {
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
var idFromMetabase []byte
modeDegraded := s.GetMode().NoMetabase()
if !modeDegraded {

View file

@ -81,6 +81,12 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
return InhumeRes{}, ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return InhumeRes{}, err
}
defer release()
if s.hasWriteCache() {
for i := range prm.target {
_ = s.writeCache.Delete(ctx, prm.target[i])

View file

@ -106,6 +106,12 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
return SelectRes{}, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return SelectRes{}, err
}
defer release()
lst, err := s.metaBase.Containers(ctx)
if err != nil {
return res, fmt.Errorf("list stored containers: %w", err)
@ -145,6 +151,12 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
return ListContainersRes{}, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return ListContainersRes{}, err
}
defer release()
containers, err := s.metaBase.Containers(ctx)
if err != nil {
return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
@ -173,6 +185,12 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
return ListWithCursorRes{}, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return ListWithCursorRes{}, err
}
defer release()
var metaPrm meta.ListPrm
metaPrm.SetCount(prm.count)
metaPrm.SetCursor(prm.cursor)
@ -202,9 +220,15 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
return ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
err = s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over containers: %w", err)
}
@ -227,11 +251,17 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.ContainerID = prm.ContainerID
metaPrm.ObjectType = prm.ObjectType
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
err = s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over objects: %w", err)
}
@ -251,6 +281,12 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
return 0, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
var metaPrm meta.CountAliveObjectsInContainerPrm
metaPrm.ObjectType = prm.ObjectType
metaPrm.ContainerID = prm.ContainerID

View file

@ -38,7 +38,13 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
return ErrDegradedMode
}
err := s.metaBase.Lock(ctx, idCnr, locker, locked)
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
err = s.metaBase.Lock(ctx, idCnr, locker, locked)
if err != nil {
return fmt.Errorf("metabase lock: %w", err)
}
@ -61,6 +67,12 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
return false, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return false, err
}
defer release()
var prm meta.IsLockedPrm
prm.SetAddress(addr)
@ -86,5 +98,12 @@ func (s *Shard) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error
if m.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.metaBase.GetLocks(ctx, addr)
}

View file

@ -67,6 +67,12 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
var res common.PutRes
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return PutRes{}, err
}
defer release()
// exist check are not performed there, these checks should be executed
// ahead of `Put` by storage engine
tryCache := s.hasWriteCache() && !m.NoMetabase()

View file

@ -131,6 +131,12 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
return obj, nil
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return RngRes{}, err
}
defer release()
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)

View file

@ -103,6 +103,7 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
default:
}
log.Info(ctx, logs.BlobstoreRebuildStarted)
// TODO use shard limiter
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
} else {

View file

@ -60,6 +60,12 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
return SelectRes{}, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return SelectRes{}, nil
}
defer release()
var selectPrm meta.SelectPrm
selectPrm.SetFilters(prm.filters)
selectPrm.SetContainerID(prm.cnr)

View file

@ -7,6 +7,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -98,6 +99,8 @@ type cfg struct {
reportErrorFunc func(ctx context.Context, selfID string, message string, err error)
containerInfo container.InfoProvider
limiter qos.Limiter
}
func defaultCfg() *cfg {
@ -109,6 +112,7 @@ func defaultCfg() *cfg {
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
metricsWriter: noopMetrics{},
limiter: qos.NewNoopLimiter(),
}
}
@ -368,6 +372,12 @@ func WithContainerInfoProvider(containerInfo container.InfoProvider) Option {
}
}
func WithLimiter(l qos.Limiter) Option {
return func(c *cfg) {
c.limiter = l
}
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()

View file

@ -43,6 +43,11 @@ func (s *Shard) TreeMove(ctx context.Context, d pilorama.CIDDescriptor, treeID s
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeMove(ctx, d, treeID, m)
}
@ -75,6 +80,11 @@ func (s *Shard) TreeAddByPath(ctx context.Context, d pilorama.CIDDescriptor, tre
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeAddByPath(ctx, d, treeID, attr, path, meta)
}
@ -103,6 +113,11 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
}
@ -130,6 +145,11 @@ func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
}
@ -157,6 +177,11 @@ func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string,
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeGetByPath(ctx, cid, treeID, attr, path, latest)
}
@ -182,6 +207,11 @@ func (s *Shard) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, n
if s.info.Mode.NoMetabase() {
return pilorama.Meta{}, 0, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return pilorama.Meta{}, 0, err
}
defer release()
return s.pilorama.TreeGetMeta(ctx, cid, treeID, nodeID)
}
@ -207,6 +237,11 @@ func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID strin
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeGetChildren(ctx, cid, treeID, nodeID)
}
@ -231,6 +266,11 @@ func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID
if s.info.Mode.NoMetabase() {
return nil, last, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, last, err
}
defer release()
return s.pilorama.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
}
@ -256,6 +296,11 @@ func (s *Shard) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string,
if s.info.Mode.NoMetabase() {
return pilorama.Move{}, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return pilorama.Move{}, err
}
defer release()
return s.pilorama.TreeGetOpLog(ctx, cid, treeID, height)
}
@ -280,6 +325,11 @@ func (s *Shard) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) erro
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeDrop(ctx, cid, treeID)
}
@ -303,6 +353,11 @@ func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeList(ctx, cid)
}
@ -326,6 +381,11 @@ func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (u
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
return s.pilorama.TreeHeight(ctx, cid, treeID)
}
@ -350,6 +410,11 @@ func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (b
if s.info.Mode.NoMetabase() {
return false, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return false, err
}
defer release()
return s.pilorama.TreeExists(ctx, cid, treeID)
}
@ -378,6 +443,11 @@ func (s *Shard) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, tre
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeUpdateLastSyncHeight(ctx, cid, treeID, height)
}
@ -402,6 +472,11 @@ func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID st
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
}
@ -423,6 +498,11 @@ func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.limiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeListTrees(ctx, prm)
}
@ -452,5 +532,10 @@ func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID strin
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}

View file

@ -67,6 +67,12 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return ErrDegradedMode
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
}
@ -124,6 +130,13 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
close(started)
defer cleanup()
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))
return
}
defer release()
s.log.Info(ctx, logs.StartedWritecacheSealAsync)
if err := s.writeCache.Seal(ctx, prm); err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))
@ -138,5 +151,11 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
return nil
}
}
release, err := s.limiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.writeCache.Seal(ctx, prm)
}