diff --git a/pkg/services/tree/container.go b/pkg/services/tree/container.go new file mode 100644 index 00000000..1e950ffc --- /dev/null +++ b/pkg/services/tree/container.go @@ -0,0 +1,95 @@ +package tree + +import ( + "bytes" + "crypto/sha256" + "fmt" + "sync" + + "github.com/hashicorp/golang-lru/simplelru" + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" + netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +type containerCache struct { + sync.Mutex + nm *netmapSDK.NetMap + lru *simplelru.LRU +} + +func (c *containerCache) init() { + c.lru, _ = simplelru.NewLRU(defaultContainerCacheSize, nil) // no error, size is positive +} + +type containerCacheItem struct { + cnr *container.Container + local int + nodes []netmapSDK.NodeInfo +} + +const defaultContainerCacheSize = 10 + +type index struct { + replica uint32 + index uint32 +} + +// getContainerNodes returns nodes in the container and a position of local key in the list. +func (s *Service) getContainerNodes(cid cidSDK.ID) ([]netmapSDK.NodeInfo, int, error) { + nm, err := s.nmSource.GetNetMap(0) + if err != nil { + return nil, -1, fmt.Errorf("can't get netmap: %w", err) + } + + cnr, err := s.cnrSource.Get(cid) + if err != nil { + return nil, -1, fmt.Errorf("can't get container: %w", err) + } + + cidStr := cid.String() + + s.containerCache.Lock() + if s.containerCache.nm != nm { + s.containerCache.lru.Purge() + } else if v, ok := s.containerCache.lru.Get(cidStr); ok { + item := v.(containerCacheItem) + if item.cnr == cnr { + s.containerCache.Unlock() + return item.nodes, item.local, nil + } + } + s.containerCache.Unlock() + + policy := cnr.Value.PlacementPolicy() + + rawCID := make([]byte, sha256.Size) + cid.Encode(rawCID) + + cntNodes, err := nm.ContainerNodes(policy, rawCID) + if err != nil { + return nil, -1, err + } + + nodes := placement.FlattenNodes(cntNodes) + + localPos := -1 + for i := range nodes { + if bytes.Equal(nodes[i].PublicKey(), s.rawPub) { + localPos = i + break + } + } + + s.containerCache.Lock() + s.containerCache.nm = nm + s.containerCache.lru.Add(cidStr, containerCacheItem{ + cnr: cnr, + local: localPos, + nodes: nodes, + }) + s.containerCache.Unlock() + + return nodes, localPos, err +} diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 09f577c1..afbdbeee 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -1,7 +1,6 @@ package tree import ( - "bytes" "context" "crypto/sha256" "encoding/hex" @@ -9,7 +8,6 @@ import ( "time" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" @@ -102,17 +100,15 @@ func (s *Service) replicate(op movePair) error { return fmt.Errorf("can't sign data: %w", err) } - nodes, err := s.getContainerNodes(op.cid) + nodes, localIndex, err := s.getContainerNodes(op.cid) if err != nil { return fmt.Errorf("can't get container nodes: %w", err) } - for _, n := range nodes { - if bytes.Equal(n.PublicKey(), s.rawPub) { - continue + for i := range nodes { + if i != localIndex { + s.replicationTasks <- replicationTask{nodes[i], req} } - - s.replicationTasks <- replicationTask{n, req} } return nil } @@ -128,29 +124,6 @@ func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.LogMove } } -func (s *Service) getContainerNodes(cid cidSDK.ID) ([]netmapSDK.NodeInfo, error) { - nm, err := s.nmSource.GetNetMap(0) - if err != nil { - return nil, fmt.Errorf("can't get netmap: %w", err) - } - - cnr, err := s.cnrSource.Get(cid) - if err != nil { - return nil, fmt.Errorf("can't get container: %w", err) - } - - policy := cnr.Value.PlacementPolicy() - rawCID := make([]byte, sha256.Size) - cid.Encode(rawCID) - - nodes, err := nm.ContainerNodes(policy, rawCID) - if err != nil { - return nil, err - } - - return placement.FlattenNodes(nodes), nil -} - func newApplyRequest(op *movePair) *ApplyRequest { rawCID := make([]byte, sha256.Size) op.cid.Encode(rawCID) diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index af71569b..b344c139 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -22,6 +22,7 @@ type Service struct { replicateCh chan movePair replicationTasks chan replicationTask closeCh chan struct{} + containerCache containerCache } // MaxGetSubTreeDepth represents maximum allowed traversal depth in GetSubTree RPC. @@ -44,6 +45,7 @@ func New(opts ...Option) *Service { s.closeCh = make(chan struct{}) s.replicateCh = make(chan movePair, defaultReplicatorCapacity) s.replicationTasks = make(chan replicationTask, defaultReplicatorWorkerCount) + s.containerCache.init() return &s } @@ -71,8 +73,11 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error return nil, err } - ns, pos, size, err := s.getContainerInfo(cid, s.rawPub) - if err == errNotInContainer { + ns, pos, err := s.getContainerNodes(cid) + if err != nil { + return nil, err + } + if pos < 0 { var resp *AddResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { @@ -85,7 +90,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error return resp, outErr } - d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{ Parent: b.GetParentId(), Child: pilorama.RootID, @@ -116,8 +121,11 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP return nil, err } - ns, pos, size, err := s.getContainerInfo(cid, s.rawPub) - if err == errNotInContainer { + ns, pos, err := s.getContainerNodes(cid) + if err != nil { + return nil, err + } + if pos < 0 { var resp *AddByPathResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { @@ -137,7 +145,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP attr = pilorama.AttributeFilename } - d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} logs, err := s.forest.TreeAddByPath(d, b.GetTreeId(), attr, b.GetPath(), meta) if err != nil { return nil, err @@ -173,8 +181,11 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon return nil, err } - ns, pos, size, err := s.getContainerInfo(cid, s.rawPub) - if err == errNotInContainer { + ns, pos, err := s.getContainerNodes(cid) + if err != nil { + return nil, err + } + if pos < 0 { var resp *RemoveResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { @@ -191,7 +202,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId()) } - d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{ Parent: pilorama.TrashID, Child: b.GetNodeId(), @@ -219,8 +230,11 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er return nil, err } - ns, pos, size, err := s.getContainerInfo(cid, s.rawPub) - if err == errNotInContainer { + ns, pos, err := s.getContainerNodes(cid) + if err != nil { + return nil, err + } + if pos < 0 { var resp *MoveResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { @@ -237,7 +251,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId()) } - d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{ Parent: b.GetParentId(), Child: b.GetNodeId(), @@ -264,8 +278,11 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) return nil, err } - ns, _, _, err := s.getContainerInfo(cid, s.rawPub) - if err == errNotInContainer { + ns, pos, err := s.getContainerNodes(cid) + if err != nil { + return nil, err + } + if pos < 0 { var resp *GetNodeByPathResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { @@ -344,8 +361,11 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS return err } - ns, _, _, err := s.getContainerInfo(cid, s.rawPub) - if err == errNotInContainer { + ns, pos, err := s.getContainerNodes(cid) + if err != nil { + return err + } + if pos < 0 { var cli TreeService_GetSubTreeClient var outErr error err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool { @@ -416,7 +436,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e key := req.GetSignature().GetKey() _, pos, size, err := s.getContainerInfo(cid, key) - if err == errNotInContainer { + if err != nil { + return nil, err + } + if pos < 0 { return nil, errors.New("`Apply` request must be signed by a container node") } @@ -444,8 +467,11 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) return err } - ns, _, _, err := s.getContainerInfo(cid, s.rawPub) - if err == errNotInContainer { + ns, pos, err := s.getContainerNodes(cid) + if err != nil { + return err + } + if pos < 0 { var cli TreeService_GetOpLogClient var outErr error err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool { @@ -511,12 +537,10 @@ func metaToProto(arr []pilorama.KeyValue) []*KeyValue { return meta } -var errNotInContainer = errors.New("node doesn't belong to a container") - // getContainerInfo returns the list of container nodes, position in the container for the node // with pub key and total amount of nodes in all replicas. func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) { - cntNodes, err := s.getContainerNodes(cid) + cntNodes, _, err := s.getContainerNodes(cid) if err != nil { return nil, 0, 0, err } @@ -526,5 +550,5 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI return cntNodes, i, len(cntNodes), nil } } - return nil, 0, 0, errNotInContainer + return cntNodes, -1, len(cntNodes), nil } diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 663c8738..e9a7d189 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -15,7 +15,7 @@ import ( // Synchronize tries to synchronize log starting from the last stored height. func (s *Service) Synchronize(ctx context.Context, cid cid.ID, treeID string) error { - nodes, err := s.getContainerNodes(cid) + nodes, _, err := s.getContainerNodes(cid) if err != nil { return fmt.Errorf("can't get container nodes: %w", err) }