package main import ( "context" "errors" "net" "time" treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.uber.org/zap" "google.golang.org/grpc" ) type cnrSource struct { // cache of raw client. src container.Source // raw client; no need to cache request results // since sync is performed once in epoch and is // expected to receive different results every // call. cli *containerClient.Client } func (c cnrSource) Get(id cid.ID) (*container.Container, error) { return c.src.Get(id) } func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) { return c.src.DeletionInfo(cid) } func (c cnrSource) List() ([]cid.ID, error) { return c.cli.ContainersOf(nil) } func initTreeService(c *cfg) { treeConfig := treeconfig.Tree(c.appCfg) if !treeConfig.Enabled() { c.log.Info(logs.FrostFSNodeTreeServiceIsNotEnabledSkipInitialization) return } c.treeService = tree.New( tree.WithContainerSource(cnrSource{ src: c.cfgObject.cnrSource, cli: c.shared.cnrClient, }), tree.WithFrostfsidSubjectProvider(c.shared.frostfsidClient), tree.WithEACLSource(c.cfgObject.eaclSource), tree.WithNetmapSource(c.netMapSource), tree.WithPrivateKey(&c.key.PrivateKey), tree.WithLogger(c.log), tree.WithStorage(c.cfgObject.cfgLocalStorage.localStorage), tree.WithContainerCacheSize(treeConfig.CacheSize()), tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()), tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()), tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()), tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()), tree.WithMetrics(c.metricsCollector.TreeService()), tree.WithAPERouter(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine), ) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { tree.RegisterTreeServiceServer(s, c.treeService) }) c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { c.treeService.Start(ctx) })) if d := treeConfig.SyncInterval(); d == 0 { addNewEpochNotificationHandler(c, func(_ event.Event) { err := c.treeService.SynchronizeAll() if err != nil { c.log.Error(logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err)) } }) } else { go func() { tick := time.NewTicker(d) defer tick.Stop() for range tick.C { err := c.treeService.SynchronizeAll() if err != nil { c.log.Error(logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err)) if errors.Is(err, tree.ErrShuttingDown) { return } } } }() } subscribeToContainerRemoval(c, func(e event.Event) { ev := e.(containerEvent.DeleteSuccess) // This is executed asynchronously, so we don't care about the operation taking some time. c.log.Debug(logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID)) err := c.treeService.DropTree(context.Background(), ev.ID, "") if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { // Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged. c.log.Error(logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved, zap.Stringer("cid", ev.ID), zap.String("error", err.Error())) } }) c.onShutdown(c.treeService.Shutdown) }