Store tree service sync height in the metabase #82

Merged
fyrchik merged 2 commits from fyrchik/tree-service-store-sync-height into master 2023-03-13 11:25:45 +00:00
10 changed files with 189 additions and 28 deletions

View File

@ -13,6 +13,7 @@ Changelog for FrostFS Node
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
- Multiple configs support (#44)
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
- Tree service now saves the last synchronization height which persists across restarts (#82)

"82"? we will have conflicts some day then

"82"? we will have conflicts some day then

What do you mean? This PR is #82.

What do you mean? This PR is #82.

what we gonna do when #1868 PR is open in that repo?

what we gonna do when `#1868` PR is open in that repo?

We will discuss, but it is unrelated to this PR.

We will discuss, but it is unrelated to this PR.

it is unrelated to this PR

sure but i would discuss it as early as possible since we are already filling CHANGELOG with conflicting lines (adjacent rows belong to the different lists, what is that file for then?)

> it is unrelated to this PR

sure but i would discuss it as early as possible since we are already filling CHANGELOG with conflicting lines (adjacent rows belong to the different lists, what is that file for then?)
### Changed
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects

View File

@ -213,6 +213,44 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
return err == nil, err
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
//
// The shard to address is taken from getTreeShard and the new height is
// forwarded to it. A pilorama.ErrTreeNotFound from getTreeShard is tolerated,
// so the shard at the returned index still receives the update; any other
// lookup error is returned unchanged.
//
// A failed update is passed to reportShardError unless the error is
// shard.ErrReadOnlyMode or shard.ErrPiloramaDisabled. The error from the
// shard is returned to the caller in every case.
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
	index, lst, err := e.getTreeShard(cid, treeID)
	if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
		return err
	}

	// Guard the index before using it: with an empty shard list there is
	// nothing to update, and lst[index] would panic.
	if index < 0 || index >= len(lst) {
		return pilorama.ErrTreeNotFound
	}

	sh := lst[index]
	err = sh.TreeUpdateLastSyncHeight(cid, treeID, height)
	// errors.Is is used for both sentinels so that wrapped variants are
	// recognised as well and are not reported as shard faults.
	if err != nil && !errors.Is(err, shard.ErrReadOnlyMode) && !errors.Is(err, shard.ErrPiloramaDisabled) {
		e.reportShardError(sh, "can't update tree synchronization height", err,
			zap.Stringer("cid", cid),
			zap.String("tree", treeID))
	}
	return err
}
// TreeLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
var err error
var height uint64
for _, sh := range e.sortShardsByWeight(cid) {
height, err = sh.TreeLastSyncHeight(cid, treeID)
carpawell marked this conversation as resolved

why not getTreeShard?

why not `getTreeShard`?

Mostly because this is a general rule here.
The reason is that getTreeShard performs TreeExists just to find the shard and is really needed only for modifying operations.
TreeLastSyncHeight is cheap, so we can safely use it here without calling TreeExists for each shard in getTreeShard.

Mostly because this is a general rule here. The reason is that `getTreeShard` performs `TreeExists` just to find the shard and is really needed only for modifying operations. `TreeLastSyncHeight` is cheap, so we can safely use it here without calling `TreeExists` for each shard in `getTreeShard`.
if err != nil {
if err == shard.ErrPiloramaDisabled {
break
}
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(sh, "can't read tree synchronization height", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
}
continue
}
return height, err
}
return height, err
}
func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []hashedShard, error) {
lst := e.sortShardsByWeight(cid)
for i, sh := range lst {

View File

@ -192,6 +192,46 @@ func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
return exists, err
}
// syncHeightKey is the key under which the last synchronization height of a
// tree is kept in that tree's data bucket.
var syncHeightKey = []byte("h")
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
//
// The height is encoded as 8 little-endian bytes and written under
// syncHeightKey into the data bucket of the tree. ErrTreeNotFound is returned
// when the root bucket of the tree does not exist.
func (t *boltForest) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
	encoded := make([]byte, 8)
	binary.LittleEndian.PutUint64(encoded, height)

	rootName := bucketName(cid, treeID)
	store := func(tx *bbolt.Tx) error {
		if root := tx.Bucket(rootName); root != nil {
			return root.Bucket(dataBucket).Put(syncHeightKey, encoded)
		}
		return ErrTreeNotFound
	}
	return t.db.Batch(store)
}
// TreeLastSyncHeight implements the pilorama.Forest interface.
//
// It reads the value stored under syncHeightKey in the data bucket of the
// tree and decodes it as a little-endian uint64. If the stored value is not
// exactly 8 bytes long (including when it is absent), the height is reported
// as 0 with a nil error. ErrTreeNotFound is returned when the root bucket of
// the tree does not exist.
func (t *boltForest) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
	rootName := bucketName(cid, treeID)

	var stored uint64
	err := t.db.View(func(tx *bbolt.Tx) error {
		root := tx.Bucket(rootName)
		if root == nil {
			return ErrTreeNotFound
		}
		if raw := root.Bucket(dataBucket).Get(syncHeightKey); len(raw) == 8 {
			stored = binary.LittleEndian.Uint64(raw)
		}
		return nil
	})
	return stored, err
}
// TreeAddByPath implements the Forest interface.
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
if !d.checkValid() {

View File

@ -226,3 +226,24 @@ func (f *memoryForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
_, ok := f.treeMap[fullID]
return ok, nil
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
//
// The tree is looked up in treeMap by "<encoded container ID>/<tree ID>";
// ErrTreeNotFound is returned when there is no such entry.
func (f *memoryForest) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
	if tr, found := f.treeMap[cid.EncodeToString()+"/"+treeID]; found {
		tr.syncHeight = height
		return nil
	}
	return ErrTreeNotFound
}
// TreeLastSyncHeight implements the pilorama.Forest interface.
//
// The tree is looked up in treeMap by "<encoded container ID>/<tree ID>";
// a missing entry yields 0 and ErrTreeNotFound.
func (f *memoryForest) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
	if tr, found := f.treeMap[cid.EncodeToString()+"/"+treeID]; found {
		return tr.syncHeight, nil
	}
	return 0, ErrTreeNotFound
}

View File

@ -1030,3 +1030,52 @@ func testTreeGetTrees(t *testing.T, s Forest) {
require.ElementsMatch(t, treeIDs[cid], trees)
}
}
// TestTreeLastSyncHeight runs testTreeLastSyncHeight as a named subtest
// against a freshly constructed forest from every entry in providers.
func TestTreeLastSyncHeight(t *testing.T) {
	for i := range providers {
		p := providers[i]
		t.Run(p.name, func(t *testing.T) {
			forest := p.construct(t)
			testTreeLastSyncHeight(t, forest)
		})
	}
}
// testTreeLastSyncHeight checks the sync-height contract of a Forest:
// both accessors fail with ErrTreeNotFound before the tree has any operation,
// the height reads as 0 after the first TreeMove, an updated value is read
// back, other containers are unaffected, and TreeDrop makes the tree
// unknown again.
func testTreeLastSyncHeight(t *testing.T, f Forest) {
	const treeID = "someTree"
	containerID := cidtest.ID()

	t.Run("ErrNotFound if no log operations are stored for a tree", func(t *testing.T) {
		_, readErr := f.TreeLastSyncHeight(containerID, treeID)
		require.ErrorIs(t, readErr, ErrTreeNotFound)

		writeErr := f.TreeUpdateLastSyncHeight(containerID, treeID, 1)
		require.ErrorIs(t, writeErr, ErrTreeNotFound)
	})

	// Apply a single operation so that the tree comes into existence.
	desc := CIDDescriptor{CID: containerID, Size: 1}
	_, err := f.TreeMove(desc, treeID, &Move{Parent: RootID, Child: 1})
	require.NoError(t, err)

	height, err := f.TreeLastSyncHeight(containerID, treeID)
	require.NoError(t, err)
	require.EqualValues(t, 0, height)

	t.Run("separate storages for separate containers", func(t *testing.T) {
		_, otherErr := f.TreeLastSyncHeight(cidtest.ID(), treeID)
		require.ErrorIs(t, otherErr, ErrTreeNotFound)
	})

	err = f.TreeUpdateLastSyncHeight(containerID, treeID, 10)
	require.NoError(t, err)

	height, err = f.TreeLastSyncHeight(containerID, treeID)
	require.NoError(t, err)
	require.EqualValues(t, 10, height)

	t.Run("removed correctly", func(t *testing.T) {
		dropErr := f.TreeDrop(containerID, treeID)
		require.NoError(t, dropErr)

		_, readErr := f.TreeLastSyncHeight(containerID, treeID)
		require.ErrorIs(t, readErr, ErrTreeNotFound)
	})
}

View File

@ -143,8 +143,9 @@ func (s *state) findSpareID() Node {
// tree is a mapping from the child nodes to their parent and metadata.
type tree struct {
infoMap map[Node]nodeInfo
childMap map[Node][]Node
syncHeight uint64
infoMap map[Node]nodeInfo
childMap map[Node][]Node
}
func newTree() *tree {

View File

@ -44,6 +44,10 @@ type Forest interface {
// TreeExists checks if a tree exists locally.
// If the tree is not found, false and a nil error should be returned.
TreeExists(cid cidSDK.ID, treeID string) (bool, error)
// TreeUpdateLastSyncHeight updates last log height synchronized with _all_ container nodes.
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
}
type ForestStorage interface {

View File

@ -111,3 +111,19 @@ func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
}
return s.pilorama.TreeExists(cid, treeID)
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
//
// The call is delegated to the shard's pilorama; ErrPiloramaDisabled is
// returned when the shard has none.
func (s *Shard) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
	if forest := s.pilorama; forest != nil {
		return forest.TreeUpdateLastSyncHeight(cid, treeID, height)
	}
	return ErrPiloramaDisabled
}
// TreeLastSyncHeight implements the pilorama.Forest interface.
//
// The call is delegated to the shard's pilorama; 0 and ErrPiloramaDisabled
// are returned when the shard has none.
func (s *Shard) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
	if forest := s.pilorama; forest != nil {
		return forest.TreeLastSyncHeight(cid, treeID)
	}
	return 0, ErrPiloramaDisabled
}

View File

@ -31,10 +31,8 @@ type Service struct {
syncChan chan struct{}
syncPool *ants.Pool
// cnrMap maps contrainer and tree ID to the minimum height which was fetched from _each_ client.
// This allows us to better handle split-brain scenario, because we always synchronize
// from the last seen height. The inner map is read-only and should not be modified in-place.
cnrMap map[cidSDK.ID]map[string]uint64
// cnrMap contains existing (used) container IDs.
cnrMap map[cidSDK.ID]struct{}
// cnrMapMtx protects cnrMap
cnrMapMtx sync.Mutex
}
@ -63,7 +61,7 @@ func New(opts ...Option) *Service {
s.replicateLocalCh = make(chan applyOp)
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
s.containerCache.init(s.containerCacheSize)
s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
s.cnrMap = make(map[cidSDK.ID]struct{})
s.syncChan = make(chan struct{})
s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)

View File

@ -86,31 +86,24 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
return fmt.Errorf("could not fetch tree ID list: %w", outErr)
}
s.cnrMapMtx.Lock()
oldStatus := s.cnrMap[cid]
s.cnrMapMtx.Unlock()
syncStatus := map[string]uint64{}
for i := range treesToSync {
syncStatus[treesToSync[i]] = 0
}
for tid := range oldStatus {
if _, ok := syncStatus[tid]; ok {
syncStatus[tid] = oldStatus[tid]
}
}
for _, tid := range treesToSync {
h := s.synchronizeTree(ctx, d, syncStatus[tid], tid, nodes)
if syncStatus[tid] < h {
syncStatus[tid] = h
h, err := s.forest.TreeLastSyncHeight(d.CID, tid)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
s.log.Warn("could not get last synchronized height for a tree",
zap.Stringer("cid", d.CID),
zap.String("tree", tid))
continue
}
newHeight := s.synchronizeTree(ctx, d, h, tid, nodes)
if h < newHeight {
if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil {
s.log.Warn("could not update last synchronized height for a tree",
zap.Stringer("cid", d.CID),
zap.String("tree", tid))
}
}
}
s.cnrMapMtx.Lock()
s.cnrMap[cid] = syncStatus
s.cnrMapMtx.Unlock()
return nil
}