forked from TrueCloudLab/frostfs-node
Compare commits
2 commits
master
...
fyrchik/tr
Author | SHA1 | Date | |
---|---|---|---|
f6f5611375 | |||
92e44a45e5 |
10 changed files with 189 additions and 28 deletions
|
@ -13,6 +13,7 @@ Changelog for FrostFS Node
|
||||||
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
|
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
|
||||||
- Multiple configs support (#44)
|
- Multiple configs support (#44)
|
||||||
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
|
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
|
||||||
|
- Tree service now saves the last synchronization height which persists across restarts (#82)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
|
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
|
||||||
|
|
|
@ -213,6 +213,44 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
||||||
return err == nil, err
|
return err == nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||||
|
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||||
|
index, lst, err := e.getTreeShard(cid, treeID)
|
||||||
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = lst[index].TreeUpdateLastSyncHeight(cid, treeID, height)
|
||||||
|
if err != nil && !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||||
|
e.reportShardError(lst[index], "can't update tree synchronization height", err,
|
||||||
|
zap.Stringer("cid", cid),
|
||||||
|
zap.String("tree", treeID))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TreeLastSyncHeight implements the pilorama.Forest interface.
|
||||||
|
func (e *StorageEngine) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||||
|
var err error
|
||||||
|
var height uint64
|
||||||
|
for _, sh := range e.sortShardsByWeight(cid) {
|
||||||
|
height, err = sh.TreeLastSyncHeight(cid, treeID)
|
||||||
|
if err != nil {
|
||||||
|
if err == shard.ErrPiloramaDisabled {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
|
e.reportShardError(sh, "can't read tree synchronization height", err,
|
||||||
|
zap.Stringer("cid", cid),
|
||||||
|
zap.String("tree", treeID))
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return height, err
|
||||||
|
}
|
||||||
|
return height, err
|
||||||
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []hashedShard, error) {
|
func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []hashedShard, error) {
|
||||||
lst := e.sortShardsByWeight(cid)
|
lst := e.sortShardsByWeight(cid)
|
||||||
for i, sh := range lst {
|
for i, sh := range lst {
|
||||||
|
|
|
@ -192,6 +192,46 @@ func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
||||||
return exists, err
|
return exists, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// syncHeightKey is the key in a tree's data bucket under which the last
// synchronization height is stored as 8 little-endian bytes.
var syncHeightKey = []byte("h")
|
||||||
|
|
||||||
|
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||||
|
func (t *boltForest) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||||
|
rawHeight := make([]byte, 8)
|
||||||
|
binary.LittleEndian.PutUint64(rawHeight, height)
|
||||||
|
|
||||||
|
buck := bucketName(cid, treeID)
|
||||||
|
return t.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
|
treeRoot := tx.Bucket(buck)
|
||||||
|
if treeRoot == nil {
|
||||||
|
return ErrTreeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
b := treeRoot.Bucket(dataBucket)
|
||||||
|
return b.Put(syncHeightKey, rawHeight)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TreeLastSyncHeight implements the pilorama.Forest interface.
|
||||||
|
func (t *boltForest) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||||
|
var height uint64
|
||||||
|
|
||||||
|
buck := bucketName(cid, treeID)
|
||||||
|
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
treeRoot := tx.Bucket(buck)
|
||||||
|
if treeRoot == nil {
|
||||||
|
return ErrTreeNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
b := treeRoot.Bucket(dataBucket)
|
||||||
|
data := b.Get(syncHeightKey)
|
||||||
|
if len(data) == 8 {
|
||||||
|
height = binary.LittleEndian.Uint64(data)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return height, err
|
||||||
|
}
|
||||||
|
|
||||||
// TreeAddByPath implements the Forest interface.
|
// TreeAddByPath implements the Forest interface.
|
||||||
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
|
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
|
||||||
if !d.checkValid() {
|
if !d.checkValid() {
|
||||||
|
|
|
@ -226,3 +226,24 @@ func (f *memoryForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
||||||
_, ok := f.treeMap[fullID]
|
_, ok := f.treeMap[fullID]
|
||||||
return ok, nil
|
return ok, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||||
|
func (f *memoryForest) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||||
|
fullID := cid.EncodeToString() + "/" + treeID
|
||||||
|
t, ok := f.treeMap[fullID]
|
||||||
|
if !ok {
|
||||||
|
return ErrTreeNotFound
|
||||||
|
}
|
||||||
|
t.syncHeight = height
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TreeLastSyncHeight implements the pilorama.Forest interface.
|
||||||
|
func (f *memoryForest) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||||
|
fullID := cid.EncodeToString() + "/" + treeID
|
||||||
|
t, ok := f.treeMap[fullID]
|
||||||
|
if !ok {
|
||||||
|
return 0, ErrTreeNotFound
|
||||||
|
}
|
||||||
|
return t.syncHeight, nil
|
||||||
|
}
|
||||||
|
|
|
@ -1030,3 +1030,52 @@ func testTreeGetTrees(t *testing.T, s Forest) {
|
||||||
require.ElementsMatch(t, treeIDs[cid], trees)
|
require.ElementsMatch(t, treeIDs[cid], trees)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTreeLastSyncHeight(t *testing.T) {
|
||||||
|
for i := range providers {
|
||||||
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
|
testTreeLastSyncHeight(t, providers[i].construct(t))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testTreeLastSyncHeight(t *testing.T, f Forest) {
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
treeID := "someTree"
|
||||||
|
|
||||||
|
t.Run("ErrNotFound if no log operations are stored for a tree", func(t *testing.T) {
|
||||||
|
_, err := f.TreeLastSyncHeight(cnr, treeID)
|
||||||
|
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||||
|
|
||||||
|
err = f.TreeUpdateLastSyncHeight(cnr, treeID, 1)
|
||||||
|
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := f.TreeMove(CIDDescriptor{CID: cnr, Size: 1}, treeID, &Move{
|
||||||
|
Parent: RootID,
|
||||||
|
Child: 1,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
h, err := f.TreeLastSyncHeight(cnr, treeID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 0, h)
|
||||||
|
|
||||||
|
t.Run("separate storages for separate containers", func(t *testing.T) {
|
||||||
|
_, err := f.TreeLastSyncHeight(cidtest.ID(), treeID)
|
||||||
|
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, f.TreeUpdateLastSyncHeight(cnr, treeID, 10))
|
||||||
|
|
||||||
|
h, err = f.TreeLastSyncHeight(cnr, treeID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 10, h)
|
||||||
|
|
||||||
|
t.Run("removed correctly", func(t *testing.T) {
|
||||||
|
require.NoError(t, f.TreeDrop(cnr, treeID))
|
||||||
|
|
||||||
|
_, err := f.TreeLastSyncHeight(cnr, treeID)
|
||||||
|
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -143,8 +143,9 @@ func (s *state) findSpareID() Node {
|
||||||
|
|
||||||
// tree is a mapping from the child nodes to their parent and metadata.
|
// tree is a mapping from the child nodes to their parent and metadata.
|
||||||
type tree struct {
|
type tree struct {
|
||||||
infoMap map[Node]nodeInfo
|
syncHeight uint64
|
||||||
childMap map[Node][]Node
|
infoMap map[Node]nodeInfo
|
||||||
|
childMap map[Node][]Node
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTree() *tree {
|
func newTree() *tree {
|
||||||
|
|
|
@ -44,6 +44,10 @@ type Forest interface {
|
||||||
// TreeExists checks if a tree exists locally.
|
// TreeExists checks if a tree exists locally.
|
||||||
// If the tree is not found, false and a nil error should be returned.
|
// If the tree is not found, false and a nil error should be returned.
|
||||||
TreeExists(cid cidSDK.ID, treeID string) (bool, error)
|
TreeExists(cid cidSDK.ID, treeID string) (bool, error)
|
||||||
|
// TreeUpdateLastSyncHeight updates last log height synchronized with _all_ container nodes.
|
||||||
|
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
|
||||||
|
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
|
||||||
|
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ForestStorage interface {
|
type ForestStorage interface {
|
||||||
|
|
|
@ -111,3 +111,19 @@ func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeExists(cid, treeID)
|
return s.pilorama.TreeExists(cid, treeID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||||
|
func (s *Shard) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||||
|
if s.pilorama == nil {
|
||||||
|
return ErrPiloramaDisabled
|
||||||
|
}
|
||||||
|
return s.pilorama.TreeUpdateLastSyncHeight(cid, treeID, height)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TreeLastSyncHeight implements the pilorama.Forest interface.
|
||||||
|
func (s *Shard) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||||
|
if s.pilorama == nil {
|
||||||
|
return 0, ErrPiloramaDisabled
|
||||||
|
}
|
||||||
|
return s.pilorama.TreeLastSyncHeight(cid, treeID)
|
||||||
|
}
|
||||||
|
|
|
@ -31,10 +31,8 @@ type Service struct {
|
||||||
syncChan chan struct{}
|
syncChan chan struct{}
|
||||||
syncPool *ants.Pool
|
syncPool *ants.Pool
|
||||||
|
|
||||||
// cnrMap maps container and tree ID to the minimum height which was fetched from _each_ client.
|
// cnrMap contains existing (used) container IDs.
|
||||||
// This allows us to better handle split-brain scenario, because we always synchronize
|
cnrMap map[cidSDK.ID]struct{}
|
||||||
// from the last seen height. The inner map is read-only and should not be modified in-place.
|
|
||||||
cnrMap map[cidSDK.ID]map[string]uint64
|
|
||||||
// cnrMapMtx protects cnrMap
|
// cnrMapMtx protects cnrMap
|
||||||
cnrMapMtx sync.Mutex
|
cnrMapMtx sync.Mutex
|
||||||
}
|
}
|
||||||
|
@ -63,7 +61,7 @@ func New(opts ...Option) *Service {
|
||||||
s.replicateLocalCh = make(chan applyOp)
|
s.replicateLocalCh = make(chan applyOp)
|
||||||
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
|
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
|
||||||
s.containerCache.init(s.containerCacheSize)
|
s.containerCache.init(s.containerCacheSize)
|
||||||
s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
|
s.cnrMap = make(map[cidSDK.ID]struct{})
|
||||||
s.syncChan = make(chan struct{})
|
s.syncChan = make(chan struct{})
|
||||||
s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)
|
s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)
|
||||||
|
|
||||||
|
|
|
@ -86,31 +86,24 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
|
||||||
return fmt.Errorf("could not fetch tree ID list: %w", outErr)
|
return fmt.Errorf("could not fetch tree ID list: %w", outErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.cnrMapMtx.Lock()
|
|
||||||
oldStatus := s.cnrMap[cid]
|
|
||||||
s.cnrMapMtx.Unlock()
|
|
||||||
|
|
||||||
syncStatus := map[string]uint64{}
|
|
||||||
for i := range treesToSync {
|
|
||||||
syncStatus[treesToSync[i]] = 0
|
|
||||||
}
|
|
||||||
for tid := range oldStatus {
|
|
||||||
if _, ok := syncStatus[tid]; ok {
|
|
||||||
syncStatus[tid] = oldStatus[tid]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tid := range treesToSync {
|
for _, tid := range treesToSync {
|
||||||
h := s.synchronizeTree(ctx, d, syncStatus[tid], tid, nodes)
|
h, err := s.forest.TreeLastSyncHeight(d.CID, tid)
|
||||||
if syncStatus[tid] < h {
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
syncStatus[tid] = h
|
s.log.Warn("could not get last synchronized height for a tree",
|
||||||
|
zap.Stringer("cid", d.CID),
|
||||||
|
zap.String("tree", tid))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
newHeight := s.synchronizeTree(ctx, d, h, tid, nodes)
|
||||||
|
if h < newHeight {
|
||||||
|
if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil {
|
||||||
|
s.log.Warn("could not update last synchronized height for a tree",
|
||||||
|
zap.Stringer("cid", d.CID),
|
||||||
|
zap.String("tree", tid))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.cnrMapMtx.Lock()
|
|
||||||
s.cnrMap[cid] = syncStatus
|
|
||||||
s.cnrMapMtx.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue