[#1329] tree: Sync trees when a node first time appears in a container

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-10-19 00:33:45 +03:00 committed by fyrchik
parent 217b030d20
commit e1be0180f6
6 changed files with 181 additions and 52 deletions

View file

@ -305,10 +305,6 @@ type internals struct {
workers []worker workers []worker
closers []func() closers []func()
// onlineStateHandlers are executed in a separate
// goroutine on every !ONLINE -> ONLINE state transition
onlineStateHandlers []func(context.Context)
apiVersion version.Version apiVersion version.Version
healthStatus *atomic.Int32 healthStatus *atomic.Int32
// is node under maintenance // is node under maintenance
@ -837,13 +833,6 @@ func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
// Called with nil when storage node is outside the NeoFS network map // Called with nil when storage node is outside the NeoFS network map
// (before entering the network and after leaving it). // (before entering the network and after leaving it).
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) { func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
if c.cfgNetmap.state.controlNetmapStatus() != control.NetmapStatus_ONLINE &&
ni != nil && ni.IsOnline() {
for _, h := range c.onlineStateHandlers {
go h(c.ctx)
}
}
c.cfgNetmap.state.setNodeInfo(ni) c.cfgNetmap.state.setNodeInfo(ni)
} }
@ -950,7 +939,3 @@ func (c *cfg) configWatcher(ctx context.Context) {
} }
} }
} }
func (c *cfg) addOnlineStateHandler(h func(ctx context.Context)) {
c.onlineStateHandlers = append(c.onlineStateHandlers, h)
}

View file

@ -5,16 +5,34 @@ import (
"errors" "errors"
treeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/tree" treeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/tree"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
containerClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" containerClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
"github.com/nspcc-dev/neofs-node/pkg/services/tree" "github.com/nspcc-dev/neofs-node/pkg/services/tree"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"go.uber.org/zap" "go.uber.org/zap"
) )
type cnrSource struct {
// cache of raw client.
src container.Source
// raw client; no need to cache request results
// since sync is performed once in epoch and is
// expected to receive different results every
// call.
cli *containerClient.Client
}
func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
return c.src.Get(id)
}
func (c cnrSource) List() ([]cid.ID, error) {
return c.cli.List(nil)
}
func initTreeService(c *cfg) { func initTreeService(c *cfg) {
treeConfig := treeconfig.Tree(c.appCfg) treeConfig := treeconfig.Tree(c.appCfg)
if !treeConfig.Enabled() { if !treeConfig.Enabled() {
@ -23,7 +41,10 @@ func initTreeService(c *cfg) {
} }
c.treeService = tree.New( c.treeService = tree.New(
tree.WithContainerSource(c.cfgObject.cnrSource), tree.WithContainerSource(cnrSource{
src: c.cfgObject.cnrSource,
cli: c.shared.cnrClient,
}),
tree.WithEACLSource(c.cfgObject.eaclSource), tree.WithEACLSource(c.cfgObject.eaclSource),
tree.WithNetmapSource(c.netMapSource), tree.WithNetmapSource(c.netMapSource),
tree.WithPrivateKey(&c.key.PrivateKey), tree.WithPrivateKey(&c.key.PrivateKey),
@ -41,15 +62,12 @@ func initTreeService(c *cfg) {
c.treeService.Start(ctx) c.treeService.Start(ctx)
})) }))
syncTreeFunc := func(ctx context.Context) { addNewEpochNotificationHandler(c, func(_ event.Event) {
syncTrees(ctx, c.treeService, c.shared.cnrClient, c.log) err := c.treeService.SynchronizeAll()
if err != nil {
c.log.Error("could not synchronize Tree Service", zap.Error(err))
} }
})
if c.cfgNetmap.state.controlNetmapStatus() == control.NetmapStatus_ONLINE {
c.workers = append(c.workers, newWorkerFromFunc(syncTreeFunc))
}
c.addOnlineStateHandler(syncTreeFunc)
subscribeToContainerRemoval(c, func(e event.Event) { subscribeToContainerRemoval(c, func(e event.Event) {
ev := e.(containerEvent.DeleteSuccess) ev := e.(containerEvent.DeleteSuccess)
@ -66,26 +84,3 @@ func initTreeService(c *cfg) {
c.onShutdown(c.treeService.Shutdown) c.onShutdown(c.treeService.Shutdown)
} }
func syncTrees(ctx context.Context, treeSvc *tree.Service, cnrCli *containerClient.Client, log *logger.Logger) {
log.Info("synchronizing trees...")
ids, err := cnrCli.List(nil)
if err != nil {
log.Error("trees are not synchronized", zap.Error(err))
return
}
for _, id := range ids {
err = treeSvc.SynchronizeAllTrees(ctx, id)
if err != nil && !errors.Is(err, tree.ErrNotInContainer) {
log.Warn(
"tree synchronization failed",
zap.Stringer("cid", id),
zap.Error(err),
)
}
}
log.Info("trees have been synchronized")
}

View file

@ -8,14 +8,23 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/logger"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
) )
type ContainerSource interface {
container.Source
// List must return list of all the containers in the NeoFS network
// at the moment of a call and any error that does not allow fetching
// container information.
List() ([]cid.ID, error)
}
type cfg struct { type cfg struct {
log *logger.Logger log *logger.Logger
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
rawPub []byte rawPub []byte
nmSource netmap.Source nmSource netmap.Source
cnrSource container.Source cnrSource ContainerSource
eaclSource container.EACLSource eaclSource container.EACLSource
forest pilorama.Forest forest pilorama.Forest
// replication-related parameters // replication-related parameters
@ -29,7 +38,7 @@ type Option func(*cfg)
// WithContainerSource sets a container source for a tree service. // WithContainerSource sets a container source for a tree service.
// This option is required. // This option is required.
func WithContainerSource(src container.Source) Option { func WithContainerSource(src ContainerSource) Option {
return func(c *cfg) { return func(c *cfg) {
c.cnrSource = src c.cnrSource = src
} }

View file

@ -24,6 +24,9 @@ type Service struct {
replicationTasks chan replicationTask replicationTasks chan replicationTask
closeCh chan struct{} closeCh chan struct{}
containerCache containerCache containerCache containerCache
syncChan chan struct{}
cnrMap map[cidSDK.ID]struct{}
} }
var _ TreeServiceServer = (*Service)(nil) var _ TreeServiceServer = (*Service)(nil)
@ -48,6 +51,8 @@ func New(opts ...Option) *Service {
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount) s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
s.containerCache.init(s.containerCacheSize) s.containerCache.init(s.containerCacheSize)
s.cnrMap = make(map[cidSDK.ID]struct{})
s.syncChan = make(chan struct{})
return &s return &s
} }
@ -55,6 +60,15 @@ func New(opts ...Option) *Service {
// Start starts the service. // Start starts the service.
func (s *Service) Start(ctx context.Context) { func (s *Service) Start(ctx context.Context) {
go s.replicateLoop(ctx) go s.replicateLoop(ctx)
go s.syncLoop(ctx)
select {
case <-s.closeCh:
case <-ctx.Done():
default:
// initial sync
s.syncChan <- struct{}{}
}
} }
// Shutdown shutdowns the service. // Shutdown shutdowns the service.

View file

@ -29,6 +29,22 @@ type dummyNetmapSource struct {
type dummyContainerSource map[string]*containercore.Container type dummyContainerSource map[string]*containercore.Container
func (s dummyContainerSource) List() ([]cid.ID, error) {
res := make([]cid.ID, 0, len(s))
var cnr cid.ID
for cidStr := range s {
err := cnr.DecodeString(cidStr)
if err != nil {
return nil, err
}
res = append(res, cnr)
}
return res, nil
}
func (s dummyContainerSource) Get(id cid.ID) (*containercore.Container, error) { func (s dummyContainerSource) Get(id cid.ID) (*containercore.Container, error) {
cnt, ok := s[id.String()] cnt, ok := s[id.String()]
if !ok { if !ok {

View file

@ -200,3 +200,113 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto
height = newHeight height = newHeight
} }
} }
// ErrAlreadySyncing is returned when a service synchronization has already
// been started.
var ErrAlreadySyncing = errors.New("service is being synchronized")
// ErrShuttingDown is returned when the service is shitting down and could not
// accept any calls.
var ErrShuttingDown = errors.New("service is shutting down")
// SynchronizeAll forces tree service to synchronize all the trees according to
// netmap information. Must not be called before Service.Start.
// Returns ErrAlreadySyncing if synchronization has been started and blocked
// by another routine.
// Note: non-blocking operation.
func (s *Service) SynchronizeAll() error {
select {
case <-s.closeCh:
return ErrShuttingDown
default:
}
select {
case s.syncChan <- struct{}{}:
return nil
default:
return ErrAlreadySyncing
}
}
func (s *Service) syncLoop(ctx context.Context) {
for {
select {
case <-s.closeCh:
return
case <-ctx.Done():
return
case <-s.syncChan:
s.log.Debug("syncing trees...")
cnrs, err := s.cfg.cnrSource.List()
if err != nil {
s.log.Error("could not fetch containers", zap.Error(err))
continue
}
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
cnrsToSync := make([]cid.ID, 0, len(cnrs))
for _, cnr := range cnrs {
_, pos, err := s.getContainerNodes(cnr)
if err != nil {
s.log.Error("could not calculate container nodes",
zap.Stringer("cid", cnr),
zap.Error(err))
delete(s.cnrMap, cnr)
continue
}
if pos < 0 {
// node is not included in the container.
continue
}
_, ok := s.cnrMap[cnr]
if ok {
// known container; already in sync.
delete(s.cnrMap, cnr)
newMap[cnr] = struct{}{}
} else {
// unknown container; need to sync.
cnrsToSync = append(cnrsToSync, cnr)
}
}
// sync new containers
for _, cnr := range cnrsToSync {
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
err = s.SynchronizeAllTrees(ctx, cnr)
if err != nil {
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
continue
}
// mark as synced
newMap[cnr] = struct{}{}
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
}
// remove stored redundant trees
for cnr := range s.cnrMap {
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
err = s.DropTree(ctx, cnr, "") // TODO: #1940 drop all the trees here
if err != nil {
s.log.Error("could not remove redundant tree",
zap.Stringer("cid", cnr),
zap.Error(err))
continue
}
}
s.cnrMap = newMap
s.log.Debug("trees have been synchronized")
}
}
}