Applicable for both cases: when the node uses a local cache for the netmap and when it is disabled.

Change-Id: I3050f537e20312a4b39e944aca763b77bd1e74c4
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
503 lines
13 KiB
Go
package main

import (
    "bytes"
    "cmp"
    "context"
    "slices"
    "sync"
    "sync/atomic"
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
    objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
    utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
    apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
    "github.com/hashicorp/golang-lru/v2/expirable"
    "github.com/hashicorp/golang-lru/v2/simplelru"
    "go.uber.org/zap"
)

type netValueReader[K any, V any] func(ctx context.Context, cid K) (V, error)

type valueWithError[V any] struct {
    v V
    // cached error in order to not repeat failed request for some time
    e error
}

// ttlNetCache is an entity that provides a TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
    cache     *expirable.LRU[K, *valueWithError[V]]
    netRdr    netValueReader[K, V]
    keyLocker *utilSync.KeyLocker[K]
    metrics   cacheMetrics
}

// newNetworkTTLCache wraps netValueReader with a TTL caching mechanism.
func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr netValueReader[K, V], metrics cacheMetrics) *ttlNetCache[K, V] {
    cache := expirable.NewLRU[K, *valueWithError[V]](sz, nil, ttl)

    return &ttlNetCache[K, V]{
        cache:     cache,
        netRdr:    netRdr,
        metrics:   metrics,
        keyLocker: utilSync.NewKeyLocker[K](),
    }
}

// get reads the value by key.
//
// It updates the value from the network on a cache miss or after the TTL expires.
//
// The returned value must not be modified.
func (c *ttlNetCache[K, V]) get(ctx context.Context, key K) (V, error) {
    hit := false
    startedAt := time.Now()
    defer func() {
        c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
    }()

    val, ok := c.cache.Peek(key)
    if ok {
        hit = true
        return val.v, val.e
    }

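    // Miss: serialize loads per key so that concurrent misses for the same key
    // issue a single network request, then re-check the cache under the lock.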
    c.keyLocker.Lock(key)
    defer c.keyLocker.Unlock(key)

    val, ok = c.cache.Peek(key)
    if ok {
        hit = true
        return val.v, val.e
    }

    v, err := c.netRdr(ctx, key)

    c.cache.Add(key, &valueWithError[V]{
        v: v,
        e: err,
    })

    return v, err
}

func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
    startedAt := time.Now()
    defer func() {
        c.metrics.AddMethodDuration("Set", time.Since(startedAt), false)
    }()

    c.keyLocker.Lock(k)
    defer c.keyLocker.Unlock(k)

    c.cache.Add(k, &valueWithError[V]{
        v: v,
        e: e,
    })
}

func (c *ttlNetCache[K, V]) remove(key K) {
    hit := false
    startedAt := time.Now()
    defer func() {
        c.metrics.AddMethodDuration("Remove", time.Since(startedAt), hit)
    }()

    c.keyLocker.Lock(key)
    defer c.keyLocker.Unlock(key)

    hit = c.cache.Remove(key)
}

// wrapper over TTL cache of values read from the network
// that implements container storage.
type ttlContainerStorage struct {
    containerCache *ttlNetCache[cid.ID, *container.Container]
    delInfoCache   *ttlNetCache[cid.ID, *container.DelInfo]
}

func newCachedContainerStorage(v container.Source, ttl time.Duration, containerCacheSize uint32) ttlContainerStorage {
    lruCnrCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(ctx context.Context, id cid.ID) (*container.Container, error) {
        return v.Get(ctx, id)
    }, metrics.NewCacheMetrics("container"))
    lruDelInfoCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(ctx context.Context, id cid.ID) (*container.DelInfo, error) {
        return v.DeletionInfo(ctx, id)
    }, metrics.NewCacheMetrics("container_deletion_info"))

    return ttlContainerStorage{
        containerCache: lruCnrCache,
        delInfoCache:   lruDelInfoCache,
    }
}

func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {
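    // Cache the not-found error so that subsequent Get calls return it
    // without a network round-trip until the TTL expires.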
    s.containerCache.set(cnr, nil, new(apistatus.ContainerNotFound))

    // The removal invalidates possibly stored error response.
    s.delInfoCache.remove(cnr)
}

// Get returns container value from the cache. If value is missing in the cache
// or expired, then it returns value from side chain and updates the cache.
func (s ttlContainerStorage) Get(ctx context.Context, cnr cid.ID) (*container.Container, error) {
    return s.containerCache.get(ctx, cnr)
}

func (s ttlContainerStorage) DeletionInfo(ctx context.Context, cnr cid.ID) (*container.DelInfo, error) {
    return s.delInfoCache.get(ctx, cnr)
}

type lruNetmapSource struct {
    netState netmap.State

    client     rawSource
    cache      *simplelru.LRU[uint64, *atomic.Pointer[netmapSDK.NetMap]]
    mtx        sync.RWMutex
    metrics    cacheMetrics
    log        *logger.Logger
    candidates atomic.Pointer[[]netmapSDK.NodeInfo]
}

type rawSource interface {
    GetCandidates(ctx context.Context) ([]netmapSDK.NodeInfo, error)
    GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error)
}

func newCachedNetmapStorage(ctx context.Context, log *logger.Logger,
    netState netmap.State, client rawSource, wg *sync.WaitGroup, d time.Duration,
) netmap.Source {
    const netmapCacheSize = 10

    cache, err := simplelru.NewLRU[uint64, *atomic.Pointer[netmapSDK.NetMap]](netmapCacheSize, nil)
    fatalOnErr(err)

    src := &lruNetmapSource{
        netState: netState,
        client:   client,
        cache:    cache,
        log:      log,
        metrics:  metrics.NewCacheMetrics("netmap"),
    }

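    // Start the background routine that periodically refreshes the candidate
    // list and merges it into the cached netmaps.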
    wg.Add(1)
    go func() {
        defer wg.Done()
        src.updateCandidates(ctx, d)
    }()

    return src
}

// updateCandidates is a background routine that merges the cached netmaps with the candidate list.
func (s *lruNetmapSource) updateCandidates(ctx context.Context, d time.Duration) {
    timer := time.NewTimer(d)
    defer timer.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-timer.C:
            newCandidates, err := s.client.GetCandidates(ctx)
            if err != nil {
                s.log.Debug(ctx, logs.FailedToUpdateNetmapCandidates, zap.Error(err))
                timer.Reset(d)
                break
            }
            if len(newCandidates) == 0 {
                s.candidates.Store(&newCandidates)
                timer.Reset(d)
                break
            }
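            // Sort candidates by hash so that snapshots can be compared element-wise below.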
            slices.SortFunc(newCandidates, func(n1 netmapSDK.NodeInfo, n2 netmapSDK.NodeInfo) int {
                return cmp.Compare(n1.Hash(), n2.Hash())
            })

            // Merge only when the candidate state has changed.
            v := s.candidates.Load()
            if v == nil {
                s.candidates.Store(&newCandidates)
                s.mergeCacheWithCandidates(newCandidates)
                timer.Reset(d)
                break
            }
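            // Nodes are considered unchanged only if the public key, status,
            // external addresses and network endpoints all match.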
            ret := slices.CompareFunc(*v, newCandidates, func(n1 netmapSDK.NodeInfo, n2 netmapSDK.NodeInfo) int {
                if !bytes.Equal(n1.PublicKey(), n2.PublicKey()) ||
                    uint32(n1.Status()) != uint32(n2.Status()) ||
                    slices.Compare(n1.ExternalAddresses(), n2.ExternalAddresses()) != 0 {
                    return 1
                }
                var ne1 []string
                n1.IterateNetworkEndpoints(func(s string) bool {
                    ne1 = append(ne1, s)
                    return false
                })
                var ne2 []string
                n2.IterateNetworkEndpoints(func(s string) bool {
                    ne2 = append(ne2, s)
                    return false
                })
                return slices.Compare(ne1, ne2)
            })
            if ret != 0 {
                s.candidates.Store(&newCandidates)
                s.mergeCacheWithCandidates(newCandidates)
            }
            timer.Reset(d)
        }
    }
}

func (s *lruNetmapSource) mergeCacheWithCandidates(candidates []netmapSDK.NodeInfo) {
    s.mtx.Lock()
    tmp := s.cache.Values()
    s.mtx.Unlock()
    for _, pointer := range tmp {
        nm := pointer.Load()
        updates := getNetMapNodesToUpdate(nm, candidates)
        if len(updates) > 0 {
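            // Clone before modifying: cached netmaps are shared via atomic
            // pointers and must not be mutated in place.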
            nm = nm.Clone()
            mergeNetmapWithCandidates(updates, nm)
            pointer.Store(nm)
        }
    }
}

// get reads the value by key.
//
// It updates the value from the network on a cache miss.
//
// The returned value must not be modified.
func (s *lruNetmapSource) get(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
    hit := false
    startedAt := time.Now()
    defer func() {
        s.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
    }()

    s.mtx.RLock()
    val, ok := s.cache.Get(key)
    s.mtx.RUnlock()
    if ok {
        hit = true
        return val.Load(), nil
    }

    s.mtx.Lock()
    defer s.mtx.Unlock()

    val, ok = s.cache.Get(key)
    if ok {
        hit = true
        return val.Load(), nil
    }

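    // Miss: fetch the netmap for the requested epoch from the network and
    // overlay the freshest candidate state before caching it.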
    nm, err := s.client.GetNetMapByEpoch(ctx, key)
    if err != nil {
        return nil, err
    }
    v := s.candidates.Load()
    if v != nil {
        updates := getNetMapNodesToUpdate(nm, *v)
        if len(updates) > 0 {
            mergeNetmapWithCandidates(updates, nm)
        }
    }

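    // Store the netmap behind an atomic pointer so the background candidate
    // merge can swap in an updated copy without holding the cache mutex.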
    p := atomic.Pointer[netmapSDK.NetMap]{}
    p.Store(nm)
    s.cache.Add(key, &p)

    return nm, nil
}

// mergeNetmapWithCandidates updates node states in the provided netmap with the state from the list of candidates.
func mergeNetmapWithCandidates(updates []nodeToUpdate, nm *netmapSDK.NetMap) {
    for _, v := range updates {
        if v.status != netmapSDK.UnspecifiedState {
            nm.Nodes()[v.netmapIndex].SetStatus(v.status)
        }
        if v.externalAddresses != nil {
            nm.Nodes()[v.netmapIndex].SetExternalAddresses(v.externalAddresses...)
        }
        if v.endpoints != nil {
            nm.Nodes()[v.netmapIndex].SetNetworkEndpoints(v.endpoints...)
        }
    }
}

type nodeToUpdate struct {
    netmapIndex       int
    status            netmapSDK.NodeState
    externalAddresses []string
    endpoints         []string
}

// getNetMapNodesToUpdate checks for changes between the provided netmap and the list of candidates.
func getNetMapNodesToUpdate(nm *netmapSDK.NetMap, candidates []netmapSDK.NodeInfo) []nodeToUpdate {
    var res []nodeToUpdate
    for i := range nm.Nodes() {
        for _, cnd := range candidates {
            if bytes.Equal(nm.Nodes()[i].PublicKey(), cnd.PublicKey()) {
                var tmp nodeToUpdate
                var update bool

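                // Only transitions to Online or Maintenance are taken from the
                // candidate list; other status changes are ignored here.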
                if cnd.Status() != nm.Nodes()[i].Status() &&
                    (cnd.Status() == netmapSDK.Online || cnd.Status() == netmapSDK.Maintenance) {
                    update = true
                    tmp.status = cnd.Status()
                }

                externalAddresses := cnd.ExternalAddresses()
                if externalAddresses != nil &&
                    slices.Compare(externalAddresses, nm.Nodes()[i].ExternalAddresses()) != 0 {
                    update = true
                    tmp.externalAddresses = externalAddresses
                }

                nodeEndpoints := make([]string, 0, nm.Nodes()[i].NumberOfNetworkEndpoints())
                nm.Nodes()[i].IterateNetworkEndpoints(func(s string) bool {
                    nodeEndpoints = append(nodeEndpoints, s)
                    return false
                })
                candidateEndpoints := make([]string, 0, cnd.NumberOfNetworkEndpoints())
                cnd.IterateNetworkEndpoints(func(s string) bool {
                    candidateEndpoints = append(candidateEndpoints, s)
                    return false
                })
                if slices.Compare(nodeEndpoints, candidateEndpoints) != 0 {
                    update = true
                    tmp.endpoints = candidateEndpoints
                }

                if update {
                    tmp.netmapIndex = i
                    res = append(res, tmp)
                }

                break
            }
        }
    }
    return res
}

func (s *lruNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmapSDK.NetMap, error) {
    return s.getNetMapByEpoch(ctx, s.netState.CurrentEpoch()-diff)
}

func (s *lruNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
    return s.getNetMapByEpoch(ctx, epoch)
}

func (s *lruNetmapSource) getNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
    val, err := s.get(ctx, epoch)
    if err != nil {
        return nil, err
    }

    return val, nil
}

func (s *lruNetmapSource) Epoch(_ context.Context) (uint64, error) {
    return s.netState.CurrentEpoch(), nil
}

type cachedIRFetcher struct {
    *ttlNetCache[struct{}, [][]byte]
}

func newCachedIRFetcher(f interface {
    InnerRingKeys(ctx context.Context) ([][]byte, error)
},
) cachedIRFetcher {
    const (
        irFetcherCacheSize = 1 // we intend to store only one value

        // Without the cache in the testnet we can see several hundred simultaneous
        // requests (frostfs-node #1278), so limiting the request rate solves the issue.
        //
        // Exact request rate doesn't really matter because Inner Ring list updates
        // happen extremely rarely, but for now (frostfs-contract v0.15.0, notary disabled env)
        // there are no side chain events to monitor them with.
        irFetcherCacheTTL = 30 * time.Second
    )

    irFetcherCache := newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL,
        func(ctx context.Context, _ struct{}) ([][]byte, error) {
            return f.InnerRingKeys(ctx)
        }, metrics.NewCacheMetrics("ir_keys"),
    )

    return cachedIRFetcher{irFetcherCache}
}

// InnerRingKeys returns cached list of Inner Ring keys. If keys are missing in
// the cache or expired, then it returns keys from side chain and updates
// the cache.
func (f cachedIRFetcher) InnerRingKeys(ctx context.Context) ([][]byte, error) {
    val, err := f.get(ctx, struct{}{})
    if err != nil {
        return nil, err
    }

    return val, nil
}

type ttlMaxObjectSizeCache struct {
    mtx         sync.RWMutex
    lastUpdated time.Time
    lastSize    uint64
    src         objectwriter.MaxSizeSource
    metrics     cacheMetrics
}

func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource {
    return &ttlMaxObjectSizeCache{
        src:     src,
        metrics: metrics.NewCacheMetrics("max_object_size"),
    }
}

func (c *ttlMaxObjectSizeCache) MaxObjectSize(ctx context.Context) uint64 {
    const ttl = time.Second * 30

    hit := false
    startedAt := time.Now()
    defer func() {
        c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
    }()

    c.mtx.RLock()
    prevUpdated := c.lastUpdated
    size := c.lastSize
    c.mtx.RUnlock()

    if time.Since(prevUpdated) < ttl {
        hit = true
        return size
    }

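    // TTL expired: refresh under the write lock, but skip the refresh if another
    // goroutine has already updated the value since prevUpdated was read.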
    c.mtx.Lock()
    size = c.lastSize
    if !c.lastUpdated.After(prevUpdated) {
        size = c.src.MaxObjectSize(ctx)
        c.lastSize = size
        c.lastUpdated = time.Now()
    }
    c.mtx.Unlock()

    return size
}

type cacheMetrics interface {
    AddMethodDuration(method string, d time.Duration, hit bool)
}