package policer import ( "context" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) // KeySpaceIterator is the interface that allows iterating over the key space // of local storage. // Note that the underlying implementation might be circular: i.e. it can restart // when the end of the key space is reached. type KeySpaceIterator interface { Next(context.Context, uint32) ([]objectcore.AddressWithType, error) Rewind() } // RedundantCopyCallback is a callback to pass // the redundant local copy of the object. type RedundantCopyCallback func(context.Context, oid.Address) // BuryFunc is the function to bury (i.e. inhume) an object. type BuryFunc func(context.Context, oid.Address) error // Replicator is the interface to a consumer of replication tasks. type Replicator interface { HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) } // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error) type cfg struct { headTimeout time.Duration log *logger.Logger keySpaceIterator KeySpaceIterator buryFn BuryFunc cnrSrc container.Source placementBuilder placement.Builder remoteHeader RemoteObjectHeaderFunc netmapKeys netmap.AnnouncedKeys replicator Replicator cbRedundantCopy RedundantCopyCallback taskPool *ants.Pool maxCapacity int batchSize, cacheSize uint32 rebalanceFreq, evictDuration, sleepDuration time.Duration } func defaultCfg() *cfg { return &cfg{ log: &logger.Logger{Logger: zap.L()}, batchSize: 10, cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB rebalanceFreq: 1 * time.Second, sleepDuration: 1 * time.Second, evictDuration: 30 * time.Second, } } // Option is an option for Policer constructor. type Option func(*cfg) // WithHeadTimeout returns option to set Head timeout of Policer. func WithHeadTimeout(v time.Duration) Option { return func(c *cfg) { c.headTimeout = v } } // WithLogger returns option to set Logger of Policer. func WithLogger(v *logger.Logger) Option { return func(c *cfg) { c.log = v } } func WithKeySpaceIterator(it KeySpaceIterator) Option { return func(c *cfg) { c.keySpaceIterator = it } } func WithBuryFunc(f BuryFunc) Option { return func(c *cfg) { c.buryFn = f } } // WithContainerSource returns option to set container source of Policer. func WithContainerSource(v container.Source) Option { return func(c *cfg) { c.cnrSrc = v } } // WithPlacementBuilder returns option to set object placement builder of Policer. func WithPlacementBuilder(v placement.Builder) Option { return func(c *cfg) { c.placementBuilder = v } } // WithRemoteObjectHeader returns option to set object header receiver of Policer. func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option { return func(c *cfg) { c.remoteHeader = v } } // WithNetmapKeys returns option to set tool to work with announced public keys. func WithNetmapKeys(v netmap.AnnouncedKeys) Option { return func(c *cfg) { c.netmapKeys = v } } // WithReplicator returns option to set object replicator of Policer. func WithReplicator(v Replicator) Option { return func(c *cfg) { c.replicator = v } } // WithRedundantCopyCallback returns option to set // callback to pass redundant local object copies // detected by Policer. func WithRedundantCopyCallback(cb RedundantCopyCallback) Option { return func(c *cfg) { c.cbRedundantCopy = cb } } // WithMaxCapacity returns option to set max capacity // that can be set to the pool. func WithMaxCapacity(capacity int) Option { return func(c *cfg) { c.maxCapacity = capacity } } // WithPool returns option to set pool for // policy and replication operations. func WithPool(p *ants.Pool) Option { return func(c *cfg) { c.taskPool = p } }