forked from TrueCloudLab/frostfs-node
e738699fcc
In the previous implementation, Container service handlers didn't cache the results of `Get` / `GetEACL` / `List` operations. As a consequence, high load on the service caused neo-go client connection errors. To avoid this, a cache is needed. The Object service already uses `Get` and `GetEACL` caches. Implement a cache of `List` results. Share the already implemented cache of the Object service with the Container one. Provide a new instance of read-only container storage (defined as an interface) to the morph executor's constructor on which the container service is based. Write operations remained unchanged. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
277 lines
5.7 KiB
Go
277 lines
5.7 KiB
Go
package main
|
|
|
|
import (
	"fmt"
	"sync"
	"time"

	lru "github.com/hashicorp/golang-lru"
	eaclSDK "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
	containerSDK "github.com/nspcc-dev/neofs-api-go/pkg/container"
	cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
	netmapSDK "github.com/nspcc-dev/neofs-api-go/pkg/netmap"
	"github.com/nspcc-dev/neofs-api-go/pkg/owner"
	"github.com/nspcc-dev/neofs-node/pkg/core/container"
	"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
	"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
	"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
)
|
|
|
|
// netValueReader is a function that reads the value corresponding to key.
// The caches in this file call it on a cache miss to obtain a fresh value.
type netValueReader func(key interface{}) (value interface{}, err error)
|
|
|
|
// valueWithTime is the entry format stored by ttlNetCache: a value paired
// with the timestamp used to decide whether the entry has expired.
type valueWithTime struct {
	// v is the cached value as returned by the reader.
	v interface{}

	// t is the moment the value was put into the cache.
	t time.Time
}
|
|
|
|
// entity that provides TTL cache interface.
|
|
type ttlNetCache struct {
|
|
mtx sync.Mutex
|
|
|
|
ttl time.Duration
|
|
|
|
sz int
|
|
|
|
cache *lru.Cache
|
|
|
|
netRdr netValueReader
|
|
}
|
|
|
|
// complicates netValueReader with TTL caching mechanism.
|
|
func newNetworkTTLCache(sz int, ttl time.Duration, netRdr netValueReader) *ttlNetCache {
|
|
cache, err := lru.New(sz)
|
|
fatalOnErr(err)
|
|
|
|
return &ttlNetCache{
|
|
ttl: ttl,
|
|
sz: sz,
|
|
cache: cache,
|
|
netRdr: netRdr,
|
|
}
|
|
}
|
|
|
|
// reads value by the key.
|
|
//
|
|
// updates the value from the network on cache miss or by TTL.
|
|
//
|
|
// returned value should not be modified.
|
|
func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
val, ok := c.cache.Peek(key)
|
|
if ok {
|
|
valWithTime := val.(*valueWithTime)
|
|
|
|
if time.Since(valWithTime.t) < c.ttl {
|
|
return valWithTime.v, nil
|
|
}
|
|
|
|
c.cache.Remove(key)
|
|
}
|
|
|
|
val, err := c.netRdr(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.cache.Add(key, &valueWithTime{
|
|
v: val,
|
|
t: time.Now(),
|
|
})
|
|
|
|
return val, nil
|
|
}
|
|
|
|
// entity that provides LRU cache interface.
|
|
type lruNetCache struct {
|
|
mtx sync.Mutex
|
|
|
|
cache *lru.Cache
|
|
|
|
netRdr netValueReader
|
|
}
|
|
|
|
// complicates netValueReader with LRU caching mechanism.
|
|
func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache {
|
|
cache, err := lru.New(sz)
|
|
fatalOnErr(err)
|
|
|
|
return &lruNetCache{
|
|
cache: cache,
|
|
netRdr: netRdr,
|
|
}
|
|
}
|
|
|
|
// reads value by the key.
|
|
//
|
|
// updates the value from the network on cache miss.
|
|
//
|
|
// returned value should not be modified.
|
|
func (c *lruNetCache) get(key interface{}) (interface{}, error) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
val, ok := c.cache.Get(key)
|
|
if ok {
|
|
return val, nil
|
|
}
|
|
|
|
val, err := c.netRdr(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.cache.Add(key, val)
|
|
|
|
return val, nil
|
|
}
|
|
|
|
// wrapper over TTL cache of values read from the network
|
|
// that implements container storage.
|
|
type ttlContainerStorage ttlNetCache
|
|
|
|
func newCachedContainerStorage(v container.Source) container.Source {
|
|
const (
|
|
containerCacheSize = 100
|
|
containerCacheTTL = 30 * time.Second
|
|
)
|
|
|
|
lruCnrCache := newNetworkTTLCache(containerCacheSize, containerCacheTTL, func(key interface{}) (interface{}, error) {
|
|
id := cid.New()
|
|
|
|
err := id.Parse(key.(string))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return v.Get(id)
|
|
})
|
|
|
|
return (*ttlContainerStorage)(lruCnrCache)
|
|
}
|
|
|
|
func (s *ttlContainerStorage) Get(cid *cid.ID) (*containerSDK.Container, error) {
|
|
val, err := (*ttlNetCache)(s).get(cid.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return val.(*containerSDK.Container), nil
|
|
}
|
|
|
|
// ttlEACLStorage is a ttlNetCache exposed as an eACL storage:
// its GetEACL method serves eACL tables through the TTL cache.
type ttlEACLStorage ttlNetCache
|
|
|
|
func newCachedEACLStorage(v eacl.Source) eacl.Source {
|
|
const (
|
|
eaclCacheSize = 100
|
|
eaclCacheTTL = 30 * time.Second
|
|
)
|
|
|
|
lruCnrCache := newNetworkTTLCache(eaclCacheSize, eaclCacheTTL, func(key interface{}) (interface{}, error) {
|
|
id := cid.New()
|
|
|
|
err := id.Parse(key.(string))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return v.GetEACL(id)
|
|
})
|
|
|
|
return (*ttlEACLStorage)(lruCnrCache)
|
|
}
|
|
|
|
func (s *ttlEACLStorage) GetEACL(cid *cid.ID) (*eaclSDK.Table, error) {
|
|
val, err := (*ttlNetCache)(s).get(cid.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return val.(*eaclSDK.Table), nil
|
|
}
|
|
|
|
type lruNetmapSource struct {
|
|
netState netmap.State
|
|
|
|
cache *lruNetCache
|
|
}
|
|
|
|
func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
|
|
const netmapCacheSize = 10
|
|
|
|
lruNetmapCache := newNetworkLRUCache(netmapCacheSize, func(key interface{}) (interface{}, error) {
|
|
return v.GetNetMapByEpoch(key.(uint64))
|
|
})
|
|
|
|
return &lruNetmapSource{
|
|
netState: s,
|
|
cache: lruNetmapCache,
|
|
}
|
|
}
|
|
|
|
func (s *lruNetmapSource) GetNetMap(diff uint64) (*netmapSDK.Netmap, error) {
|
|
return s.getNetMapByEpoch(s.netState.CurrentEpoch() - diff)
|
|
}
|
|
|
|
func (s *lruNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmapSDK.Netmap, error) {
|
|
return s.getNetMapByEpoch(epoch)
|
|
}
|
|
|
|
func (s *lruNetmapSource) getNetMapByEpoch(epoch uint64) (*netmapSDK.Netmap, error) {
|
|
val, err := s.cache.get(epoch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return val.(*netmapSDK.Netmap), nil
|
|
}
|
|
|
|
func (s *lruNetmapSource) Epoch() (uint64, error) {
|
|
return s.netState.CurrentEpoch(), nil
|
|
}
|
|
|
|
// wrapper over TTL cache of values read from the network
|
|
// that implements container lister.
|
|
type ttlContainerLister ttlNetCache
|
|
|
|
func newCachedContainerLister(w *wrapper.Wrapper) *ttlContainerLister {
|
|
const (
|
|
containerListerCacheSize = 100
|
|
containerListerCacheTTL = 30 * time.Second
|
|
)
|
|
|
|
lruCnrListerCache := newNetworkTTLCache(containerListerCacheSize, containerListerCacheTTL, func(key interface{}) (interface{}, error) {
|
|
var (
|
|
id *owner.ID
|
|
strID = key.(string)
|
|
)
|
|
|
|
if strID != "" {
|
|
id = owner.NewID()
|
|
|
|
err := id.Parse(strID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return w.List(id)
|
|
})
|
|
|
|
return (*ttlContainerLister)(lruCnrListerCache)
|
|
}
|
|
|
|
func (s *ttlContainerLister) List(id *owner.ID) ([]*cid.ID, error) {
|
|
var str string
|
|
|
|
if id != nil {
|
|
str = id.String()
|
|
}
|
|
|
|
val, err := (*ttlNetCache)(s).get(str)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return val.([]*cid.ID), nil
|
|
}
|