frostfs-node/cmd/frostfs-node/tree.go

107 lines
3.2 KiB
Go
Raw Normal View History

package main
import (
"context"
"errors"
"time"
treeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
"github.com/TrueCloudLab/frostfs-node/pkg/core/container"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
containerClient "github.com/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"github.com/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "github.com/TrueCloudLab/frostfs-node/pkg/morph/event/container"
"github.com/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
// cnrSource bundles the two container back-ends used by the Tree Service:
// a caching source for reading single containers and the raw client for
// listing them.
type cnrSource struct {
	// src is the caching layer placed in front of the raw client.
	src container.Source

	// cli is the raw client. Its responses are deliberately not cached:
	// synchronization is performed once per epoch and every call is
	// expected to produce a different result.
	cli *containerClient.Client
}
// Get looks up the container with the given ID by delegating to the
// underlying src container source and returns its result unchanged.
func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
	cnr, err := c.src.Get(id)
	return cnr, err
}
// List returns the container IDs reported by the raw container client,
// bypassing the src source. The client is called with a nil argument.
// NOTE(review): nil presumably means "no owner filter" — confirm against
// containerClient.Client.List.
func (c cnrSource) List() ([]cid.ID, error) {
	ids, err := c.cli.List(nil)
	return ids, err
}
// initTreeService sets up the node's Tree Service when it is enabled in the
// application configuration; otherwise it logs a message and returns without
// touching c.
//
// When enabled, it:
//   - builds the service from the node's container, eACL and netmap sources,
//     private key, logger, local storage and the tree config values;
//   - registers the service on every configured gRPC server;
//   - appends a worker that starts the service;
//   - schedules SynchronizeAll, either on every new-epoch notification (sync
//     interval is not positive) or periodically on a ticker;
//   - subscribes to container removal events in order to drop the trees of
//     removed containers;
//   - registers the service's Shutdown as a shutdown hook.
func initTreeService(c *cfg) {
	treeConfig := treeconfig.Tree(c.appCfg)
	if !treeConfig.Enabled() {
		c.log.Info("tree service is not enabled, skip initialization")
		return
	}

	c.treeService = tree.New(
		tree.WithContainerSource(cnrSource{
			src: c.cfgObject.cnrSource,
			cli: c.shared.cnrClient,
		}),
		tree.WithEACLSource(c.cfgObject.eaclSource),
		tree.WithNetmapSource(c.netMapSource),
		tree.WithPrivateKey(&c.key.PrivateKey),
		tree.WithLogger(c.log),
		tree.WithStorage(c.cfgObject.cfgLocalStorage.localStorage),
		tree.WithContainerCacheSize(treeConfig.CacheSize()),
		tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
		tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
		tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()))

	for _, srv := range c.cfgGRPC.servers {
		tree.RegisterTreeServiceServer(srv, c.treeService)
	}

	c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
		c.treeService.Start(ctx)
	}))

	// synchronize runs one full synchronization and logs a failure. The error
	// is also returned so that the periodic loop can detect shutdown.
	synchronize := func() error {
		err := c.treeService.SynchronizeAll()
		if err != nil {
			c.log.Error("could not synchronize Tree Service", zap.Error(err))
		}
		return err
	}

	// time.NewTicker panics on a non-positive interval, so any such value
	// (not only zero) selects epoch-driven synchronization.
	if d := treeConfig.SyncInterval(); d <= 0 {
		addNewEpochNotificationHandler(c, func(_ event.Event) {
			// The failure has already been logged by synchronize.
			_ = synchronize()
		})
	} else {
		go func() {
			tick := time.NewTicker(d)
			defer tick.Stop()

			for range tick.C {
				// Stop ticking once the service reports that it is shutting
				// down; any other error is only logged and the loop goes on.
				if err := synchronize(); errors.Is(err, tree.ErrShuttingDown) {
					return
				}
			}
		}()
	}

	subscribeToContainerRemoval(c, func(e event.Event) {
		// Checked assertion: an unexpected event value must not panic
		// inside the handler.
		ev, ok := e.(containerEvent.DeleteSuccess)
		if !ok {
			c.log.Error("unexpected event in container removal handler", zap.Any("event", e))
			return
		}

		// This is executed asynchronously, so we don't care about the operation taking some time.
		c.log.Debug("removing all trees for container", zap.Stringer("cid", ev.ID))

		err := c.treeService.DropTree(context.Background(), ev.ID, "")
		if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
			// Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged.
			c.log.Error("container removal event received, but trees weren't removed",
				zap.Stringer("cid", ev.ID),
				zap.Error(err))
		}
	})

	c.onShutdown(c.treeService.Shutdown)
}