package tree import ( "bytes" "context" "errors" "fmt" "sort" "sync" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) // Service represents tree-service capable of working with multiple // instances of CRDT trees. type Service struct { cfg cache clientCache replicateCh chan movePair replicateLocalCh chan applyOp replicationTasks chan replicationTask closeCh chan struct{} containerCache containerCache syncChan chan struct{} syncPool *ants.Pool initialSyncDone atomic.Bool // cnrMap contains existing (used) container IDs. cnrMap map[cidSDK.ID]struct{} // cnrMapMtx protects cnrMap cnrMapMtx sync.Mutex } var _ TreeServiceServer = (*Service)(nil) // New creates new tree service instance. func New(opts ...Option) *Service { var s Service s.containerCacheSize = defaultContainerCacheSize s.replicatorChannelCapacity = defaultReplicatorCapacity s.replicatorWorkerCount = defaultReplicatorWorkerCount s.replicatorTimeout = defaultReplicatorSendTimeout s.metrics = defaultMetricsRegister{} for i := range opts { opts[i](&s.cfg) } if s.log == nil { s.log = &logger.Logger{Logger: zap.NewNop()} } s.cache.init() s.closeCh = make(chan struct{}) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) s.replicateLocalCh = make(chan applyOp) s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount) s.containerCache.init(s.containerCacheSize) s.cnrMap = make(map[cidSDK.ID]struct{}) s.syncChan = make(chan struct{}) s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount) return &s } // Start starts the service. func (s *Service) Start(ctx context.Context) { go s.replicateLoop(ctx) go s.syncLoop(ctx) select { case <-s.closeCh: case <-ctx.Done(): default: // initial sync s.syncChan <- struct{}{} } } // Shutdown shutdowns the service. func (s *Service) Shutdown() { close(s.closeCh) s.syncPool.Release() } func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } b := req.GetBody() var cid cidSDK.ID if err := cid.Decode(b.GetContainerId()); err != nil { return nil, err } err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut) if err != nil { return nil, err } ns, pos, err := s.getContainerNodes(cid) if err != nil { return nil, err } if pos < 0 { var resp *AddResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { resp, outErr = c.Add(ctx, req) return true }) if err != nil { return nil, err } return resp, outErr } d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{ Parent: b.GetParentId(), Child: pilorama.RootID, Meta: pilorama.Meta{Items: protoToMeta(b.GetMeta())}, }) if err != nil { return nil, err } s.pushToQueue(cid, b.GetTreeId(), log) return &AddResponse{ Body: &AddResponse_Body{ NodeId: log.Child, }, }, nil } func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } b := req.GetBody() var cid cidSDK.ID if err := cid.Decode(b.GetContainerId()); err != nil { return nil, err } err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut) if err != nil { return nil, err } ns, pos, err := s.getContainerNodes(cid) if err != nil { return nil, err } if pos < 0 { var resp *AddByPathResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { resp, outErr = c.AddByPath(ctx, req) return true }) if err != nil { return nil, err } return resp, outErr } meta := protoToMeta(b.GetMeta()) attr := b.GetPathAttribute() if len(attr) == 0 { attr = pilorama.AttributeFilename } d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} logs, err := s.forest.TreeAddByPath(ctx, d, b.GetTreeId(), attr, b.GetPath(), meta) if err != nil { return nil, err } for i := range logs { s.pushToQueue(cid, b.GetTreeId(), &logs[i]) } nodes := make([]uint64, len(logs)) nodes[0] = logs[len(logs)-1].Child for i, l := range logs[:len(logs)-1] { nodes[i+1] = l.Child } return &AddByPathResponse{ Body: &AddByPathResponse_Body{ Nodes: nodes, }, }, nil } func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } b := req.GetBody() var cid cidSDK.ID if err := cid.Decode(b.GetContainerId()); err != nil { return nil, err } err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut) if err != nil { return nil, err } ns, pos, err := s.getContainerNodes(cid) if err != nil { return nil, err } if pos < 0 { var resp *RemoveResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { resp, outErr = c.Remove(ctx, req) return true }) if err != nil { return nil, err } return resp, outErr } if b.GetNodeId() == pilorama.RootID { return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId()) } d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{ Parent: pilorama.TrashID, Child: b.GetNodeId(), }) if err != nil { return nil, err } s.pushToQueue(cid, b.GetTreeId(), log) return new(RemoveResponse), nil } // Move applies client operation to the specified tree and pushes in queue // for replication on other nodes. func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } b := req.GetBody() var cid cidSDK.ID if err := cid.Decode(b.GetContainerId()); err != nil { return nil, err } err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectPut) if err != nil { return nil, err } ns, pos, err := s.getContainerNodes(cid) if err != nil { return nil, err } if pos < 0 { var resp *MoveResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { resp, outErr = c.Move(ctx, req) return true }) if err != nil { return nil, err } return resp, outErr } if b.GetNodeId() == pilorama.RootID { return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId()) } d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} log, err := s.forest.TreeMove(ctx, d, b.GetTreeId(), &pilorama.Move{ Parent: b.GetParentId(), Child: b.GetNodeId(), Meta: pilorama.Meta{Items: protoToMeta(b.GetMeta())}, }) if err != nil { return nil, err } s.pushToQueue(cid, b.GetTreeId(), log) return new(MoveResponse), nil } func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } b := req.GetBody() var cid cidSDK.ID if err := cid.Decode(b.GetContainerId()); err != nil { return nil, err } err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectGet) if err != nil { return nil, err } ns, pos, err := s.getContainerNodes(cid) if err != nil { return nil, err } if pos < 0 { var resp *GetNodeByPathResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { resp, outErr = c.GetNodeByPath(ctx, req) return true }) if err != nil { return nil, err } return resp, outErr } attr := b.GetPathAttribute() if len(attr) == 0 { attr = pilorama.AttributeFilename } nodes, err := s.forest.TreeGetByPath(ctx, cid, b.GetTreeId(), attr, b.GetPath(), b.GetLatestOnly()) if err != nil { return nil, err } info := make([]*GetNodeByPathResponse_Info, 0, len(nodes)) for _, node := range nodes { m, parent, err := s.forest.TreeGetMeta(ctx, cid, b.GetTreeId(), node) if err != nil { return nil, err } var x GetNodeByPathResponse_Info x.ParentId = parent x.NodeId = node x.Timestamp = m.Time if b.AllAttributes { x.Meta = metaToProto(m.Items) } else { for _, kv := range m.Items { for _, attr := range b.GetAttributes() { if kv.Key == attr { x.Meta = append(x.Meta, &KeyValue{ Key: kv.Key, Value: kv.Value, }) break } } } } info = append(info, &x) } return &GetNodeByPathResponse{ Body: &GetNodeByPathResponse_Body{ Nodes: info, }, }, nil } func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { if !s.initialSyncDone.Load() { return ErrAlreadySyncing } b := req.GetBody() var cid cidSDK.ID if err := cid.Decode(b.GetContainerId()); err != nil { return err } err := s.verifyClient(req, cid, b.GetBearerToken(), acl.OpObjectGet) if err != nil { return err } ns, pos, err := s.getContainerNodes(cid) if err != nil { return err } if pos < 0 { var cli TreeService_GetSubTreeClient var outErr error err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool { cli, outErr = c.GetSubTree(srv.Context(), req) return true }) if err != nil { return err } else if outErr != nil { return outErr } for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() { if err := srv.Send(resp); err != nil { return err } } return nil } return getSubTree(srv.Context(), srv, cid, b, s.forest) } func getSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error { // Traverse the tree in a DFS manner. Because we need to support arbitrary depth, // recursive implementation is not suitable here, so we maintain explicit stack. m, p, err := forest.TreeGetMeta(ctx, cid, b.GetTreeId(), b.GetRootId()) if err != nil { return err } stack := [][]pilorama.NodeInfo{{{ ID: b.GetRootId(), Meta: m, ParentID: p, }}} for { if len(stack) == 0 { break } else if len(stack[len(stack)-1]) == 0 { stack = stack[:len(stack)-1] continue } node := stack[len(stack)-1][0] stack[len(stack)-1] = stack[len(stack)-1][1:] err = srv.Send(&GetSubTreeResponse{ Body: &GetSubTreeResponse_Body{ NodeId: node.ID, ParentId: node.ParentID, Timestamp: node.Meta.Time, Meta: metaToProto(node.Meta.Items), }, }) if err != nil { return err } if b.GetDepth() == 0 || uint32(len(stack)) < b.GetDepth() { children, err := forest.TreeGetChildren(ctx, cid, b.GetTreeId(), node.ID) if err != nil { return err } children, err = sortByFilename(children, b.GetOrderBy().GetDirection()) if err != nil { return err } if len(children) != 0 { stack = append(stack, children) } } } return nil } func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Direction) ([]pilorama.NodeInfo, error) { switch d { case GetSubTreeRequest_Body_Order_None: return nodes, nil case GetSubTreeRequest_Body_Order_Asc: if len(nodes) == 0 { return nodes, nil } less := func(i, j int) bool { return bytes.Compare(nodes[i].Meta.GetAttr(pilorama.AttributeFilename), nodes[j].Meta.GetAttr(pilorama.AttributeFilename)) < 0 } sort.Slice(nodes, less) return nodes, nil default: return nil, fmt.Errorf("unsupported order direction: %s", d.String()) } } // Apply locally applies operation from the remote node to the tree. func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, error) { err := verifyMessage(req) if err != nil { return nil, err } var cid cidSDK.ID if err := cid.Decode(req.GetBody().GetContainerId()); err != nil { return nil, err } key := req.GetSignature().GetKey() _, pos, _, err := s.getContainerInfo(cid, key) if err != nil { return nil, err } if pos < 0 { return nil, errors.New("`Apply` request must be signed by a container node") } op := req.GetBody().GetOperation() var meta pilorama.Meta if err := meta.FromBytes(op.GetMeta()); err != nil { return nil, fmt.Errorf("can't parse meta-information: %w", err) } select { case s.replicateLocalCh <- applyOp{ treeID: req.GetBody().GetTreeId(), cid: cid, Move: pilorama.Move{ Parent: op.GetParentId(), Child: op.GetChildId(), Meta: meta, }, }: default: } return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil } func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { if !s.initialSyncDone.Load() { return ErrAlreadySyncing } b := req.GetBody() var cid cidSDK.ID if err := cid.Decode(req.GetBody().GetContainerId()); err != nil { return err } ns, pos, err := s.getContainerNodes(cid) if err != nil { return err } if pos < 0 { var cli TreeService_GetOpLogClient var outErr error err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool { cli, outErr = c.GetOpLog(srv.Context(), req) return true }) if err != nil { return err } else if outErr != nil { return outErr } for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() { if err := srv.Send(resp); err != nil { return err } } return nil } h := b.GetHeight() lastHeight, err := s.forest.TreeHeight(srv.Context(), cid, b.GetTreeId()) if err != nil { return err } for { lm, err := s.forest.TreeGetOpLog(srv.Context(), cid, b.GetTreeId(), h) if err != nil || lm.Time == 0 || lastHeight < lm.Time { return err } err = srv.Send(&GetOpLogResponse{ Body: &GetOpLogResponse_Body{ Operation: &LogMove{ ParentId: lm.Parent, Meta: lm.Meta.Bytes(), ChildId: lm.Child, }, }, }) if err != nil { return err } h = lm.Time + 1 } } func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) { if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } var cid cidSDK.ID err := cid.Decode(req.GetBody().GetContainerId()) if err != nil { return nil, err } // just verify the signature, not ACL checks // since tree ID list is not protected like // the containers list err = verifyMessage(req) if err != nil { return nil, err } ns, pos, err := s.getContainerNodes(cid) if err != nil { return nil, err } if pos < 0 { var resp *TreeListResponse var outErr error err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { resp, outErr = c.TreeList(ctx, req) return outErr == nil }) if err != nil { return nil, err } return resp, outErr } ids, err := s.forest.TreeList(ctx, cid) if err != nil { return nil, err } return &TreeListResponse{ Body: &TreeListResponse_Body{ Ids: ids, }, }, nil } func protoToMeta(arr []*KeyValue) []pilorama.KeyValue { meta := make([]pilorama.KeyValue, len(arr)) for i, kv := range arr { if kv != nil { meta[i].Key = kv.Key meta[i].Value = kv.Value } } return meta } func metaToProto(arr []pilorama.KeyValue) []*KeyValue { meta := make([]*KeyValue, len(arr)) for i, kv := range arr { meta[i] = &KeyValue{ Key: kv.Key, Value: kv.Value, } } return meta } // getContainerInfo returns the list of container nodes, position in the container for the node // with pub key and total amount of nodes in all replicas. func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) { cntNodes, _, err := s.getContainerNodes(cid) if err != nil { return nil, 0, 0, err } for i, node := range cntNodes { if bytes.Equal(node.PublicKey(), pub) { return cntNodes, i, len(cntNodes), nil } } return cntNodes, -1, len(cntNodes), nil } func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) { if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } return new(HealthcheckResponse), nil }