[#1445] services/tree: Cache the list of container nodes

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-05-30 20:46:29 +03:00 committed by fyrchik
parent a2edfec0c3
commit 39f47f61c6
4 changed files with 142 additions and 55 deletions

View file

@ -22,6 +22,7 @@ type Service struct {
replicateCh chan movePair
replicationTasks chan replicationTask
closeCh chan struct{}
containerCache containerCache
}
// MaxGetSubTreeDepth represents maximum allowed traversal depth in GetSubTree RPC.
@ -44,6 +45,7 @@ func New(opts ...Option) *Service {
s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, defaultReplicatorCapacity)
s.replicationTasks = make(chan replicationTask, defaultReplicatorWorkerCount)
s.containerCache.init()
return &s
}
@ -71,8 +73,11 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
return nil, err
}
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
if err == errNotInContainer {
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return nil, err
}
if pos < 0 {
var resp *AddResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
@ -85,7 +90,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
return resp, outErr
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
Parent: b.GetParentId(),
Child: pilorama.RootID,
@ -116,8 +121,11 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
return nil, err
}
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
if err == errNotInContainer {
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return nil, err
}
if pos < 0 {
var resp *AddByPathResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
@ -137,7 +145,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
attr = pilorama.AttributeFilename
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
logs, err := s.forest.TreeAddByPath(d, b.GetTreeId(), attr, b.GetPath(), meta)
if err != nil {
return nil, err
@ -173,8 +181,11 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
return nil, err
}
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
if err == errNotInContainer {
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return nil, err
}
if pos < 0 {
var resp *RemoveResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
@ -191,7 +202,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId())
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
Parent: pilorama.TrashID,
Child: b.GetNodeId(),
@ -219,8 +230,11 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
return nil, err
}
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
if err == errNotInContainer {
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return nil, err
}
if pos < 0 {
var resp *MoveResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
@ -237,7 +251,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId())
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
Parent: b.GetParentId(),
Child: b.GetNodeId(),
@ -264,8 +278,11 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
return nil, err
}
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
if err == errNotInContainer {
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return nil, err
}
if pos < 0 {
var resp *GetNodeByPathResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
@ -344,8 +361,11 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS
return err
}
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
if err == errNotInContainer {
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return err
}
if pos < 0 {
var cli TreeService_GetSubTreeClient
var outErr error
err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
@ -416,7 +436,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
key := req.GetSignature().GetKey()
_, pos, size, err := s.getContainerInfo(cid, key)
if err == errNotInContainer {
if err != nil {
return nil, err
}
if pos < 0 {
return nil, errors.New("`Apply` request must be signed by a container node")
}
@ -444,8 +467,11 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
return err
}
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
if err == errNotInContainer {
ns, pos, err := s.getContainerNodes(cid)
if err != nil {
return err
}
if pos < 0 {
var cli TreeService_GetOpLogClient
var outErr error
err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
@ -511,12 +537,10 @@ func metaToProto(arr []pilorama.KeyValue) []*KeyValue {
return meta
}
var errNotInContainer = errors.New("node doesn't belong to a container")
// getContainerInfo returns the list of container nodes, position in the container for the node
// with pub key and total amount of nodes in all replicas.
func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) {
cntNodes, err := s.getContainerNodes(cid)
cntNodes, _, err := s.getContainerNodes(cid)
if err != nil {
return nil, 0, 0, err
}
@ -526,5 +550,5 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
return cntNodes, i, len(cntNodes), nil
}
}
return nil, 0, 0, errNotInContainer
return cntNodes, -1, len(cntNodes), nil
}