forked from TrueCloudLab/frostfs-node
[#1677] writecache: Add QoS limiter usage
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
7893d763d1
commit
07a660fbc4
5 changed files with 27 additions and 1 deletions
|
@ -930,6 +930,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
|
||||||
writecache.WithMaxCacheCount(wcRead.countLimit),
|
writecache.WithMaxCacheCount(wcRead.countLimit),
|
||||||
writecache.WithNoSync(wcRead.noSync),
|
writecache.WithNoSync(wcRead.noSync),
|
||||||
writecache.WithLogger(c.log),
|
writecache.WithLogger(c.log),
|
||||||
|
writecache.WithQoSLimiter(shCfg.limiter),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
return writeCacheOpts
|
return writeCacheOpts
|
||||||
|
|
|
@ -514,4 +514,5 @@ const (
|
||||||
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
||||||
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
|
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
|
||||||
FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
|
FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
|
||||||
|
WriteCacheFailedToAcquireRPSQuota = "writecache failed to acquire RPS quota to flush object"
|
||||||
)
|
)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -61,6 +62,7 @@ func New(opts ...Option) Cache {
|
||||||
maxCacheSize: defaultMaxCacheSize,
|
maxCacheSize: defaultMaxCacheSize,
|
||||||
metrics: DefaultMetrics(),
|
metrics: DefaultMetrics(),
|
||||||
flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize,
|
flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize,
|
||||||
|
qosLimiter: qos.NewNoopLimiter(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,7 +67,13 @@ func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
|
release, err := c.qosLimiter.ReadRequest(ctx)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "fstree.IterateInfo"), zap.Error(err))
|
||||||
|
c.modeMtx.RUnlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err = c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
|
||||||
if err := fl.acquire(oi.DataSize); err != nil {
|
if err := fl.acquire(oi.DataSize); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -82,6 +88,7 @@ func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
release()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err))
|
c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -113,6 +120,12 @@ func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) {
|
||||||
func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) {
|
func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) {
|
||||||
defer fl.release(objInfo.size)
|
defer fl.release(objInfo.size)
|
||||||
|
|
||||||
|
release, err := c.qosLimiter.WriteRequest(ctx)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "fstree.Get"), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
res, err := c.fsTree.Get(ctx, common.GetPrm{
|
res, err := c.fsTree.Get(ctx, common.GetPrm{
|
||||||
Address: objInfo.addr,
|
Address: objInfo.addr,
|
||||||
})
|
})
|
||||||
|
|
|
@ -3,6 +3,7 @@ package writecache
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -38,6 +39,8 @@ type options struct {
|
||||||
disableBackgroundFlush bool
|
disableBackgroundFlush bool
|
||||||
// flushSizeLimit is total size of flushing objects.
|
// flushSizeLimit is total size of flushing objects.
|
||||||
flushSizeLimit uint64
|
flushSizeLimit uint64
|
||||||
|
// qosLimiter used to limit flush RPS.
|
||||||
|
qosLimiter qos.Limiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger sets logger.
|
// WithLogger sets logger.
|
||||||
|
@ -136,3 +139,9 @@ func WithFlushSizeLimit(v uint64) Option {
|
||||||
o.flushSizeLimit = v
|
o.flushSizeLimit = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithQoSLimiter(l qos.Limiter) Option {
|
||||||
|
return func(o *options) {
|
||||||
|
o.qosLimiter = l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue