frostfs-node/pkg/services/policer/option.go

215 lines
5.6 KiB
Go
Raw Normal View History

package policer
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// KeySpaceIterator is the interface that allows iterating over the key space
// of local storage.
// Note that the underlying implementation might be circular: i.e. it can restart
// when the end of the key space is reached.
type KeySpaceIterator interface {
Next(context.Context, uint32) ([]objectcore.Info, error)
Rewind()
}
// RedundantCopyCallback is a callback to pass
// the redundant local copy of the object.
type RedundantCopyCallback func(context.Context, oid.Address)
// BuryFunc is the function to bury (i.e. inhume) an object.
type BuryFunc func(context.Context, oid.Address) error
// Replicator is the interface to a consumer of replication tasks.
type Replicator interface {
HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
HandlePullTask(ctx context.Context, task replicator.Task)
HandleLocalPutTask(ctx context.Context, task replicator.Task)
}
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address, bool) (*objectSDK.Object, error)
// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node.
type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
type cfg struct {
headTimeout time.Duration
log *logger.Logger
keySpaceIterator KeySpaceIterator
buryFn BuryFunc
cnrSrc container.Source
placementBuilder placement.Builder
remoteHeader RemoteObjectHeaderFunc
localHeader LocalObjectHeaderFunc
netmapKeys netmap.AnnouncedKeys
replicator Replicator
cbRedundantCopy RedundantCopyCallback
taskPool *ants.Pool
batchSize, cacheSize uint32
evictDuration, sleepDuration time.Duration
metrics MetricsRegister
remoteObject RemoteObjectGetFunc
localObject LocalObjectGetFunc
keyStorage *util.KeyStorage
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
batchSize: 10,
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
sleepDuration: 1 * time.Second,
evictDuration: 30 * time.Second,
metrics: noopMetrics{},
}
}
// Option is an option for Policer constructor.
type Option func(*cfg)
// WithHeadTimeout returns option to set Head timeout of Policer.
func WithHeadTimeout(v time.Duration) Option {
return func(c *cfg) {
c.headTimeout = v
}
}
// WithLogger returns option to set Logger of Policer.
func WithLogger(v *logger.Logger) Option {
return func(c *cfg) {
c.log = v
}
}
func WithKeySpaceIterator(it KeySpaceIterator) Option {
return func(c *cfg) {
c.keySpaceIterator = it
}
}
func WithBuryFunc(f BuryFunc) Option {
return func(c *cfg) {
c.buryFn = f
}
}
// WithContainerSource returns option to set container source of Policer.
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
// WithPlacementBuilder returns option to set object placement builder of Policer.
func WithPlacementBuilder(v placement.Builder) Option {
return func(c *cfg) {
c.placementBuilder = v
}
}
// WithRemoteObjectHeaderFunc returns option to set remote object header receiver of Policer.
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
return func(c *cfg) {
c.remoteHeader = v
}
}
// WithLocalObjectHeaderFunc returns option to set local object header receiver of Policer.
func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option {
return func(c *cfg) {
c.localHeader = v
}
}
func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option {
return func(c *cfg) {
c.remoteObject = v
}
}
func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option {
return func(c *cfg) {
c.localObject = v
}
}
// WithNetmapKeys returns option to set tool to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
return func(c *cfg) {
c.netmapKeys = v
}
}
// WithReplicator returns option to set object replicator of Policer.
func WithReplicator(v Replicator) Option {
return func(c *cfg) {
c.replicator = v
}
}
// WithRedundantCopyCallback returns option to set
// callback to pass redundant local object copies
// detected by Policer.
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
return func(c *cfg) {
c.cbRedundantCopy = cb
}
}
// WithPool returns option to set pool for
// policy and replication operations.
func WithPool(p *ants.Pool) Option {
return func(c *cfg) {
c.taskPool = p
}
}
// WithMetrics returns option to set metrics.
func WithMetrics(m MetricsRegister) Option {
return func(c *cfg) {
c.metrics = m
}
}
func WithKeyStorage(ks *util.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = ks
}
}