[#9999] writecache: Add QoS limiter usage
All checks were successful
DCO action / DCO (pull_request) Successful in 27s
Vulncheck / Vulncheck (pull_request) Successful in 1m7s
Build / Build Components (pull_request) Successful in 1m30s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m33s
Tests and linters / Lint (pull_request) Successful in 3m24s
Tests and linters / Run gofumpt (pull_request) Successful in 3m17s
Tests and linters / Staticcheck (pull_request) Successful in 3m34s
Tests and linters / Tests (pull_request) Successful in 3m36s
Tests and linters / gopls check (pull_request) Successful in 4m10s
Tests and linters / Tests with -race (pull_request) Successful in 4m29s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
997759994a
commit
60ccd588dd
6 changed files with 45 additions and 3 deletions
@@ -930,6 +930,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
             writecache.WithMaxCacheCount(wcRead.countLimit),
             writecache.WithNoSync(wcRead.noSync),
             writecache.WithLogger(c.log),
+            writecache.WithQoSLimiter(shCfg.limiter),
         )
     }
     return writeCacheOpts
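
The new option is wired from the shard configuration: shCfg.limiter is handed to the writecache as its QoS limiter. Below is a minimal sketch of the contract the writecache relies on, inferred only from the call sites in this commit (ReadRequest/WriteRequest returning a release callback plus an error, and a no-op constructor); the real definitions live in internal/qos and may differ in detail.

// Sketch only: inferred from this commit's call sites, not copied from internal/qos.
package qos

import "context"

// ReleaseFunc hands the acquired quota slot back to the limiter.
type ReleaseFunc func()

// Limiter is what the writecache expects: every read or write against the
// underlying storage first acquires a slot and releases it once the I/O is done.
type Limiter interface {
    ReadRequest(ctx context.Context) (ReleaseFunc, error)
    WriteRequest(ctx context.Context) (ReleaseFunc, error)
}
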
@@ -514,4 +514,5 @@ const (
     NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
     FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
     FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
+    WriteCacheFailedToAcquireRPSQuota = "writecache failed to acquire RPS quota to flush object"
 )

@@ -6,6 +6,7 @@ import (
     "sync"
     "sync/atomic"

+    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@@ -61,6 +62,7 @@ func New(opts ...Option) Cache {
             maxCacheSize: defaultMaxCacheSize,
             metrics: DefaultMetrics(),
             flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize,
+            qosLimiter: qos.NewNoopLimiter(),
         },
     }
 
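New() now falls back to qos.NewNoopLimiter(), so a cache constructed without WithQoSLimiter keeps its previous, unthrottled behaviour. A hypothetical no-op limiter under the interface sketched above could look like this; the actual implementation in internal/qos may differ.

// Hypothetical illustration: every request succeeds immediately and the
// release callback does nothing, so the flush path is effectively unlimited.
type noopLimiter struct{}

func (noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
    return func() {}, nil
}

func (noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
    return func() {}, nil
}

func NewNoopLimiter() Limiter { return noopLimiter{} }
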
@@ -67,7 +67,13 @@ func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
                 continue
             }

-            err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
+            release, err := c.qosLimiter.ReadRequest(ctx)
+            if err != nil {
+                c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "fstree.IterateInfo"), zap.Error(err))
+                c.modeMtx.RUnlock()
+                continue
+            }
+            err = c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
                 if err := fl.acquire(oi.DataSize); err != nil {
                     return err
                 }
@@ -82,6 +88,7 @@ func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
                     return ctx.Err()
                 }
             })
+            release()
             if err != nil {
                 c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err))
             }
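
Every touched call site follows the same bracketing: acquire a quota slot, run the storage operation, release the slot explicitly, then handle the operation's own error. Release is called explicitly rather than deferred because the surrounding worker loop keeps iterating and the slot should be returned as soon as the I/O finishes. A generic illustration using the Limiter contract sketched earlier (limiter and doIteration are placeholders, not identifiers from the commit):

// Generic illustration of the bracketing pattern used at each call site.
func iterateWithQuota(ctx context.Context, limiter Limiter, doIteration func(context.Context) error) error {
    release, err := limiter.ReadRequest(ctx) // blocks until the read budget allows another request
    if err != nil {
        // the commit logs WriteCacheFailedToAcquireRPSQuota here and skips the flush cycle
        return err
    }
    err = doIteration(ctx) // stands in for c.fsTree.IterateInfo(...)
    release()              // hand the slot back as soon as the I/O is done, before handling err
    return err
}
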
@@ -113,9 +120,15 @@ func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) {
 func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) {
     defer fl.release(objInfo.size)

+    release, err := c.qosLimiter.ReadRequest(ctx)
+    if err != nil {
+        c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "fstree.Get"), zap.Error(err))
+        return
+    }
     res, err := c.fsTree.Get(ctx, common.GetPrm{
         Address: objInfo.addr,
     })
+    release()
     if err != nil {
         if !client.IsErrObjectNotFound(err) {
             c.reportFlushError(ctx, logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err))
@@ -185,7 +198,13 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
     prm.Object = obj
     prm.RawData = data

+    release, err := c.qosLimiter.WriteRequest(ctx)
+    if err != nil {
+        c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "blobstor.Put"), zap.Error(err))
+        return err
+    }
     res, err := c.blobstor.Put(ctx, prm)
+    release()
     if err != nil {
         if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) &&
             !errors.Is(err, blobstor.ErrNoPlaceFound) {
@@ -198,8 +217,13 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
     var updPrm meta.UpdateStorageIDPrm
     updPrm.SetAddress(addr)
     updPrm.SetStorageID(res.StorageID)
-
+    release, err = c.qosLimiter.WriteRequest(ctx)
+    if err != nil {
+        c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "metabase.UpdateStorageID"), zap.Error(err))
+        return err
+    }
     _, err = c.metabase.UpdateStorageID(ctx, updPrm)
+    release()
     if err != nil {
         c.reportFlushError(ctx, logs.FSTreeCantUpdateID,
             addr.EncodeToString(), err)
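
flushObject now takes a write slot twice: once around blobstor.Put and once around metabase.UpdateStorageID. The commit inlines the acquire/log/release sequence at each call site; a hedged sketch of how the same pattern could be factored into a helper (the helper itself is not part of this change):

// Hypothetical helper, not in this commit: wraps one write-quota slot around a
// storage operation and logs the acquisition failure the same way the commit does.
func (c *cache) withWriteQuota(ctx context.Context, op string, fn func() error) error {
    release, err := c.qosLimiter.WriteRequest(ctx)
    if err != nil {
        c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", op), zap.Error(err))
        return err
    }
    defer release()
    return fn()
}

Keeping the sequence inline, as the commit does, leaves the existing error handling (reportFlushError, the ErrNoSpace/ErrReadOnly checks) untouched.
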
@@ -3,6 +3,7 @@ package writecache
 import (
     "context"

+    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
     "go.uber.org/zap"
 )
@@ -38,6 +39,8 @@ type options struct {
     disableBackgroundFlush bool
     // flushSizeLimit is total size of flushing objects.
     flushSizeLimit uint64
+    // qosLimiter used to limit flush RPS.
+    qosLimiter qos.Limiter
 }

 // WithLogger sets logger.
@@ -136,3 +139,9 @@ func WithFlushSizeLimit(v uint64) Option {
         o.flushSizeLimit = v
     }
 }
+
+func WithQoSLimiter(l qos.Limiter) Option {
+    return func(o *options) {
+        o.qosLimiter = l
+    }
+}
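
A usage sketch of the new option; log and shardLimiter are placeholders, while New, WithLogger, WithQoSLimiter and the no-op default all appear elsewhere in this commit:

wc := writecache.New(
    writecache.WithLogger(log),              // existing option
    writecache.WithQoSLimiter(shardLimiter), // new in this commit; omit it to keep the no-op default
)
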
@@ -41,7 +41,12 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
 }

 func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address, size uint64) {
-    _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr, Size: size})
+    release, err := c.qosLimiter.WriteRequest(ctx)
+    if err != nil {
+        c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "fstree.Delete"), zap.Error(err))
+    }
+    _, err = c.fsTree.Delete(ctx, common.DeletePrm{Address: addr, Size: size})
+    release()
     if err != nil && !client.IsErrObjectNotFound(err) {
         c.log.Error(ctx, logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
     } else if err == nil {
|