forked from TrueCloudLab/frostfs-node
[#811] cmd/node: Use cache invalidator in container service
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
4a1f0de8f4
commit
49b6b5b49d
2 changed files with 116 additions and 6 deletions
|
@ -82,6 +82,20 @@ func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
|
||||||
return val, nil
|
return val, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ttlNetCache) remove(key interface{}) {
|
||||||
|
c.mtx.Lock()
|
||||||
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
c.cache.Remove(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ttlNetCache) keys() []interface{} {
|
||||||
|
c.mtx.Lock()
|
||||||
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
return c.cache.Keys()
|
||||||
|
}
|
||||||
|
|
||||||
// entity that provides LRU cache interface.
|
// entity that provides LRU cache interface.
|
||||||
type lruNetCache struct {
|
type lruNetCache struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
|
@ -130,7 +144,7 @@ func (c *lruNetCache) get(key interface{}) (interface{}, error) {
|
||||||
// that implements container storage.
|
// that implements container storage.
|
||||||
type ttlContainerStorage ttlNetCache
|
type ttlContainerStorage ttlNetCache
|
||||||
|
|
||||||
func newCachedContainerStorage(v container.Source) container.Source {
|
func newCachedContainerStorage(v container.Source) *ttlContainerStorage {
|
||||||
const (
|
const (
|
||||||
containerCacheSize = 100
|
containerCacheSize = 100
|
||||||
containerCacheTTL = 30 * time.Second
|
containerCacheTTL = 30 * time.Second
|
||||||
|
@ -150,6 +164,8 @@ func newCachedContainerStorage(v container.Source) container.Source {
|
||||||
return (*ttlContainerStorage)(lruCnrCache)
|
return (*ttlContainerStorage)(lruCnrCache)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get returns container value from the cache. If value is missing in the cache
|
||||||
|
// or expired, then it returns value from side chain and updates the cache.
|
||||||
func (s *ttlContainerStorage) Get(cid *cid.ID) (*containerSDK.Container, error) {
|
func (s *ttlContainerStorage) Get(cid *cid.ID) (*containerSDK.Container, error) {
|
||||||
val, err := (*ttlNetCache)(s).get(cid.String())
|
val, err := (*ttlNetCache)(s).get(cid.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -159,9 +175,14 @@ func (s *ttlContainerStorage) Get(cid *cid.ID) (*containerSDK.Container, error)
|
||||||
return val.(*containerSDK.Container), nil
|
return val.(*containerSDK.Container), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InvalidateContainer removes cached container value.
|
||||||
|
func (s *ttlContainerStorage) InvalidateContainer(cid *cid.ID) {
|
||||||
|
(*ttlNetCache)(s).remove(cid.String())
|
||||||
|
}
|
||||||
|
|
||||||
type ttlEACLStorage ttlNetCache
|
type ttlEACLStorage ttlNetCache
|
||||||
|
|
||||||
func newCachedEACLStorage(v eacl.Source) eacl.Source {
|
func newCachedEACLStorage(v eacl.Source) *ttlEACLStorage {
|
||||||
const (
|
const (
|
||||||
eaclCacheSize = 100
|
eaclCacheSize = 100
|
||||||
eaclCacheTTL = 30 * time.Second
|
eaclCacheTTL = 30 * time.Second
|
||||||
|
@ -181,6 +202,8 @@ func newCachedEACLStorage(v eacl.Source) eacl.Source {
|
||||||
return (*ttlEACLStorage)(lruCnrCache)
|
return (*ttlEACLStorage)(lruCnrCache)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetEACL returns eACL value from the cache. If value is missing in the cache
|
||||||
|
// or expired, then it returns value from side chain and updates cache.
|
||||||
func (s *ttlEACLStorage) GetEACL(cid *cid.ID) (*eaclSDK.Table, error) {
|
func (s *ttlEACLStorage) GetEACL(cid *cid.ID) (*eaclSDK.Table, error) {
|
||||||
val, err := (*ttlNetCache)(s).get(cid.String())
|
val, err := (*ttlNetCache)(s).get(cid.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -190,6 +213,11 @@ func (s *ttlEACLStorage) GetEACL(cid *cid.ID) (*eaclSDK.Table, error) {
|
||||||
return val.(*eaclSDK.Table), nil
|
return val.(*eaclSDK.Table), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InvalidateEACL removes cached eACL value.
|
||||||
|
func (s *ttlEACLStorage) InvalidateEACL(cid *cid.ID) {
|
||||||
|
(*ttlNetCache)(s).remove(cid.String())
|
||||||
|
}
|
||||||
|
|
||||||
type lruNetmapSource struct {
|
type lruNetmapSource struct {
|
||||||
netState netmap.State
|
netState netmap.State
|
||||||
|
|
||||||
|
@ -261,6 +289,9 @@ func newCachedContainerLister(w *wrapper.Wrapper) *ttlContainerLister {
|
||||||
return (*ttlContainerLister)(lruCnrListerCache)
|
return (*ttlContainerLister)(lruCnrListerCache)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List returns list of container IDs from the cache. If list is missing in the
|
||||||
|
// cache or expired, then it returns container IDs from side chain and updates
|
||||||
|
// the cache.
|
||||||
func (s *ttlContainerLister) List(id *owner.ID) ([]*cid.ID, error) {
|
func (s *ttlContainerLister) List(id *owner.ID) ([]*cid.ID, error) {
|
||||||
var str string
|
var str string
|
||||||
|
|
||||||
|
@ -275,3 +306,33 @@ func (s *ttlContainerLister) List(id *owner.ID) ([]*cid.ID, error) {
|
||||||
|
|
||||||
return val.([]*cid.ID), nil
|
return val.([]*cid.ID), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InvalidateContainerList removes cached list of container IDs.
|
||||||
|
func (s *ttlContainerLister) InvalidateContainerList(id *owner.ID) {
|
||||||
|
(*ttlNetCache)(s).remove(id.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// InvalidateContainerListByCID removes cached list of container IDs. To do that
|
||||||
|
// function iterates over all available lists and removes the first list where
|
||||||
|
// specified ID is present.
|
||||||
|
func (s *ttlContainerLister) InvalidateContainerListByCID(id *cid.ID) {
|
||||||
|
cache := (*ttlNetCache)(s)
|
||||||
|
for _, key := range cache.keys() {
|
||||||
|
val, err := cache.get(key)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ids, ok := val.([]*cid.ID)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range ids {
|
||||||
|
if ids[i].Equal(id) {
|
||||||
|
cache.remove(key)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ func initContainerService(c *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrRdr := new(morphContainerReader)
|
cnrRdr := new(morphContainerReader)
|
||||||
|
cnrInvalidator := new(morphContainerInvalidator)
|
||||||
|
|
||||||
if c.cfgMorph.disableCache {
|
if c.cfgMorph.disableCache {
|
||||||
c.cfgObject.eaclSource = eACLFetcher
|
c.cfgObject.eaclSource = eACLFetcher
|
||||||
|
@ -67,11 +68,20 @@ func initContainerService(c *cfg) {
|
||||||
cnrRdr.lister = wrap
|
cnrRdr.lister = wrap
|
||||||
} else {
|
} else {
|
||||||
// use RPC node as source of Container contract items (with caching)
|
// use RPC node as source of Container contract items (with caching)
|
||||||
c.cfgObject.eaclSource = newCachedEACLStorage(eACLFetcher)
|
cachedContainerStorage := newCachedContainerStorage(cnrSrc)
|
||||||
c.cfgObject.cnrSource = newCachedContainerStorage(cnrSrc)
|
cachedEACLStorage := newCachedEACLStorage(eACLFetcher)
|
||||||
cnrRdr.lister = newCachedContainerLister(wrap)
|
cachedContainerLister := newCachedContainerLister(wrap)
|
||||||
|
|
||||||
|
c.cfgObject.eaclSource = cachedEACLStorage
|
||||||
|
c.cfgObject.cnrSource = cachedContainerStorage
|
||||||
|
|
||||||
|
cnrRdr.lister = cachedContainerLister
|
||||||
cnrRdr.eacl = c.cfgObject.eaclSource
|
cnrRdr.eacl = c.cfgObject.eaclSource
|
||||||
cnrRdr.get = c.cfgObject.cnrSource
|
cnrRdr.get = c.cfgObject.cnrSource
|
||||||
|
|
||||||
|
cnrInvalidator.lists = cachedContainerLister
|
||||||
|
cnrInvalidator.eacls = cachedEACLStorage
|
||||||
|
cnrInvalidator.containers = cachedContainerStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
localMetrics := &localStorageLoad{
|
localMetrics := &localStorageLoad{
|
||||||
|
@ -142,7 +152,7 @@ func initContainerService(c *cfg) {
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
containerService.NewResponseService(
|
containerService.NewResponseService(
|
||||||
&usedSpaceService{
|
&usedSpaceService{
|
||||||
Server: containerService.NewExecutionService(containerMorph.NewExecutor(wrap, cnrRdr)),
|
Server: containerService.NewExecutionService(containerMorph.NewExecutor(wrap, cnrRdr, cnrInvalidator)),
|
||||||
loadWriterProvider: loadRouter,
|
loadWriterProvider: loadRouter,
|
||||||
loadPlacementBuilder: loadPlacementBuilder,
|
loadPlacementBuilder: loadPlacementBuilder,
|
||||||
routeBuilder: routeBuilder,
|
routeBuilder: routeBuilder,
|
||||||
|
@ -528,3 +538,42 @@ func (x *morphContainerReader) GetEACL(id *cid.ID) (*eaclSDK.Table, error) {
|
||||||
func (x *morphContainerReader) List(id *owner.ID) ([]*cid.ID, error) {
|
func (x *morphContainerReader) List(id *owner.ID) ([]*cid.ID, error) {
|
||||||
return x.lister.List(id)
|
return x.lister.List(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type morphContainerInvalidator struct {
|
||||||
|
containers interface {
|
||||||
|
InvalidateContainer(*cid.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
eacls interface {
|
||||||
|
InvalidateEACL(*cid.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
lists interface {
|
||||||
|
InvalidateContainerList(*owner.ID)
|
||||||
|
InvalidateContainerListByCID(*cid.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *morphContainerInvalidator) InvalidateContainer(id *cid.ID) {
|
||||||
|
if x.containers != nil {
|
||||||
|
x.containers.InvalidateContainer(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *morphContainerInvalidator) InvalidateEACL(id *cid.ID) {
|
||||||
|
if x.eacls != nil {
|
||||||
|
x.eacls.InvalidateEACL(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *morphContainerInvalidator) InvalidateContainerList(id *owner.ID) {
|
||||||
|
if x.lists != nil {
|
||||||
|
x.lists.InvalidateContainerList(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *morphContainerInvalidator) InvalidateContainerListByCID(id *cid.ID) {
|
||||||
|
if x.lists != nil {
|
||||||
|
x.lists.InvalidateContainerListByCID(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue