forked from TrueCloudLab/frostfs-node
121 lines
4.1 KiB
Go
121 lines
4.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net"
|
|
"time"
|
|
|
|
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
|
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type cnrSource struct {
|
|
// cache of raw client.
|
|
src container.Source
|
|
// raw client; no need to cache request results
|
|
// since sync is performed once in epoch and is
|
|
// expected to receive different results every
|
|
// call.
|
|
cli *containerClient.Client
|
|
}
|
|
|
|
func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
|
|
return c.src.Get(id)
|
|
}
|
|
|
|
func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
|
|
return c.src.DeletionInfo(cid)
|
|
}
|
|
|
|
func (c cnrSource) List() ([]cid.ID, error) {
|
|
return c.cli.ContainersOf(nil)
|
|
}
|
|
|
|
func initTreeService(c *cfg) {
|
|
treeConfig := treeconfig.Tree(c.appCfg)
|
|
if !treeConfig.Enabled() {
|
|
c.log.Info(context.Background(), logs.FrostFSNodeTreeServiceIsNotEnabledSkipInitialization)
|
|
return
|
|
}
|
|
|
|
c.treeService = tree.New(
|
|
tree.WithContainerSource(cnrSource{
|
|
src: c.cfgObject.cnrSource,
|
|
cli: c.shared.cnrClient,
|
|
}),
|
|
tree.WithFrostfsidSubjectProvider(c.shared.frostfsidClient),
|
|
tree.WithNetmapSource(c.netMapSource),
|
|
tree.WithPrivateKey(&c.key.PrivateKey),
|
|
tree.WithLogger(c.log),
|
|
tree.WithStorage(c.cfgObject.cfgLocalStorage.localStorage),
|
|
tree.WithContainerCacheSize(treeConfig.CacheSize()),
|
|
tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
|
|
tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
|
|
tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()),
|
|
tree.WithSyncBatchSize(treeConfig.SyncBatchSize()),
|
|
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
|
|
tree.WithMetrics(c.metricsCollector.TreeService()),
|
|
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
|
tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()),
|
|
tree.WithNetmapState(c.cfgNetmap.state),
|
|
tree.WithDialerSource(c.dialerSource),
|
|
)
|
|
|
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
|
tree.RegisterTreeServiceServer(s, c.treeService)
|
|
})
|
|
|
|
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
|
c.treeService.Start(ctx)
|
|
}))
|
|
|
|
if d := treeConfig.SyncInterval(); d == 0 {
|
|
addNewEpochNotificationHandler(c, func(ctx context.Context, _ event.Event) {
|
|
err := c.treeService.SynchronizeAll()
|
|
if err != nil {
|
|
c.log.Error(ctx, logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
|
|
}
|
|
})
|
|
} else {
|
|
go func() {
|
|
tick := time.NewTicker(d)
|
|
defer tick.Stop()
|
|
|
|
for range tick.C {
|
|
err := c.treeService.SynchronizeAll()
|
|
if err != nil {
|
|
c.log.Error(context.Background(), logs.FrostFSNodeCouldNotSynchronizeTreeService, zap.Error(err))
|
|
if errors.Is(err, tree.ErrShuttingDown) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
subscribeToContainerRemoval(c, func(ctx context.Context, e event.Event) {
|
|
ev := e.(containerEvent.DeleteSuccess)
|
|
|
|
// This is executed asynchronously, so we don't care about the operation taking some time.
|
|
c.log.Debug(ctx, logs.FrostFSNodeRemovingAllTreesForContainer, zap.Stringer("cid", ev.ID))
|
|
err := c.treeService.DropTree(ctx, ev.ID, "")
|
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
|
// Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged.
|
|
c.log.Error(ctx, logs.FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved,
|
|
zap.Stringer("cid", ev.ID),
|
|
zap.String("error", err.Error()))
|
|
}
|
|
})
|
|
|
|
c.onShutdown(c.treeService.Shutdown)
|
|
}
|