frostfs-node/pkg/services/tree/service.go
Evgenii Stratonikov d18eaf060d [#957] treesvc: Implement pairing heap
With a large number of objects in a flat tree, sorting the nodes
dominates the latency of the operation. This scales nonlinearly
as O(n log n). Pairing heap has O(1) insert, and O(log n) extractMin,
which allows us to have O(n + log n) = O(n) latency on the first
operation, albeit with a slight increase in total running time.
On a real cluster with 2m objects, the latency decreased from 25s to
15s.

```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
                        │     cache     │               noparent               │
                        │    sec/op     │    sec/op     vs base                │
GetSubTree/latency-8      2349.9µ ± 19%   572.1µ ± 17%  -75.65% (p=0.000 n=10)
GetSubTree/total_time-8    70.62m ±  8%   86.02m ±  3%  +21.81% (p=0.000 n=10)
geomean                    12.88m         7.015m        -45.54%

                        │    cache     │               noparent               │
                        │     B/op     │     B/op      vs base                │
GetSubTree/latency-8      43.87Mi ± 0%   32.81Mi ± 0%  -25.22% (p=0.000 n=10)
GetSubTree/total_time-8   43.87Mi ± 0%   32.81Mi ± 0%  -25.22% (p=0.000 n=10)
geomean                   43.87Mi        32.81Mi       -25.22%

                        │    cache    │              noparent               │
                        │  allocs/op  │  allocs/op   vs base                │
GetSubTree/latency-8      400.0k ± 0%   500.0k ± 0%  +24.99% (p=0.000 n=10)
GetSubTree/total_time-8   400.0k ± 0%   500.0k ± 0%  +24.99% (p=0.000 n=10)
geomean                   400.0k        500.0k       +24.99%
```

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-06 14:14:49 +03:00

725 lines
16 KiB
Go

package tree
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree/heap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
)
// Service represents tree-service capable of working with multiple
// instances of CRDT trees.
type Service struct {
// cfg holds the settings that New fills with defaults and then hands to
// every Option.
cfg
// cache is set up by New through its init method.
// NOTE(review): presumably caches clients to other tree-service nodes — confirm.
cache clientCache
// replicateCh is buffered; New sizes it with replicatorChannelCapacity.
replicateCh chan movePair
// replicateLocalCh is unbuffered; Apply offers operations to it without blocking.
replicateLocalCh chan applyOp
// replicationTasks is buffered; New sizes it with replicatorWorkerCount.
replicationTasks chan replicationTask
// closeCh is closed by Shutdown.
closeCh chan struct{}
// containerCache is initialised by New with containerCacheSize.
containerCache containerCache
// syncChan is unbuffered; Start sends the initial synchronisation signal on it.
syncChan chan struct{}
// syncPool is created by New with defaultSyncWorkerCount and released by Shutdown.
syncPool *ants.Pool
// initialSyncDone is checked by most request handlers in this file: while it
// is false they return ErrAlreadySyncing.
initialSyncDone atomic.Bool
// cnrMap contains existing (used) container IDs.
cnrMap map[cidSDK.ID]struct{}
// cnrMapMtx protects cnrMap
cnrMapMtx sync.Mutex
}
// Compile-time check that *Service implements TreeServiceServer.
var _ TreeServiceServer = (*Service)(nil)
// New creates new tree service instance.
//
// Defaults are written into the configuration first, then every option is
// applied on top of them, and only after that the channels and caches are
// allocated, so their sizes reflect the final configuration. A no-op logger is
// installed when no option provided one.
func New(opts ...Option) *Service {
	s := new(Service)

	// Defaults; any Option may override them below.
	s.containerCacheSize = defaultContainerCacheSize
	s.replicatorChannelCapacity = defaultReplicatorCapacity
	s.replicatorWorkerCount = defaultReplicatorWorkerCount
	s.replicatorTimeout = defaultReplicatorSendTimeout
	s.metrics = defaultMetricsRegister{}

	for _, apply := range opts {
		apply(&s.cfg)
	}

	if s.log == nil {
		s.log = &logger.Logger{Logger: zap.NewNop()}
	}

	// Caches.
	s.cache.init()
	s.containerCache.init(s.containerCacheSize)
	s.cnrMap = make(map[cidSDK.ID]struct{})

	// Channels. closeCh, replicateLocalCh and syncChan are unbuffered.
	s.closeCh = make(chan struct{})
	s.syncChan = make(chan struct{})
	s.replicateLocalCh = make(chan applyOp)
	s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
	s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)

	// NOTE(review): the error from ants.NewPool is discarded here; presumably
	// it cannot fail for defaultSyncWorkerCount — confirm.
	s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)

	return s
}
// Start starts the service.
//
// It launches the replication and synchronisation loops in background
// goroutines and, unless the service is already shut down or ctx is already
// done, hands the initial synchronisation signal to the sync loop.
func (s *Service) Start(ctx context.Context) {
	go s.replicateLoop(ctx)
	go s.syncLoop(ctx)

	// Do not request the initial sync at all if we are already stopping.
	select {
	case <-s.closeCh:
		return
	case <-ctx.Done():
		return
	default:
	}

	// syncChan is unbuffered, so the send waits for a receiver. Keep watching
	// the stop signals while waiting: a shutdown or cancellation that lands
	// after the check above must not leave Start blocked forever if the
	// receiver has gone away.
	select {
	case s.syncChan <- struct{}{}: // initial sync
	case <-s.closeCh:
	case <-ctx.Done():
	}
}
// Shutdown shuts the service down: it closes closeCh and then releases the
// sync worker pool.
// NOTE(review): closing an already closed channel panics, so this looks like it
// must be called at most once — confirm against callers.
func (s *Service) Shutdown() {
close(s.closeCh)
s.syncPool.Release()
}
// Add applies a move whose child is pilorama.RootID under the requested parent
// and reports the child ID of the resulting log entry as the new node ID.
// NOTE(review): presumably RootID as the child asks the forest to allocate a
// fresh node ID — confirm against pilorama.
//
// The call fails with ErrAlreadySyncing until the initial synchronisation is
// done. The client is verified for acl.OpObjectPut. When getContainerNodes
// reports a negative position for this node the request is relayed through
// forEachNode instead of being applied locally; otherwise the resulting log
// entry is passed to pushToQueue.
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	body := req.GetBody()

	var cnr cidSDK.ID
	if err := cnr.Decode(body.GetContainerId()); err != nil {
		return nil, err
	}
	if err := s.verifyClient(req, cnr, body.GetBearerToken(), acl.OpObjectPut); err != nil {
		return nil, err
	}

	nodes, index, err := s.getContainerNodes(cnr)
	if err != nil {
		return nil, err
	}

	if index < 0 {
		// Relay: keep whatever the remote call produced, including its error.
		var (
			relayed    *AddResponse
			relayedErr error
		)
		if err := s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool {
			relayed, relayedErr = c.Add(ctx, req)
			return true
		}); err != nil {
			return nil, err
		}
		return relayed, relayedErr
	}

	desc := pilorama.CIDDescriptor{CID: cnr, Position: index, Size: len(nodes)}
	move := &pilorama.Move{
		Parent: body.GetParentId(),
		Child:  pilorama.RootID,
		Meta:   pilorama.Meta{Items: protoToMeta(body.GetMeta())},
	}

	applied, err := s.forest.TreeMove(ctx, desc, body.GetTreeId(), move)
	if err != nil {
		return nil, err
	}
	s.pushToQueue(cnr, body.GetTreeId(), applied)

	return &AddResponse{Body: &AddResponse_Body{NodeId: applied.Child}}, nil
}
// AddByPath adds a node at the given path of the requested tree and returns
// the IDs of the nodes touched by the operation.
//
// The call fails with ErrAlreadySyncing until the initial synchronisation is
// done. The client is verified for acl.OpObjectPut. When getContainerNodes
// reports a negative position for this node the request is relayed through
// forEachNode. Otherwise the path is resolved by the attribute from the
// request, falling back to pilorama.AttributeFilename when none is given, and
// every log entry returned by the forest is passed to pushToQueue.
//
// In the response the child of the last log entry comes first, followed by the
// children of the remaining entries in their original order.
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return nil, err
	}

	if err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut); err != nil {
		return nil, err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, err
	}

	if pos < 0 {
		var resp *AddByPathResponse
		var outErr error
		err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
			resp, outErr = c.AddByPath(ctx, req)
			return true
		})
		if err != nil {
			return nil, err
		}
		return resp, outErr
	}

	meta := protoToMeta(b.GetMeta())

	attr := b.GetPathAttribute()
	if len(attr) == 0 {
		attr = pilorama.AttributeFilename
	}

	d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
	logs, err := s.forest.TreeAddByPath(ctx, d, b.GetTreeId(), attr, b.GetPath(), meta)
	if err != nil {
		return nil, err
	}
	// The response is built from the last log entry; without this guard an
	// empty result would panic on the index expressions below.
	if len(logs) == 0 {
		return nil, errors.New("forest returned no operations for the added path")
	}

	for i := range logs {
		s.pushToQueue(cid, b.GetTreeId(), &logs[i])
	}

	last := len(logs) - 1
	nodes := make([]uint64, 0, len(logs))
	nodes = append(nodes, logs[last].Child)
	for i := range logs[:last] {
		nodes = append(nodes, logs[i].Child)
	}

	return &AddByPathResponse{
		Body: &AddByPathResponse_Body{
			Nodes: nodes,
		},
	}, nil
}
// Remove moves the requested node under pilorama.TrashID.
//
// The call fails with ErrAlreadySyncing until the initial synchronisation is
// done. The client is verified for acl.OpObjectPut. When getContainerNodes
// reports a negative position for this node the request is relayed through
// forEachNode. Locally, a request targeting pilorama.RootID is rejected;
// otherwise the resulting log entry is passed to pushToQueue.
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	body := req.GetBody()

	var cnr cidSDK.ID
	if err := cnr.Decode(body.GetContainerId()); err != nil {
		return nil, err
	}
	if err := s.verifyClient(req, cnr, body.GetBearerToken(), acl.OpObjectPut); err != nil {
		return nil, err
	}

	nodes, index, err := s.getContainerNodes(cnr)
	if err != nil {
		return nil, err
	}

	if index < 0 {
		// Relay: keep whatever the remote call produced, including its error.
		var (
			relayed    *RemoveResponse
			relayedErr error
		)
		if err := s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool {
			relayed, relayedErr = c.Remove(ctx, req)
			return true
		}); err != nil {
			return nil, err
		}
		return relayed, relayedErr
	}

	target := body.GetNodeId()
	if target == pilorama.RootID {
		return nil, fmt.Errorf("node with ID %d is root and can't be removed", target)
	}

	desc := pilorama.CIDDescriptor{CID: cnr, Position: index, Size: len(nodes)}
	move := &pilorama.Move{
		Parent: pilorama.TrashID,
		Child:  target,
	}

	applied, err := s.forest.TreeMove(ctx, desc, body.GetTreeId(), move)
	if err != nil {
		return nil, err
	}
	s.pushToQueue(cnr, body.GetTreeId(), applied)

	return &RemoveResponse{}, nil
}
// Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes.
//
// The call fails with ErrAlreadySyncing until the initial synchronisation is
// done. The client is verified for acl.OpObjectPut. When getContainerNodes
// reports a negative position for this node the request is relayed through
// forEachNode. Locally, a request targeting pilorama.RootID is rejected.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	body := req.GetBody()

	var cnr cidSDK.ID
	if err := cnr.Decode(body.GetContainerId()); err != nil {
		return nil, err
	}
	if err := s.verifyClient(req, cnr, body.GetBearerToken(), acl.OpObjectPut); err != nil {
		return nil, err
	}

	nodes, index, err := s.getContainerNodes(cnr)
	if err != nil {
		return nil, err
	}

	if index < 0 {
		// Relay: keep whatever the remote call produced, including its error.
		var (
			relayed    *MoveResponse
			relayedErr error
		)
		if err := s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool {
			relayed, relayedErr = c.Move(ctx, req)
			return true
		}); err != nil {
			return nil, err
		}
		return relayed, relayedErr
	}

	target := body.GetNodeId()
	if target == pilorama.RootID {
		return nil, fmt.Errorf("node with ID %d is root and can't be moved", target)
	}

	desc := pilorama.CIDDescriptor{CID: cnr, Position: index, Size: len(nodes)}
	move := &pilorama.Move{
		Parent: body.GetParentId(),
		Child:  target,
		Meta:   pilorama.Meta{Items: protoToMeta(body.GetMeta())},
	}

	applied, err := s.forest.TreeMove(ctx, desc, body.GetTreeId(), move)
	if err != nil {
		return nil, err
	}
	s.pushToQueue(cnr, body.GetTreeId(), applied)

	return &MoveResponse{}, nil
}
// GetNodeByPath looks up nodes by path and returns their IDs, parents,
// timestamps and meta attributes.
//
// The call fails with ErrAlreadySyncing until the initial synchronisation is
// done. The client is verified for acl.OpObjectGet. When getContainerNodes
// reports a negative position for this node the request is relayed through
// forEachNode. The path is resolved by the attribute from the request, falling
// back to pilorama.AttributeFilename when none is given.
//
// Each node carries either all of its attributes (when the request asks for
// them) or only those whose key is listed in the request; the latter keeps the
// order of the stored meta items.
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	body := req.GetBody()

	var cnr cidSDK.ID
	if err := cnr.Decode(body.GetContainerId()); err != nil {
		return nil, err
	}
	if err := s.verifyClient(req, cnr, body.GetBearerToken(), acl.OpObjectGet); err != nil {
		return nil, err
	}

	cnrNodes, index, err := s.getContainerNodes(cnr)
	if err != nil {
		return nil, err
	}

	if index < 0 {
		// Relay: keep whatever the remote call produced, including its error.
		var (
			relayed    *GetNodeByPathResponse
			relayedErr error
		)
		if err := s.forEachNode(ctx, cnrNodes, func(c TreeServiceClient) bool {
			relayed, relayedErr = c.GetNodeByPath(ctx, req)
			return true
		}); err != nil {
			return nil, err
		}
		return relayed, relayedErr
	}

	pathAttr := body.GetPathAttribute()
	if len(pathAttr) == 0 {
		pathAttr = pilorama.AttributeFilename
	}

	found, err := s.forest.TreeGetByPath(ctx, cnr, body.GetTreeId(), pathAttr, body.GetPath(), body.GetLatestOnly())
	if err != nil {
		return nil, err
	}

	result := make([]*GetNodeByPathResponse_Info, 0, len(found))
	for _, id := range found {
		meta, parent, err := s.forest.TreeGetMeta(ctx, cnr, body.GetTreeId(), id)
		if err != nil {
			return nil, err
		}

		item := &GetNodeByPathResponse_Info{
			NodeId:    id,
			ParentId:  parent,
			Timestamp: meta.Time,
		}

		if body.GetAllAttributes() {
			item.Meta = metaToProto(meta.Items)
		} else {
			// Keep only the requested keys; Meta stays nil when nothing matches.
			for _, kv := range meta.Items {
				for _, wanted := range body.GetAttributes() {
					if kv.Key != wanted {
						continue
					}
					item.Meta = append(item.Meta, &KeyValue{
						Key:   kv.Key,
						Value: kv.Value,
					})
					break
				}
			}
		}

		result = append(result, item)
	}

	return &GetNodeByPathResponse{
		Body: &GetNodeByPathResponse_Body{
			Nodes: result,
		},
	}, nil
}
// GetSubTree streams the nodes of the requested subtree to srv.
//
// The call fails with ErrAlreadySyncing until the initial synchronisation is
// done. After decoding the container ID and verifying the client for
// acl.OpObjectGet, the request is either served from the local forest via
// getSubTree or, when getContainerNodes reports a negative position for this
// node, relayed: the stream opened through forEachNode is copied message by
// message to srv.
//
// While relaying, io.EOF from the remote stream ends the call successfully;
// any other receive error is returned to the caller rather than being reported
// as a successful, silently truncated stream.
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
	if !s.initialSyncDone.Load() {
		return ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return err
	}

	if err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectGet); err != nil {
		return err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return err
	}

	if pos >= 0 {
		return getSubTree(srv.Context(), srv, cid, b, s.forest)
	}

	var cli TreeService_GetSubTreeClient
	var outErr error
	err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
		cli, outErr = c.GetSubTree(srv.Context(), req)
		return true
	})
	if err != nil {
		return err
	}
	if outErr != nil {
		return outErr
	}

	for {
		resp, err := cli.Recv()
		if errors.Is(err, io.EOF) {
			// Normal end of the remote stream.
			return nil
		}
		if err != nil {
			return err
		}
		if err := srv.Send(resp); err != nil {
			return err
		}
	}
}
// Heap is the container getSubTree uses to hold the not-yet-sent nodes of one
// tree level. makeHeap picks the implementation.
type Heap interface {
// Insert adds the given nodes.
Insert(...pilorama.NodeInfo)
// IsEmpty reports whether no nodes are left.
IsEmpty() bool
// ExtractMin removes and returns the next node.
// NOTE(review): the ordering key is defined by the implementation — confirm.
ExtractMin() pilorama.NodeInfo
}
// makeHeap returns the Heap implementation for one tree level: the unordered
// slice from the heap package when no ordering is requested, the pairing heap
// otherwise.
func makeHeap(ordered bool) Heap {
	if !ordered {
		return heap.NewUnorderedSlice()
	}
	return heap.NewPairing()
}
// getSubTree sends the subtree rooted at the requested node to srv, one
// message per node, reading the tree from forest.
//
// The walk is depth-first. Arbitrary depth must be supported, so instead of
// recursing it keeps an explicit stack with one Heap of pending nodes per tree
// level. A depth of 0 in the request means no depth limit; otherwise children
// are fetched only while the stack is shallower than the requested depth.
// An unsupported order direction is rejected before anything is sent.
func getSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
	ordered, err := needOrder(b.GetOrderBy().GetDirection())
	if err != nil {
		return err
	}

	rootMeta, rootParent, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), b.GetRootId())
	if err != nil {
		return err
	}

	rootLevel := makeHeap(ordered)
	rootLevel.Insert(pilorama.NodeInfo{
		ID:       b.GetRootId(),
		Meta:     rootMeta,
		ParentID: rootParent,
	})
	levels := []Heap{rootLevel}

	for len(levels) > 0 {
		top := levels[len(levels)-1]
		if top.IsEmpty() {
			// This level is exhausted, go back up.
			levels = levels[:len(levels)-1]
			continue
		}

		current := top.ExtractMin()
		sendErr := srv.Send(&GetSubTreeResponse{
			Body: &GetSubTreeResponse_Body{
				NodeId:    current.ID,
				ParentId:  current.ParentID,
				Timestamp: current.Meta.Time,
				Meta:      metaToProto(current.Meta.Items),
			},
		})
		if sendErr != nil {
			return sendErr
		}

		// Depth limit reached: do not descend below this node.
		if b.GetDepth() != 0 && uint32(len(levels)) >= b.GetDepth() {
			continue
		}

		children, err := forest.TreeGetChildren(ctx, cid, b.GetTreeId(), current.ID)
		if err != nil {
			return err
		}
		if len(children) == 0 {
			continue
		}

		next := makeHeap(ordered)
		next.Insert(children...)
		levels = append(levels, next)
	}
	return nil
}
// needOrder reports whether the requested direction requires the nodes of a
// level to be ordered: true for Asc, false for None. Any other direction
// yields an error.
func needOrder(d GetSubTreeRequest_Body_Order_Direction) (bool, error) {
	if d == GetSubTreeRequest_Body_Order_Asc {
		return true, nil
	}
	if d == GetSubTreeRequest_Body_Order_None {
		return false, nil
	}
	return false, fmt.Errorf("unsupported order direction: %s", d.String())
}
// Apply locally applies operation from the remote node to the tree.
//
// The message is checked with verifyMessage, and the signing key must belong
// to one of the container nodes as reported by getContainerInfo. The decoded
// operation is then offered to replicateLocalCh without blocking: if the send
// cannot proceed immediately the operation is dropped and the call still
// succeeds.
func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, error) {
	if err := verifyMessage(req); err != nil {
		return nil, err
	}

	var cnr cidSDK.ID
	if err := cnr.Decode(req.GetBody().GetContainerId()); err != nil {
		return nil, err
	}

	signer := req.GetSignature().GetKey()
	_, index, _, err := s.getContainerInfo(cnr, signer)
	if err != nil {
		return nil, err
	}
	if index < 0 {
		return nil, errors.New("`Apply` request must be signed by a container node")
	}

	remoteOp := req.GetBody().GetOperation()

	var meta pilorama.Meta
	if err := meta.FromBytes(remoteOp.GetMeta()); err != nil {
		return nil, fmt.Errorf("can't parse meta-information: %w", err)
	}

	task := applyOp{
		treeID: req.GetBody().GetTreeId(),
		cid:    cnr,
		Move: pilorama.Move{
			Parent: remoteOp.GetParentId(),
			Child:  remoteOp.GetChildId(),
			Meta:   meta,
		},
	}

	// Best effort: never block the caller on the local replication channel.
	select {
	case s.replicateLocalCh <- task:
	default:
	}

	return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
}
// GetOpLog streams the operation log of the requested tree to srv, starting
// from the height given in the request.
//
// The call fails with ErrAlreadySyncing until the initial synchronisation is
// done. When getContainerNodes reports a negative position for this node the
// request is relayed: the stream opened through forEachNode is copied message
// by message to srv. While relaying, io.EOF from the remote stream ends the
// call successfully; any other receive error is returned to the caller rather
// than being reported as a successful, silently truncated stream.
//
// Locally, the tree height is read once up front. Entries are then fetched one
// at a time, each lookup starting right after the Time of the previous entry.
// Streaming stops when the forest returns an entry with zero Time or with a
// Time above the height read up front.
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
	if !s.initialSyncDone.Load() {
		return ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID
	if err := cid.Decode(b.GetContainerId()); err != nil {
		return err
	}

	ns, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return err
	}

	if pos < 0 {
		var cli TreeService_GetOpLogClient
		var outErr error
		err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
			cli, outErr = c.GetOpLog(srv.Context(), req)
			return true
		})
		if err != nil {
			return err
		}
		if outErr != nil {
			return outErr
		}

		for {
			resp, err := cli.Recv()
			if errors.Is(err, io.EOF) {
				// Normal end of the remote stream.
				return nil
			}
			if err != nil {
				return err
			}
			if err := srv.Send(resp); err != nil {
				return err
			}
		}
	}

	h := b.GetHeight()
	lastHeight, err := s.forest.TreeHeight(srv.Context(), cid, b.GetTreeId())
	if err != nil {
		return err
	}

	for {
		lm, err := s.forest.TreeGetOpLog(srv.Context(), cid, b.GetTreeId(), h)
		// err is nil on the two "no more entries" conditions, so the same
		// return covers both the failure and the normal end of the log.
		if err != nil || lm.Time == 0 || lastHeight < lm.Time {
			return err
		}

		err = srv.Send(&GetOpLogResponse{
			Body: &GetOpLogResponse_Body{
				Operation: &LogMove{
					ParentId: lm.Parent,
					Meta:     lm.Meta.Bytes(),
					ChildId:  lm.Child,
				},
			},
		})
		if err != nil {
			return err
		}

		h = lm.Time + 1
	}
}
// TreeList returns the IDs of the trees stored for the requested container.
//
// The call fails with ErrAlreadySyncing until the initial synchronisation is
// done. Only the message signature is verified; no ACL check is made because
// the tree ID list is not protected the way the container list is. When
// getContainerNodes reports a negative position for this node the request is
// relayed through forEachNode; unlike the other handlers in this file the
// callback returns true only if the remote call succeeded.
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	var cnr cidSDK.ID
	if err := cnr.Decode(req.GetBody().GetContainerId()); err != nil {
		return nil, err
	}

	// Signature only, see the function comment.
	if err := verifyMessage(req); err != nil {
		return nil, err
	}

	nodes, index, err := s.getContainerNodes(cnr)
	if err != nil {
		return nil, err
	}

	if index < 0 {
		var (
			relayed    *TreeListResponse
			relayedErr error
		)
		if err := s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool {
			relayed, relayedErr = c.TreeList(ctx, req)
			return relayedErr == nil
		}); err != nil {
			return nil, err
		}
		return relayed, relayedErr
	}

	treeIDs, err := s.forest.TreeList(ctx, cnr)
	if err != nil {
		return nil, err
	}

	return &TreeListResponse{Body: &TreeListResponse_Body{Ids: treeIDs}}, nil
}
// protoToMeta converts protobuf key-value pairs to their pilorama form.
// The result always has the same length as arr; a nil entry in arr leaves the
// zero value at the same index.
func protoToMeta(arr []*KeyValue) []pilorama.KeyValue {
	converted := make([]pilorama.KeyValue, len(arr))
	for i := range arr {
		src := arr[i]
		if src == nil {
			continue
		}
		dst := &converted[i]
		dst.Key = src.GetKey()
		dst.Value = src.GetValue()
	}
	return converted
}
// metaToProto converts pilorama key-value pairs to their protobuf form,
// allocating one message per pair. The result is never nil and has the same
// length and order as arr.
func metaToProto(arr []pilorama.KeyValue) []*KeyValue {
	converted := make([]*KeyValue, 0, len(arr))
	for i := range arr {
		converted = append(converted, &KeyValue{
			Key:   arr[i].Key,
			Value: arr[i].Value,
		})
	}
	return converted
}
// getContainerInfo returns the list of container nodes, position in the container for the node
// with pub key and total amount of nodes in all replicas.
//
// The position is the index of the first node whose public key equals pub, or
// -1 when no node matches. On error the list is nil and both numbers are 0.
func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) {
	nodes, _, err := s.getContainerNodes(cid)
	if err != nil {
		return nil, 0, 0, err
	}

	position := -1
	for i := range nodes {
		if bytes.Equal(nodes[i].PublicKey(), pub) {
			position = i
			break
		}
	}

	return nodes, position, len(nodes), nil
}
// Healthcheck returns an empty response once the initial synchronisation is
// done and ErrAlreadySyncing before that.
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
	if s.initialSyncDone.Load() {
		return &HealthcheckResponse{}, nil
	}
	return nil, ErrAlreadySyncing
}