2022-04-22 13:30:20 +00:00
|
|
|
package tree
|
|
|
|
|
|
|
|
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"sync"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
)
|
|
|
|
|
|
|
|
// Service represents tree-service capable of working with multiple
|
|
|
|
// instances of CRDT trees.
|
|
|
|
type Service struct {
|
|
|
|
cfg
|
|
|
|
|
2022-05-28 13:48:25 +00:00
|
|
|
cache clientCache
|
|
|
|
replicateCh chan movePair
|
2022-07-26 10:37:12 +00:00
|
|
|
replicateLocalCh chan applyOp
|
2022-05-28 13:48:25 +00:00
|
|
|
replicationTasks chan replicationTask
|
|
|
|
closeCh chan struct{}
|
2022-05-30 17:46:29 +00:00
|
|
|
containerCache containerCache
|
2022-10-18 21:33:45 +00:00
|
|
|
|
|
|
|
syncChan chan struct{}
|
2022-12-12 11:49:40 +00:00
|
|
|
syncPool *ants.Pool
|
2022-12-19 15:53:03 +00:00
|
|
|
|
2023-06-13 08:43:25 +00:00
|
|
|
initialSyncDone atomic.Bool
|
|
|
|
|
2023-01-25 12:44:44 +00:00
|
|
|
// cnrMap contains existing (used) container IDs.
|
|
|
|
cnrMap map[cidSDK.ID]struct{}
|
2022-12-19 15:53:03 +00:00
|
|
|
// cnrMapMtx protects cnrMap
|
|
|
|
cnrMapMtx sync.Mutex
|
2022-04-22 13:30:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var _ TreeServiceServer = (*Service)(nil)
|
|
|
|
|
|
|
|
// New creates new tree service instance.
|
|
|
|
func New(opts ...Option) *Service {
|
|
|
|
var s Service
|
2022-07-19 08:03:13 +00:00
|
|
|
s.containerCacheSize = defaultContainerCacheSize
|
|
|
|
s.replicatorChannelCapacity = defaultReplicatorCapacity
|
|
|
|
s.replicatorWorkerCount = defaultReplicatorWorkerCount
|
2022-12-14 16:28:44 +00:00
|
|
|
s.replicatorTimeout = defaultReplicatorSendTimeout
|
2023-05-24 07:01:50 +00:00
|
|
|
s.metrics = defaultMetricsRegister{}
|
2022-07-19 08:03:13 +00:00
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
for i := range opts {
|
|
|
|
opts[i](&s.cfg)
|
|
|
|
}
|
|
|
|
|
|
|
|
if s.log == nil {
|
2022-09-28 07:41:01 +00:00
|
|
|
s.log = &logger.Logger{Logger: zap.NewNop()}
|
2022-04-22 13:30:20 +00:00
|
|
|
}
|
|
|
|
|
2022-05-28 09:27:55 +00:00
|
|
|
s.cache.init()
|
2022-04-22 13:30:20 +00:00
|
|
|
s.closeCh = make(chan struct{})
|
2022-07-19 08:03:13 +00:00
|
|
|
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
2022-07-26 10:37:12 +00:00
|
|
|
s.replicateLocalCh = make(chan applyOp)
|
2022-07-19 08:03:13 +00:00
|
|
|
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
|
|
|
|
s.containerCache.init(s.containerCacheSize)
|
2023-01-25 12:44:44 +00:00
|
|
|
s.cnrMap = make(map[cidSDK.ID]struct{})
|
2022-10-18 21:33:45 +00:00
|
|
|
s.syncChan = make(chan struct{})
|
2022-12-12 11:49:40 +00:00
|
|
|
s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)
|
2022-04-22 13:30:20 +00:00
|
|
|
|
|
|
|
return &s
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start starts the service.
|
|
|
|
func (s *Service) Start(ctx context.Context) {
|
|
|
|
go s.replicateLoop(ctx)
|
2022-10-18 21:33:45 +00:00
|
|
|
go s.syncLoop(ctx)
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-s.closeCh:
|
|
|
|
case <-ctx.Done():
|
|
|
|
default:
|
|
|
|
// initial sync
|
|
|
|
s.syncChan <- struct{}{}
|
|
|
|
}
|
2022-04-22 13:30:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown shutdowns the service.
|
|
|
|
func (s *Service) Shutdown() {
|
|
|
|
close(s.closeCh)
|
2022-12-12 11:49:40 +00:00
|
|
|
s.syncPool.Release()
|
2022-04-22 13:30:20 +00:00
|
|
|
}
|
|
|
|
|
2022-05-24 11:23:27 +00:00
|
|
|
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return nil, ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
b := req.GetBody()
|
|
|
|
|
|
|
|
var cid cidSDK.ID
|
|
|
|
if err := cid.Decode(b.GetContainerId()); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-09-08 12:44:27 +00:00
|
|
|
err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut)
|
2022-04-22 13:30:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
ns, pos, err := s.getContainerNodes(cid)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
2022-05-27 12:55:02 +00:00
|
|
|
var resp *AddResponse
|
|
|
|
var outErr error
|
|
|
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
|
|
resp, outErr = c.Add(ctx, req)
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-05-24 11:23:27 +00:00
|
|
|
return resp, outErr
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
2023-04-13 12:36:20 +00:00
|
|
|
log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{
|
2022-04-22 13:30:20 +00:00
|
|
|
Parent: b.GetParentId(),
|
|
|
|
Child: pilorama.RootID,
|
2022-04-29 10:06:10 +00:00
|
|
|
Meta: pilorama.Meta{Items: protoToMeta(b.GetMeta())},
|
2022-04-22 13:30:20 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
s.pushToQueue(cid, b.GetTreeId(), log)
|
|
|
|
return &AddResponse{
|
|
|
|
Body: &AddResponse_Body{
|
|
|
|
NodeId: log.Child,
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2022-05-24 11:23:27 +00:00
|
|
|
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return nil, ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
b := req.GetBody()
|
|
|
|
|
|
|
|
var cid cidSDK.ID
|
|
|
|
if err := cid.Decode(b.GetContainerId()); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-09-08 12:44:27 +00:00
|
|
|
err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut)
|
2022-04-22 13:30:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
ns, pos, err := s.getContainerNodes(cid)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
2022-05-27 12:55:02 +00:00
|
|
|
var resp *AddByPathResponse
|
|
|
|
var outErr error
|
|
|
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
|
|
resp, outErr = c.AddByPath(ctx, req)
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-05-24 11:23:27 +00:00
|
|
|
return resp, outErr
|
|
|
|
}
|
|
|
|
|
2022-04-29 10:06:10 +00:00
|
|
|
meta := protoToMeta(b.GetMeta())
|
2022-04-22 13:30:20 +00:00
|
|
|
|
|
|
|
attr := b.GetPathAttribute()
|
|
|
|
if len(attr) == 0 {
|
|
|
|
attr = pilorama.AttributeFilename
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
2023-04-13 12:36:20 +00:00
|
|
|
logs, err := s.forest.TreeAddByPath(ctx, d, b.GetTreeId(), attr, b.GetPath(), meta)
|
2022-04-22 13:30:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := range logs {
|
|
|
|
s.pushToQueue(cid, b.GetTreeId(), &logs[i])
|
|
|
|
}
|
|
|
|
|
|
|
|
nodes := make([]uint64, len(logs))
|
|
|
|
nodes[0] = logs[len(logs)-1].Child
|
|
|
|
for i, l := range logs[:len(logs)-1] {
|
|
|
|
nodes[i+1] = l.Child
|
|
|
|
}
|
|
|
|
|
|
|
|
return &AddByPathResponse{
|
|
|
|
Body: &AddByPathResponse_Body{
|
|
|
|
Nodes: nodes,
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2022-05-24 11:23:27 +00:00
|
|
|
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return nil, ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
b := req.GetBody()
|
|
|
|
|
|
|
|
var cid cidSDK.ID
|
|
|
|
if err := cid.Decode(b.GetContainerId()); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-09-08 12:44:27 +00:00
|
|
|
err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut)
|
2022-04-22 13:30:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
ns, pos, err := s.getContainerNodes(cid)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
2022-05-27 12:55:02 +00:00
|
|
|
var resp *RemoveResponse
|
|
|
|
var outErr error
|
|
|
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
|
|
resp, outErr = c.Remove(ctx, req)
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-05-24 11:23:27 +00:00
|
|
|
return resp, outErr
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
if b.GetNodeId() == pilorama.RootID {
|
|
|
|
return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId())
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
2023-04-13 12:36:20 +00:00
|
|
|
log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{
|
2022-04-22 13:30:20 +00:00
|
|
|
Parent: pilorama.TrashID,
|
|
|
|
Child: b.GetNodeId(),
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
s.pushToQueue(cid, b.GetTreeId(), log)
|
|
|
|
return new(RemoveResponse), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Move applies client operation to the specified tree and pushes in queue
|
|
|
|
// for replication on other nodes.
|
2022-05-24 11:23:27 +00:00
|
|
|
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return nil, ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
b := req.GetBody()
|
|
|
|
|
|
|
|
var cid cidSDK.ID
|
|
|
|
if err := cid.Decode(b.GetContainerId()); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-09-08 12:44:27 +00:00
|
|
|
err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut)
|
2022-04-22 13:30:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
ns, pos, err := s.getContainerNodes(cid)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
2022-05-27 12:55:02 +00:00
|
|
|
var resp *MoveResponse
|
|
|
|
var outErr error
|
|
|
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
|
|
resp, outErr = c.Move(ctx, req)
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-05-24 11:23:27 +00:00
|
|
|
return resp, outErr
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
if b.GetNodeId() == pilorama.RootID {
|
|
|
|
return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId())
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
2023-04-13 12:36:20 +00:00
|
|
|
log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{
|
2022-04-22 13:30:20 +00:00
|
|
|
Parent: b.GetParentId(),
|
|
|
|
Child: b.GetNodeId(),
|
2022-04-29 10:06:10 +00:00
|
|
|
Meta: pilorama.Meta{Items: protoToMeta(b.GetMeta())},
|
2022-04-22 13:30:20 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
s.pushToQueue(cid, b.GetTreeId(), log)
|
|
|
|
return new(MoveResponse), nil
|
|
|
|
}
|
|
|
|
|
2022-05-24 11:23:27 +00:00
|
|
|
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return nil, ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
b := req.GetBody()
|
|
|
|
|
|
|
|
var cid cidSDK.ID
|
|
|
|
if err := cid.Decode(b.GetContainerId()); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-09-08 12:44:27 +00:00
|
|
|
err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectGet)
|
2022-05-05 11:00:30 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
ns, pos, err := s.getContainerNodes(cid)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
2022-05-27 12:55:02 +00:00
|
|
|
var resp *GetNodeByPathResponse
|
|
|
|
var outErr error
|
|
|
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
|
|
resp, outErr = c.GetNodeByPath(ctx, req)
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-05-24 11:23:27 +00:00
|
|
|
return resp, outErr
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
attr := b.GetPathAttribute()
|
|
|
|
if len(attr) == 0 {
|
|
|
|
attr = pilorama.AttributeFilename
|
|
|
|
}
|
|
|
|
|
2023-04-13 12:36:20 +00:00
|
|
|
nodes, err := s.forest.TreeGetByPath(ctx, cid, b.GetTreeId(), attr, b.GetPath(), b.GetLatestOnly())
|
2022-04-22 13:30:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
info := make([]*GetNodeByPathResponse_Info, 0, len(nodes))
|
|
|
|
for _, node := range nodes {
|
2023-04-13 12:36:20 +00:00
|
|
|
m, parent, err := s.forest.TreeGetMeta(ctx, cid, b.GetTreeId(), node)
|
2022-04-22 13:30:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var x GetNodeByPathResponse_Info
|
2022-09-05 13:46:56 +00:00
|
|
|
x.ParentId = parent
|
2022-04-22 13:30:20 +00:00
|
|
|
x.NodeId = node
|
|
|
|
x.Timestamp = m.Time
|
2023-12-11 10:18:34 +00:00
|
|
|
if b.GetAllAttributes() {
|
2022-04-29 10:06:10 +00:00
|
|
|
x.Meta = metaToProto(m.Items)
|
|
|
|
} else {
|
2023-12-11 10:18:34 +00:00
|
|
|
var metaValue []*KeyValue
|
2022-04-29 10:06:10 +00:00
|
|
|
for _, kv := range m.Items {
|
2022-04-22 13:30:20 +00:00
|
|
|
for _, attr := range b.GetAttributes() {
|
|
|
|
if kv.Key == attr {
|
2023-12-11 10:18:34 +00:00
|
|
|
metaValue = append(metaValue, &KeyValue{
|
2022-04-29 10:06:10 +00:00
|
|
|
Key: kv.Key,
|
|
|
|
Value: kv.Value,
|
|
|
|
})
|
2022-04-22 13:30:20 +00:00
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2023-12-11 10:18:34 +00:00
|
|
|
x.Meta = metaValue
|
2022-04-22 13:30:20 +00:00
|
|
|
}
|
|
|
|
info = append(info, &x)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &GetNodeByPathResponse{
|
|
|
|
Body: &GetNodeByPathResponse_Body{
|
|
|
|
Nodes: info,
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2022-04-29 10:06:10 +00:00
|
|
|
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-04-29 10:06:10 +00:00
|
|
|
b := req.GetBody()
|
|
|
|
|
|
|
|
var cid cidSDK.ID
|
|
|
|
if err := cid.Decode(b.GetContainerId()); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-09-08 12:44:27 +00:00
|
|
|
err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectGet)
|
2022-05-05 11:00:30 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
ns, pos, err := s.getContainerNodes(cid)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
2022-05-27 12:55:02 +00:00
|
|
|
var cli TreeService_GetSubTreeClient
|
|
|
|
var outErr error
|
|
|
|
err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
|
|
|
|
cli, outErr = c.GetSubTree(srv.Context(), req)
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
} else if outErr != nil {
|
|
|
|
return outErr
|
|
|
|
}
|
2022-05-24 11:23:27 +00:00
|
|
|
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
|
|
|
|
if err := srv.Send(resp); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2022-05-27 12:55:02 +00:00
|
|
|
return nil
|
2022-05-24 11:23:27 +00:00
|
|
|
}
|
|
|
|
|
2023-04-13 12:36:20 +00:00
|
|
|
return getSubTree(srv.Context(), srv, cid, b, s.forest)
|
2022-09-05 11:00:23 +00:00
|
|
|
}
|
2022-04-29 10:06:10 +00:00
|
|
|
|
2024-03-28 12:53:26 +00:00
|
|
|
func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
|
|
|
|
const batchSize = 1000
|
|
|
|
|
|
|
|
type stackItem struct {
|
|
|
|
values []pilorama.NodeInfo
|
|
|
|
parent pilorama.Node
|
2024-04-04 07:40:21 +00:00
|
|
|
last *string
|
2024-03-28 12:53:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
|
|
|
|
// recursive implementation is not suitable here, so we maintain explicit stack.
|
|
|
|
m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), b.GetRootId())
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
stack := []stackItem{{
|
2024-04-02 07:41:38 +00:00
|
|
|
values: []pilorama.NodeInfo{{
|
|
|
|
ID: b.GetRootId(),
|
|
|
|
Meta: m,
|
|
|
|
ParentID: p,
|
|
|
|
}},
|
|
|
|
parent: p,
|
2024-03-28 12:53:26 +00:00
|
|
|
}}
|
|
|
|
|
|
|
|
for {
|
|
|
|
if len(stack) == 0 {
|
|
|
|
break
|
|
|
|
} else if item := &stack[len(stack)-1]; len(item.values) == 0 {
|
2024-04-02 07:41:38 +00:00
|
|
|
if len(stack) == 1 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2024-03-28 12:53:26 +00:00
|
|
|
nodes, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), item.parent, item.last, batchSize)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
item.values = nodes
|
|
|
|
item.last = last
|
|
|
|
|
|
|
|
if len(nodes) == 0 {
|
|
|
|
stack = stack[:len(stack)-1]
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
node := stack[len(stack)-1].values[0]
|
|
|
|
stack[len(stack)-1].values = stack[len(stack)-1].values[1:]
|
|
|
|
|
|
|
|
err = srv.Send(&GetSubTreeResponse{
|
|
|
|
Body: &GetSubTreeResponse_Body{
|
|
|
|
NodeId: node.ID,
|
|
|
|
ParentId: node.ParentID,
|
|
|
|
Timestamp: node.Meta.Time,
|
|
|
|
Meta: metaToProto(node.Meta.Items),
|
|
|
|
},
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if b.GetDepth() == 0 || uint32(len(stack)) < b.GetDepth() {
|
2024-04-04 07:40:21 +00:00
|
|
|
children, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), node.ID, nil, batchSize)
|
2024-03-28 12:53:26 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if len(children) != 0 {
|
|
|
|
stack = append(stack, stackItem{
|
|
|
|
values: children,
|
|
|
|
parent: node.ID,
|
|
|
|
last: last,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-04-13 12:36:20 +00:00
|
|
|
func getSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
|
2024-03-28 12:53:26 +00:00
|
|
|
if b.GetOrderBy().GetDirection() == GetSubTreeRequest_Body_Order_Asc {
|
|
|
|
return getSortedSubTree(ctx, srv, cid, b, forest)
|
|
|
|
}
|
|
|
|
|
2022-09-05 11:00:23 +00:00
|
|
|
// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
|
|
|
|
// recursive implementation is not suitable here, so we maintain explicit stack.
|
2023-07-11 08:39:17 +00:00
|
|
|
m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), b.GetRootId())
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
stack := [][]pilorama.NodeInfo{{{
|
|
|
|
ID: b.GetRootId(),
|
|
|
|
Meta: m,
|
|
|
|
ParentID: p,
|
|
|
|
}}}
|
2022-09-05 11:00:23 +00:00
|
|
|
|
|
|
|
for {
|
|
|
|
if len(stack) == 0 {
|
|
|
|
break
|
|
|
|
} else if len(stack[len(stack)-1]) == 0 {
|
|
|
|
stack = stack[:len(stack)-1]
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2023-07-11 08:39:17 +00:00
|
|
|
node := stack[len(stack)-1][0]
|
2022-09-05 11:00:23 +00:00
|
|
|
stack[len(stack)-1] = stack[len(stack)-1][1:]
|
|
|
|
|
|
|
|
err = srv.Send(&GetSubTreeResponse{
|
|
|
|
Body: &GetSubTreeResponse_Body{
|
2023-07-11 08:39:17 +00:00
|
|
|
NodeId: node.ID,
|
|
|
|
ParentId: node.ParentID,
|
|
|
|
Timestamp: node.Meta.Time,
|
|
|
|
Meta: metaToProto(node.Meta.Items),
|
2022-09-05 11:00:23 +00:00
|
|
|
},
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if b.GetDepth() == 0 || uint32(len(stack)) < b.GetDepth() {
|
2023-07-11 08:39:17 +00:00
|
|
|
children, err := forest.TreeGetChildren(ctx, cid, b.GetTreeId(), node.ID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
children, err = sortByFilename(children, b.GetOrderBy().GetDirection())
|
2022-04-29 10:06:10 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-09-05 11:00:23 +00:00
|
|
|
if len(children) != 0 {
|
|
|
|
stack = append(stack, children)
|
2022-04-29 10:06:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
2022-04-22 13:30:20 +00:00
|
|
|
}
|
|
|
|
|
2023-07-11 08:39:17 +00:00
|
|
|
func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Direction) ([]pilorama.NodeInfo, error) {
|
|
|
|
switch d {
|
|
|
|
case GetSubTreeRequest_Body_Order_None:
|
|
|
|
return nodes, nil
|
|
|
|
case GetSubTreeRequest_Body_Order_Asc:
|
|
|
|
if len(nodes) == 0 {
|
|
|
|
return nodes, nil
|
|
|
|
}
|
|
|
|
less := func(i, j int) bool {
|
|
|
|
return bytes.Compare(nodes[i].Meta.GetAttr(pilorama.AttributeFilename), nodes[j].Meta.GetAttr(pilorama.AttributeFilename)) < 0
|
|
|
|
}
|
|
|
|
sort.Slice(nodes, less)
|
|
|
|
return nodes, nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported order direction: %s", d.String())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-22 13:30:20 +00:00
|
|
|
// Apply locally applies operation from the remote node to the tree.
|
2023-04-26 08:24:40 +00:00
|
|
|
func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, error) {
|
2022-05-05 11:00:30 +00:00
|
|
|
err := verifyMessage(req)
|
2022-04-22 13:30:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var cid cidSDK.ID
|
|
|
|
if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
key := req.GetSignature().GetKey()
|
|
|
|
|
2023-04-26 08:24:40 +00:00
|
|
|
_, pos, _, err := s.getContainerInfo(cid, key)
|
2022-05-30 17:46:29 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
2022-04-22 13:30:20 +00:00
|
|
|
return nil, errors.New("`Apply` request must be signed by a container node")
|
|
|
|
}
|
|
|
|
|
|
|
|
op := req.GetBody().GetOperation()
|
|
|
|
|
|
|
|
var meta pilorama.Meta
|
|
|
|
if err := meta.FromBytes(op.GetMeta()); err != nil {
|
|
|
|
return nil, fmt.Errorf("can't parse meta-information: %w", err)
|
|
|
|
}
|
|
|
|
|
2023-01-20 15:04:20 +00:00
|
|
|
select {
|
|
|
|
case s.replicateLocalCh <- applyOp{
|
2023-03-21 12:43:12 +00:00
|
|
|
treeID: req.GetBody().GetTreeId(),
|
|
|
|
cid: cid,
|
2022-07-26 10:37:12 +00:00
|
|
|
Move: pilorama.Move{
|
|
|
|
Parent: op.GetParentId(),
|
|
|
|
Child: op.GetChildId(),
|
|
|
|
Meta: meta,
|
|
|
|
},
|
2023-01-20 15:04:20 +00:00
|
|
|
}:
|
|
|
|
default:
|
2022-07-26 10:37:12 +00:00
|
|
|
}
|
|
|
|
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
|
2022-04-22 13:30:20 +00:00
|
|
|
}
|
|
|
|
|
2022-05-11 13:29:04 +00:00
|
|
|
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-05-11 13:29:04 +00:00
|
|
|
b := req.GetBody()
|
|
|
|
|
|
|
|
var cid cidSDK.ID
|
|
|
|
if err := cid.Decode(req.GetBody().GetContainerId()); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-05-30 17:46:29 +00:00
|
|
|
ns, pos, err := s.getContainerNodes(cid)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
2022-05-27 12:55:02 +00:00
|
|
|
var cli TreeService_GetOpLogClient
|
|
|
|
var outErr error
|
|
|
|
err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
|
|
|
|
cli, outErr = c.GetOpLog(srv.Context(), req)
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
} else if outErr != nil {
|
|
|
|
return outErr
|
|
|
|
}
|
2022-05-24 11:23:27 +00:00
|
|
|
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
|
|
|
|
if err := srv.Send(resp); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2022-05-27 12:55:02 +00:00
|
|
|
return nil
|
2022-05-24 11:23:27 +00:00
|
|
|
}
|
|
|
|
|
2022-05-11 13:29:04 +00:00
|
|
|
h := b.GetHeight()
|
2023-06-13 08:32:01 +00:00
|
|
|
lastHeight, err := s.forest.TreeHeight(srv.Context(), cid, b.GetTreeId())
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-05-11 13:29:04 +00:00
|
|
|
for {
|
2023-04-13 12:36:20 +00:00
|
|
|
lm, err := s.forest.TreeGetOpLog(srv.Context(), cid, b.GetTreeId(), h)
|
2023-06-13 08:32:01 +00:00
|
|
|
if err != nil || lm.Time == 0 || lastHeight < lm.Time {
|
2022-05-11 13:29:04 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = srv.Send(&GetOpLogResponse{
|
|
|
|
Body: &GetOpLogResponse_Body{
|
|
|
|
Operation: &LogMove{
|
|
|
|
ParentId: lm.Parent,
|
|
|
|
Meta: lm.Meta.Bytes(),
|
|
|
|
ChildId: lm.Child,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
h = lm.Time + 1
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-18 13:03:03 +00:00
|
|
|
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return nil, ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-10-18 13:03:03 +00:00
|
|
|
var cid cidSDK.ID
|
|
|
|
|
|
|
|
err := cid.Decode(req.GetBody().GetContainerId())
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// just verify the signature, not ACL checks
|
|
|
|
// since tree ID list is not protected like
|
|
|
|
// the containers list
|
|
|
|
err = verifyMessage(req)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ns, pos, err := s.getContainerNodes(cid)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pos < 0 {
|
|
|
|
var resp *TreeListResponse
|
|
|
|
var outErr error
|
|
|
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
|
|
resp, outErr = c.TreeList(ctx, req)
|
|
|
|
return outErr == nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return resp, outErr
|
|
|
|
}
|
|
|
|
|
2023-04-13 12:36:20 +00:00
|
|
|
ids, err := s.forest.TreeList(ctx, cid)
|
2022-10-18 13:03:03 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &TreeListResponse{
|
|
|
|
Body: &TreeListResponse_Body{
|
|
|
|
Ids: ids,
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2022-04-29 10:06:10 +00:00
|
|
|
func protoToMeta(arr []*KeyValue) []pilorama.KeyValue {
|
2022-04-22 13:30:20 +00:00
|
|
|
meta := make([]pilorama.KeyValue, len(arr))
|
|
|
|
for i, kv := range arr {
|
2022-04-29 10:06:10 +00:00
|
|
|
if kv != nil {
|
2023-12-11 10:18:34 +00:00
|
|
|
meta[i].Key = kv.GetKey()
|
|
|
|
meta[i].Value = kv.GetValue()
|
2022-04-29 10:06:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return meta
|
|
|
|
}
|
|
|
|
|
|
|
|
func metaToProto(arr []pilorama.KeyValue) []*KeyValue {
|
|
|
|
meta := make([]*KeyValue, len(arr))
|
|
|
|
for i, kv := range arr {
|
|
|
|
meta[i] = &KeyValue{
|
|
|
|
Key: kv.Key,
|
|
|
|
Value: kv.Value,
|
|
|
|
}
|
2022-04-22 13:30:20 +00:00
|
|
|
}
|
|
|
|
return meta
|
|
|
|
}
|
2022-05-27 12:55:02 +00:00
|
|
|
|
|
|
|
// getContainerInfo returns the list of container nodes, position in the container for the node
|
|
|
|
// with pub key and total amount of nodes in all replicas.
|
2023-04-26 08:24:40 +00:00
|
|
|
func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) {
|
2022-05-30 17:46:29 +00:00
|
|
|
cntNodes, _, err := s.getContainerNodes(cid)
|
2022-05-27 12:55:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, 0, 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, node := range cntNodes {
|
|
|
|
if bytes.Equal(node.PublicKey(), pub) {
|
|
|
|
return cntNodes, i, len(cntNodes), nil
|
|
|
|
}
|
|
|
|
}
|
2022-05-30 17:46:29 +00:00
|
|
|
return cntNodes, -1, len(cntNodes), nil
|
2022-05-27 12:55:02 +00:00
|
|
|
}
|
2022-10-10 11:33:17 +00:00
|
|
|
|
|
|
|
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
|
2023-06-13 08:43:25 +00:00
|
|
|
if !s.initialSyncDone.Load() {
|
|
|
|
return nil, ErrAlreadySyncing
|
|
|
|
}
|
|
|
|
|
2022-10-10 11:33:17 +00:00
|
|
|
return new(HealthcheckResponse), nil
|
|
|
|
}
|