forked from TrueCloudLab/frostfs-node
[#1445] services/tree: Cache the list of container nodes
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
879c1de59d
commit
96277c650f
4 changed files with 147 additions and 55 deletions
95
pkg/services/tree/container.go
Normal file
95
pkg/services/tree/container.go
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/hashicorp/golang-lru/simplelru"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
|
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
|
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type containerCache struct {
|
||||||
|
sync.Mutex
|
||||||
|
nm *netmapSDK.NetMap
|
||||||
|
lru *simplelru.LRU
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *containerCache) init() {
|
||||||
|
c.lru, _ = simplelru.NewLRU(defaultContainerCacheSize, nil) // no error, size is positive
|
||||||
|
}
|
||||||
|
|
||||||
|
type containerCacheItem struct {
|
||||||
|
cnr *container.Container
|
||||||
|
local int
|
||||||
|
nodes []netmapSDK.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
const defaultContainerCacheSize = 10
|
||||||
|
|
||||||
|
type index struct {
|
||||||
|
replica uint32
|
||||||
|
index uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
// getContainerNodes returns nodes in the container and a position of local key in the list.
|
||||||
|
func (s *Service) getContainerNodes(cid cidSDK.ID) ([]netmapSDK.NodeInfo, int, error) {
|
||||||
|
nm, err := s.nmSource.GetNetMap(0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, -1, fmt.Errorf("can't get netmap: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cnr, err := s.cnrSource.Get(cid)
|
||||||
|
if err != nil {
|
||||||
|
return nil, -1, fmt.Errorf("can't get container: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cidStr := cid.String()
|
||||||
|
|
||||||
|
s.containerCache.Lock()
|
||||||
|
if s.containerCache.nm != nm {
|
||||||
|
s.containerCache.lru.Purge()
|
||||||
|
} else if v, ok := s.containerCache.lru.Get(cidStr); ok {
|
||||||
|
item := v.(containerCacheItem)
|
||||||
|
if item.cnr == cnr {
|
||||||
|
s.containerCache.Unlock()
|
||||||
|
return item.nodes, item.local, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.containerCache.Unlock()
|
||||||
|
|
||||||
|
policy := cnr.Value.PlacementPolicy()
|
||||||
|
|
||||||
|
rawCID := make([]byte, sha256.Size)
|
||||||
|
cid.Encode(rawCID)
|
||||||
|
|
||||||
|
cntNodes, err := nm.ContainerNodes(policy, rawCID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := placement.FlattenNodes(cntNodes)
|
||||||
|
|
||||||
|
localPos := -1
|
||||||
|
for i := range nodes {
|
||||||
|
if bytes.Equal(nodes[i].PublicKey(), s.rawPub) {
|
||||||
|
localPos = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.containerCache.Lock()
|
||||||
|
s.containerCache.nm = nm
|
||||||
|
s.containerCache.lru.Add(cidStr, containerCacheItem{
|
||||||
|
cnr: cnr,
|
||||||
|
local: localPos,
|
||||||
|
nodes: nodes,
|
||||||
|
})
|
||||||
|
s.containerCache.Unlock()
|
||||||
|
|
||||||
|
return nodes, localPos, err
|
||||||
|
}
|
|
@ -1,7 +1,6 @@
|
||||||
package tree
|
package tree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
@ -9,7 +8,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
|
||||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -102,17 +100,15 @@ func (s *Service) replicate(op movePair) error {
|
||||||
return fmt.Errorf("can't sign data: %w", err)
|
return fmt.Errorf("can't sign data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes, err := s.getContainerNodes(op.cid)
|
nodes, localIndex, err := s.getContainerNodes(op.cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't get container nodes: %w", err)
|
return fmt.Errorf("can't get container nodes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, n := range nodes {
|
for i := range nodes {
|
||||||
if bytes.Equal(n.PublicKey(), s.rawPub) {
|
if i != localIndex {
|
||||||
continue
|
s.replicationTasks <- replicationTask{nodes[i], req}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.replicationTasks <- replicationTask{n, req}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -128,29 +124,6 @@ func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.LogMove
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getContainerNodes(cid cidSDK.ID) ([]netmapSDK.NodeInfo, error) {
|
|
||||||
nm, err := s.nmSource.GetNetMap(0)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("can't get netmap: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cnr, err := s.cnrSource.Get(cid)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("can't get container: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
policy := cnr.Value.PlacementPolicy()
|
|
||||||
rawCID := make([]byte, sha256.Size)
|
|
||||||
cid.Encode(rawCID)
|
|
||||||
|
|
||||||
nodes, err := nm.ContainerNodes(policy, rawCID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return placement.FlattenNodes(nodes), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newApplyRequest(op *movePair) *ApplyRequest {
|
func newApplyRequest(op *movePair) *ApplyRequest {
|
||||||
rawCID := make([]byte, sha256.Size)
|
rawCID := make([]byte, sha256.Size)
|
||||||
op.cid.Encode(rawCID)
|
op.cid.Encode(rawCID)
|
||||||
|
|
|
@ -22,6 +22,7 @@ type Service struct {
|
||||||
replicateCh chan movePair
|
replicateCh chan movePair
|
||||||
replicationTasks chan replicationTask
|
replicationTasks chan replicationTask
|
||||||
closeCh chan struct{}
|
closeCh chan struct{}
|
||||||
|
containerCache containerCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxGetSubTreeDepth represents maximum allowed traversal depth in GetSubTree RPC.
|
// MaxGetSubTreeDepth represents maximum allowed traversal depth in GetSubTree RPC.
|
||||||
|
@ -44,6 +45,7 @@ func New(opts ...Option) *Service {
|
||||||
s.closeCh = make(chan struct{})
|
s.closeCh = make(chan struct{})
|
||||||
s.replicateCh = make(chan movePair, defaultReplicatorCapacity)
|
s.replicateCh = make(chan movePair, defaultReplicatorCapacity)
|
||||||
s.replicationTasks = make(chan replicationTask, defaultReplicatorWorkerCount)
|
s.replicationTasks = make(chan replicationTask, defaultReplicatorWorkerCount)
|
||||||
|
s.containerCache.init()
|
||||||
|
|
||||||
return &s
|
return &s
|
||||||
}
|
}
|
||||||
|
@ -71,8 +73,11 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
|
ns, pos, err := s.getContainerNodes(cid)
|
||||||
if err == errNotInContainer {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pos < 0 {
|
||||||
var resp *AddResponse
|
var resp *AddResponse
|
||||||
var outErr error
|
var outErr error
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
@ -85,7 +90,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
||||||
return resp, outErr
|
return resp, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
||||||
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
||||||
Parent: b.GetParentId(),
|
Parent: b.GetParentId(),
|
||||||
Child: pilorama.RootID,
|
Child: pilorama.RootID,
|
||||||
|
@ -116,8 +121,11 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
|
ns, pos, err := s.getContainerNodes(cid)
|
||||||
if err == errNotInContainer {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pos < 0 {
|
||||||
var resp *AddByPathResponse
|
var resp *AddByPathResponse
|
||||||
var outErr error
|
var outErr error
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
@ -137,7 +145,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
||||||
attr = pilorama.AttributeFilename
|
attr = pilorama.AttributeFilename
|
||||||
}
|
}
|
||||||
|
|
||||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
||||||
logs, err := s.forest.TreeAddByPath(d, b.GetTreeId(), attr, b.GetPath(), meta)
|
logs, err := s.forest.TreeAddByPath(d, b.GetTreeId(), attr, b.GetPath(), meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -173,8 +181,11 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
|
ns, pos, err := s.getContainerNodes(cid)
|
||||||
if err == errNotInContainer {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pos < 0 {
|
||||||
var resp *RemoveResponse
|
var resp *RemoveResponse
|
||||||
var outErr error
|
var outErr error
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
@ -191,7 +202,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId())
|
return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId())
|
||||||
}
|
}
|
||||||
|
|
||||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
||||||
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
||||||
Parent: pilorama.TrashID,
|
Parent: pilorama.TrashID,
|
||||||
Child: b.GetNodeId(),
|
Child: b.GetNodeId(),
|
||||||
|
@ -219,8 +230,11 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
|
ns, pos, err := s.getContainerNodes(cid)
|
||||||
if err == errNotInContainer {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pos < 0 {
|
||||||
var resp *MoveResponse
|
var resp *MoveResponse
|
||||||
var outErr error
|
var outErr error
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
@ -237,7 +251,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
||||||
return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId())
|
return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId())
|
||||||
}
|
}
|
||||||
|
|
||||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
||||||
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
||||||
Parent: b.GetParentId(),
|
Parent: b.GetParentId(),
|
||||||
Child: b.GetNodeId(),
|
Child: b.GetNodeId(),
|
||||||
|
@ -264,8 +278,11 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
|
ns, pos, err := s.getContainerNodes(cid)
|
||||||
if err == errNotInContainer {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pos < 0 {
|
||||||
var resp *GetNodeByPathResponse
|
var resp *GetNodeByPathResponse
|
||||||
var outErr error
|
var outErr error
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
@ -344,8 +361,11 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
|
ns, pos, err := s.getContainerNodes(cid)
|
||||||
if err == errNotInContainer {
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if pos < 0 {
|
||||||
var cli TreeService_GetSubTreeClient
|
var cli TreeService_GetSubTreeClient
|
||||||
var outErr error
|
var outErr error
|
||||||
err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
|
err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
|
||||||
|
@ -416,7 +436,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
key := req.GetSignature().GetKey()
|
key := req.GetSignature().GetKey()
|
||||||
|
|
||||||
_, pos, size, err := s.getContainerInfo(cid, key)
|
_, pos, size, err := s.getContainerInfo(cid, key)
|
||||||
if err == errNotInContainer {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pos < 0 {
|
||||||
return nil, errors.New("`Apply` request must be signed by a container node")
|
return nil, errors.New("`Apply` request must be signed by a container node")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -444,8 +467,11 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
|
ns, pos, err := s.getContainerNodes(cid)
|
||||||
if err == errNotInContainer {
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if pos < 0 {
|
||||||
var cli TreeService_GetOpLogClient
|
var cli TreeService_GetOpLogClient
|
||||||
var outErr error
|
var outErr error
|
||||||
err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
|
err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
|
||||||
|
@ -511,12 +537,10 @@ func metaToProto(arr []pilorama.KeyValue) []*KeyValue {
|
||||||
return meta
|
return meta
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNotInContainer = errors.New("node doesn't belong to a container")
|
|
||||||
|
|
||||||
// getContainerInfo returns the list of container nodes, position in the container for the node
|
// getContainerInfo returns the list of container nodes, position in the container for the node
|
||||||
// with pub key and total amount of nodes in all replicas.
|
// with pub key and total amount of nodes in all replicas.
|
||||||
func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) {
|
func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) {
|
||||||
cntNodes, err := s.getContainerNodes(cid)
|
cntNodes, _, err := s.getContainerNodes(cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, 0, err
|
return nil, 0, 0, err
|
||||||
}
|
}
|
||||||
|
@ -526,5 +550,5 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
|
||||||
return cntNodes, i, len(cntNodes), nil
|
return cntNodes, i, len(cntNodes), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, 0, 0, errNotInContainer
|
return cntNodes, -1, len(cntNodes), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
|
|
||||||
// Synchronize tries to synchronize log starting from the last stored height.
|
// Synchronize tries to synchronize log starting from the last stored height.
|
||||||
func (s *Service) Synchronize(ctx context.Context, cid cid.ID, treeID string) error {
|
func (s *Service) Synchronize(ctx context.Context, cid cid.ID, treeID string) error {
|
||||||
nodes, err := s.getContainerNodes(cid)
|
nodes, _, err := s.getContainerNodes(cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't get container nodes: %w", err)
|
return fmt.Errorf("can't get container nodes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue