[#1375] node: Configure of the container cache size
All checks were successful
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m13s
Tests and linters / Run gofumpt (pull_request) Successful in 3m12s
DCO action / DCO (pull_request) Successful in 3m38s
Build / Build Components (pull_request) Successful in 3m35s
Vulncheck / Vulncheck (pull_request) Successful in 3m33s
Tests and linters / Staticcheck (pull_request) Successful in 5m33s
Tests and linters / gopls check (pull_request) Successful in 5m44s
Tests and linters / Lint (pull_request) Successful in 6m1s
Tests and linters / Tests (pull_request) Successful in 6m52s
Tests and linters / Tests with -race (pull_request) Successful in 7m46s

Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
This commit is contained in:
Alexander Chuprov 2024-09-27 12:39:43 +03:00
parent a5e1aa22c9
commit 81c63a8cf2
6 changed files with 55 additions and 35 deletions

View file

@ -165,13 +165,11 @@ type ttlContainerStorage struct {
delInfoCache *ttlNetCache[cid.ID, *container.DelInfo] delInfoCache *ttlNetCache[cid.ID, *container.DelInfo]
} }
func newCachedContainerStorage(v container.Source, ttl time.Duration) ttlContainerStorage { func newCachedContainerStorage(v container.Source, ttl time.Duration, containerCacheSize uint32) ttlContainerStorage {
const containerCacheSize = 100 lruCnrCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(id cid.ID) (*container.Container, error) {
lruCnrCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.Container, error) {
return v.Get(id) return v.Get(id)
}, metrics.NewCacheMetrics("container")) }, metrics.NewCacheMetrics("container"))
lruDelInfoCache := newNetworkTTLCache(containerCacheSize, ttl, func(id cid.ID) (*container.DelInfo, error) { lruDelInfoCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(id cid.ID) (*container.DelInfo, error) {
return v.DeletionInfo(id) return v.DeletionInfo(id)
}, metrics.NewCacheMetrics("container_deletion_info")) }, metrics.NewCacheMetrics("container_deletion_info"))

View file

@ -570,6 +570,8 @@ type cfgMorph struct {
// TTL of Sidechain cached values. Non-positive value disables caching. // TTL of Sidechain cached values. Non-positive value disables caching.
cacheTTL time.Duration cacheTTL time.Duration
containerCacheSize uint32
proxyScriptHash neogoutil.Uint160 proxyScriptHash neogoutil.Uint160
} }

View file

@ -30,6 +30,9 @@ const (
// FrostfsIDCacheSizeDefault is a default value of APE chain cache. // FrostfsIDCacheSizeDefault is a default value of APE chain cache.
FrostfsIDCacheSizeDefault = 10_000 FrostfsIDCacheSizeDefault = 10_000
// ContainerCacheSizeDefault represents the default size for the container cache.
ContainerCacheSizeDefault = 100
) )
var errNoMorphEndpoints = errors.New("no morph chain RPC endpoints, see `morph.rpc_endpoint` section") var errNoMorphEndpoints = errors.New("no morph chain RPC endpoints, see `morph.rpc_endpoint` section")
@ -103,6 +106,18 @@ func CacheTTL(c *config.Config) time.Duration {
return CacheTTLDefault return CacheTTLDefault
} }
// ContainerCacheSize returns the value of "container_cache_size" config parameter
// from "morph" section.
//
// Returns 0 if the value is not positive integer.
// Returns ContainerCacheSizeDefault if the value is missing.
func ContainerCacheSize(c *config.Config) uint32 {
if c.Sub(subsection).Value("container_cache_size") == nil {
return ContainerCacheSizeDefault
}
return config.Uint32Safe(c.Sub(subsection), "container_cache_size")
}
// SwitchInterval returns the value of "switch_interval" config parameter // SwitchInterval returns the value of "switch_interval" config parameter
// from "morph" section. // from "morph" section.
// //

View file

@ -87,43 +87,46 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
cnrRdr.lister = client cnrRdr.lister = client
} else { } else {
// use RPC node as source of Container contract items (with caching) // use RPC node as source of Container contract items (with caching)
cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL) c.cfgObject.cnrSource = cnrSrc
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL) if c.cfgMorph.containerCacheSize > 0 {
containerCache := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL, c.cfgMorph.containerCacheSize)
subscribeToContainerCreation(c, func(e event.Event) { subscribeToContainerCreation(c, func(e event.Event) {
ev := e.(containerEvent.PutSuccess) ev := e.(containerEvent.PutSuccess)
// read owner of the created container in order to update the reading cache. // read owner of the created container in order to update the reading cache.
// TODO: use owner directly from the event after neofs-contract#256 will become resolved // TODO: use owner directly from the event after neofs-contract#256 will become resolved
// but don't forget about the profit of reading the new container and caching it: // but don't forget about the profit of reading the new container and caching it:
// creation success are most commonly tracked by polling GET op. // creation success are most commonly tracked by polling GET op.
cnr, err := cnrSrc.Get(ev.ID) cnr, err := cnrSrc.Get(ev.ID)
if err == nil { if err == nil {
cachedContainerStorage.containerCache.set(ev.ID, cnr, nil) containerCache.containerCache.set(ev.ID, cnr, nil)
} else { } else {
// unlike removal, we expect successful receive of the container // unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful // after successful creation, so logging can be useful
c.log.Error(logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification, c.log.Error(logs.FrostFSNodeReadNewlyCreatedContainerAfterTheNotification,
zap.Stringer("id", ev.ID),
zap.Error(err),
)
}
c.log.Debug(logs.FrostFSNodeContainerCreationEventsReceipt,
zap.Stringer("id", ev.ID), zap.Stringer("id", ev.ID),
zap.Error(err),
) )
} })
c.log.Debug(logs.FrostFSNodeContainerCreationEventsReceipt, subscribeToContainerRemoval(c, func(e event.Event) {
zap.Stringer("id", ev.ID), ev := e.(containerEvent.DeleteSuccess)
) containerCache.handleRemoval(ev.ID)
}) c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
zap.Stringer("id", ev.ID),
subscribeToContainerRemoval(c, func(e event.Event) { )
ev := e.(containerEvent.DeleteSuccess) })
cachedContainerStorage.handleRemoval(ev.ID) c.cfgObject.cnrSource = containerCache
c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt, }
zap.Stringer("id", ev.ID),
)
})
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
c.cfgObject.eaclSource = cachedEACLStorage c.cfgObject.eaclSource = cachedEACLStorage
c.cfgObject.cnrSource = cachedContainerStorage
cnrRdr.lister = client cnrRdr.lister = client
cnrRdr.eacl = c.cfgObject.eaclSource cnrRdr.eacl = c.cfgObject.eaclSource

View file

@ -90,6 +90,7 @@ func initMorphComponents(ctx context.Context, c *cfg) {
var netmapSource netmap.Source var netmapSource netmap.Source
c.cfgMorph.containerCacheSize = morphconfig.ContainerCacheSize(c.appCfg)
c.cfgMorph.cacheTTL = morphconfig.CacheTTL(c.appCfg) c.cfgMorph.cacheTTL = morphconfig.CacheTTL(c.appCfg)
if c.cfgMorph.cacheTTL == 0 { if c.cfgMorph.cacheTTL == 0 {

View file

@ -81,6 +81,7 @@ morph:
cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls). Negative value disables caching. cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls). Negative value disables caching.
# Default value: block time. It is recommended to have this value less or equal to block time. # Default value: block time. It is recommended to have this value less or equal to block time.
# Cached entities: containers, container lists, eACL tables. # Cached entities: containers, container lists, eACL tables.
container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache.
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
- address: wss://rpc1.morph.frostfs.info:40341/ws - address: wss://rpc1.morph.frostfs.info:40341/ws