Compare commits

...

10 commits

Author SHA1 Message Date
Dmitrii Stepanov
ae4dab4bd8 [#9999] gc: Complex delete fix
Expired complex objects will be deleted by GC.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-15 09:35:55 +03:00
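To illustrate the GC change in this commit: expired "complex" (parent) objects are expanded to their physical parts before inhuming, which is what the new metabase GetChildren helper and Shard.getPhysicalAddresses in the diffs below enable. A minimal self-contained sketch of that expansion, where Addr and the in-memory index are hypothetical stand-ins for oid.Address and the metabase buckets:

package main

import "fmt"

// Addr stands in for oid.Address; the index map is a hypothetical in-memory
// substitute for the metabase parent->children buckets.
type Addr string

// getChildren mirrors meta.(*DB).GetChildren: a parent -> children map,
// with a nil entry for objects that have no children.
func getChildren(addrs []Addr, index map[Addr][]Addr) map[Addr][]Addr {
	result := make(map[Addr][]Addr, len(addrs))
	for _, a := range addrs {
		result[a] = index[a]
	}
	return result
}

// expandPhysical mirrors Shard.getPhysicalAddresses: expired parent (complex)
// objects are expanded so their child parts are inhumed together with them.
func expandPhysical(expired []Addr, index map[Addr][]Addr) []Addr {
	out := make([]Addr, 0, len(expired))
	for parent, children := range getChildren(expired, index) {
		out = append(out, parent)
		out = append(out, children...)
	}
	return out
}

func main() {
	index := map[Addr][]Addr{"cnr/parent": {"cnr/part1", "cnr/part2"}}
	fmt.Println(expandPhysical([]Addr{"cnr/parent"}, index))
	// [cnr/parent cnr/part1 cnr/part2]: all parts are collected together
}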
Pavel Karpy
c60029d3b0 [#323] node: Fix tree svc panic
If a connection has not been established earlier, `nil` is stored in the LRU
cache. Cache eviction tries to close every connection (even a `nil` one) and
panics; it does not crash the app only because we are using pools.
Since `Unlock` is not called via `defer`, the panic also leaves the mutex
locked, which leads to a deadlock (and that is how the bug was found).

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-04 20:04:30 +03:00
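A minimal sketch of the panic described above (cacheItem and the conn interface are hypothetical stand-ins for the real gRPC connection type): calling Close on a nil interface value panics with a nil pointer dereference inside the LRU eviction callback, and the guard mirrors the fix in the tree service client cache diff below.

package main

import "fmt"

// cacheItem stands in for the tree service's cached client entry.
type cacheItem struct {
	cc interface{ Close() error } // nil when the connection was never established
}

// evict mirrors the fixed LRU eviction callback: without the nil check,
// value.cc.Close() on a nil interface panics at runtime.
func evict(value cacheItem) {
	if conn := value.cc; conn != nil {
		_ = conn.Close()
	}
}

func main() {
	evict(cacheItem{cc: nil}) // no panic with the guard in place
	fmt.Println("nil connection evicted safely")
}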
Pavel Karpy
0beb7ccf5c [#284] node: Use copy_number on server side
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-04-26 10:57:34 +03:00
Dmitrii Stepanov
0fe5e34fb0 [#231] node: Fix race condition in TTL cache
Use key locker to lock by key.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-25 16:50:27 +03:00
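The fix serializes concurrent cache misses for the same key. A minimal stand-in for the keyLocker introduced in this commit (simplified to a sync.Map of per-key mutexes; the real implementation, shown in the diff below, also reference-counts waiters so idle lockers can be removed):

package main

import (
	"fmt"
	"sync"
)

// keyLocker is a simplified sketch of the type added in this commit.
type keyLocker[K comparable] struct{ m sync.Map }

func (l *keyLocker[K]) mu(k K) *sync.Mutex {
	v, _ := l.m.LoadOrStore(k, &sync.Mutex{})
	return v.(*sync.Mutex)
}

func (l *keyLocker[K]) LockKey(k K)   { l.mu(k).Lock() }
func (l *keyLocker[K]) UnlockKey(k K) { l.mu(k).Unlock() }

func main() {
	var kl keyLocker[int]
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			kl.LockKey(0) // every goroutine serializes on key 0
			defer kl.UnlockKey(0)
			counter++ // safe: protected by the per-key lock
		}()
	}
	wg.Wait()
	fmt.Println(counter) // always 100, no data race on the shared value
}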
Dmitrii Stepanov
bcf3f0f517 [#231] node: Invalidate container cache on PutSuccess event
For example: frostfs-cli creates a container and then polls it with
GetContainer requests. These requests go through the container cache, so a
"not found" error gets stored there. As a result, the cache may still hold
that "not found" error when the PutSuccess event is received.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-20 17:34:00 +03:00
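A toy reproduction of the scenario, assuming a cache that stores errors as negative entries the way ttlNetCache does; the fix (see the container.go diff below) reads the container from the source directly and overwrites the cached entry:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("container not found")

// entry mimics valueWithTime: the TTL cache stores errors too, so a
// "not found" response becomes a cached negative entry.
type entry struct {
	val string
	err error
}

func main() {
	cache := map[string]entry{}

	// Polling GetContainer requests issued before creation completes
	// store a negative entry, as described in the commit message.
	cache["cnr1"] = entry{err: errNotFound}

	// PutSuccess handler after the fix: read from the source directly
	// (bypassing the cache) and overwrite whatever the cache holds.
	source := func(id string) (string, error) { return "container body", nil }
	if v, err := source("cnr1"); err == nil {
		cache["cnr1"] = entry{val: v}
	}

	e := cache["cnr1"]
	fmt.Printf("val=%q err=%v\n", e.val, e.err) // stale "not found" is gone
}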
Evgenii Stratonikov
79d59e4ed2 [#266] services/tree: Do not accept requests until initial sync is finished
`Apply` is deliberately left out -- we don't want to miss anything new.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
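A sketch of the gating pattern this commit applies to the tree service RPCs (names are illustrative; the real code gates every handler except Apply with an atomic flag that the sync loop sets once the initial synchronization finishes):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errSyncing stands in for the service's ErrAlreadySyncing.
var errSyncing = errors.New("tree service is syncing")

// svc sketches the gate added in this commit.
type svc struct{ initialSyncDone atomic.Bool }

func (s *svc) Add() error {
	if !s.initialSyncDone.Load() {
		return errSyncing
	}
	return nil // ...handle the request
}

// Apply is deliberately not gated, mirroring the commit message: incoming
// replicated operations must not be missed during the initial sync.
func (s *svc) Apply() error { return nil }

func main() {
	s := &svc{}
	fmt.Println(s.Add(), s.Apply()) // tree service is syncing <nil>
	s.initialSyncDone.Store(true)   // done at the end of the sync loop
	fmt.Println(s.Add())            // <nil>
}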
Evgenii Stratonikov
364b4ac572 [#266] services/tree: Batch operations on the service level
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
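The batching in this commit follows a bounded-errgroup pattern. A runnable sketch under assumed names (the real code, in the sync.go diff below, applies incoming log operations via forest.TreeApply and bounds concurrency with errG.SetLimit(1024)):

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Bounded errgroup; the limit of 8 is only for this sketch.
	eg, _ := errgroup.WithContext(context.Background())
	eg.SetLimit(8)

	var applied atomic.Int64
	for op := 0; op < 100; op++ {
		op := op
		eg.Go(func() error {
			_ = op // apply the operation, e.g. forest.TreeApply(...)
			applied.Add(1)
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("sync failed:", err)
		return
	}
	fmt.Println("applied", applied.Load(), "operations")
}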
Evgenii Stratonikov
f7679a8168 [#266] services/tree: Return operation log up to some height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
Evgenii Stratonikov
2dc2fe8780 [#266] pilorama: Allow to get current tree height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
Dmitrii Stepanov
21412ef24a [#263] node: Up api-go version
Fix panic in tracing.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-18 11:45:42 +03:00
19 changed files with 361 additions and 28 deletions


@@ -6,6 +6,9 @@ Changelog for FrostFS Node
 ### Added
 ### Changed
 ### Fixed
+- Copy number was not used for `PUT` requests (#284)
+- Tree service panic in its internal client cache (#323)
 ### Removed
 ### Updated
 ### Updating from v0.36.0


@@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
 	e error
 }
 
+type locker struct {
+	mtx     *sync.Mutex
+	waiters int // not protected by mtx; the outer mutex must be used for concurrent updates
+}
+
+type keyLocker[K comparable] struct {
+	lockers    map[K]*locker
+	lockersMtx *sync.Mutex
+}
+
+func newKeyLocker[K comparable]() *keyLocker[K] {
+	return &keyLocker[K]{
+		lockers:    make(map[K]*locker),
+		lockersMtx: &sync.Mutex{},
+	}
+}
+
+func (l *keyLocker[K]) LockKey(key K) {
+	l.lockersMtx.Lock()
+
+	if locker, found := l.lockers[key]; found {
+		locker.waiters++
+		l.lockersMtx.Unlock()
+
+		locker.mtx.Lock()
+		return
+	}
+
+	locker := &locker{
+		mtx:     &sync.Mutex{},
+		waiters: 1,
+	}
+	locker.mtx.Lock()
+
+	l.lockers[key] = locker
+	l.lockersMtx.Unlock()
+}
+
+func (l *keyLocker[K]) UnlockKey(key K) {
+	l.lockersMtx.Lock()
+	defer l.lockersMtx.Unlock()
+
+	locker, found := l.lockers[key]
+	if !found {
+		return
+	}
+
+	if locker.waiters == 1 {
+		delete(l.lockers, key)
+	}
+	locker.waiters--
+
+	locker.mtx.Unlock()
+}
+
 // entity that provides TTL cache interface.
 type ttlNetCache[K comparable, V any] struct {
 	ttl time.Duration
@@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
 	cache *lru.Cache[K, *valueWithTime[V]]
 
 	netRdr netValueReader[K, V]
+
+	keyLocker *keyLocker[K]
 }
 
 // complicates netValueReader with TTL caching mechanism.
@@ -41,10 +98,11 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 	fatalOnErr(err)
 
 	return &ttlNetCache[K, V]{
 		ttl:    ttl,
 		sz:     sz,
 		cache:  cache,
 		netRdr: netRdr,
+		keyLocker: newKeyLocker[K](),
 	}
 }
@@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 // returned value should not be modified.
 func (c *ttlNetCache[K, V]) get(key K) (V, error) {
 	val, ok := c.cache.Peek(key)
-	if ok {
-		if time.Since(val.t) < c.ttl {
-			return val.v, val.e
-		}
-
-		c.cache.Remove(key)
+	if ok && time.Since(val.t) < c.ttl {
+		return val.v, val.e
+	}
+
+	c.keyLocker.LockKey(key)
+	defer c.keyLocker.UnlockKey(key)
+
+	val, ok = c.cache.Peek(key)
+	if ok && time.Since(val.t) < c.ttl {
+		return val.v, val.e
 	}
 
 	v, err := c.netRdr(key)
 
-	c.set(key, v, err)
+	c.cache.Add(key, &valueWithTime[V]{
+		v: v,
+		t: time.Now(),
+		e: err,
+	})
 
 	return v, err
 }
 
 func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
+	c.keyLocker.LockKey(k)
+	defer c.keyLocker.UnlockKey(k)
+
 	c.cache.Add(k, &valueWithTime[V]{
 		v: v,
 		t: time.Now(),
@@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
 }
 
 func (c *ttlNetCache[K, V]) remove(key K) {
+	c.keyLocker.LockKey(key)
+	defer c.keyLocker.UnlockKey(key)
+
 	c.cache.Remove(key)
 }


@@ -0,0 +1,32 @@
+package main
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+)
+
+func TestKeyLocker(t *testing.T) {
+	taken := false
+	eg, _ := errgroup.WithContext(context.Background())
+	keyLocker := newKeyLocker[int]()
+	for i := 0; i < 100; i++ {
+		eg.Go(func() error {
+			keyLocker.LockKey(0)
+			defer keyLocker.UnlockKey(0)
+
+			require.False(t, taken)
+			taken = true
+
+			require.True(t, taken)
+			time.Sleep(10 * time.Millisecond)
+			taken = false
+
+			require.False(t, taken)
+			return nil
+		})
+	}
+	require.NoError(t, eg.Wait())
+}


@@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
 		// TODO: use owner directly from the event after neofs-contract#256 will become resolved
 		//  but don't forget about the profit of reading the new container and caching it:
 		//  creation success are most commonly tracked by polling GET op.
-		cnr, err := cachedContainerStorage.Get(ev.ID)
+		cnr, err := cnrSrc.Get(ev.ID)
 		if err == nil {
 			cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
+			cachedContainerStorage.set(ev.ID, cnr, nil)
 		} else {
 			// unlike removal, we expect successful receive of the container
 			// after successful creation, so logging can be useful

go.mod

@@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
 go 1.18
 
 require (
-	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0
+	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418075311-1d691fed5700
 	git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
 	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
 	git.frostfs.info/TrueCloudLab/hrw v1.2.0

go.sum

Binary file not shown.


@@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
 	return err == nil, err
 }
 
+func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+	index, lst, err := e.getTreeShard(cid, treeID)
+	if err != nil {
+		return 0, nil
+	}
+	return lst[index].TreeHeight(cid, treeID)
+}
+
 // TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
 func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
 	index, lst, err := e.getTreeShard(cid, treeID)


@@ -0,0 +1,58 @@
+package meta
+
+import (
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.etcd.io/bbolt"
+)
+
+// GetChildren returns parent -> children map.
+// If an object has no children, then map will contain addr -> nil value.
+func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
+	db.modeMtx.RLock()
+	defer db.modeMtx.RUnlock()
+
+	if db.mode.NoMetabase() {
+		return nil, ErrDegradedMode
+	}
+
+	result := make(map[oid.Address][]oid.Address, len(addresses))
+
+	buffer := make([]byte, bucketKeySize)
+	err := db.boltDB.View(func(tx *bbolt.Tx) error {
+		for _, addr := range addresses {
+			bkt := tx.Bucket(parentBucketName(addr.Container(), buffer[:]))
+			if bkt == nil {
+				result[addr] = nil
+				continue
+			}
+
+			binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer[:])))
+			if err != nil {
+				return err
+			}
+
+			if len(binObjIDs) == 0 {
+				result[addr] = nil
+				continue
+			}
+
+			for _, binObjID := range binObjIDs {
+				var id oid.ID
+				if err = id.Decode(binObjID); err != nil {
+					return err
+				}
+
+				var resultAddress oid.Address
+				resultAddress.SetContainer(addr.Container())
+				resultAddress.SetObject(id)
+				result[addr] = append(result[addr], resultAddress)
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}


@@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
 	})
 }
 
+func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+	t.modeMtx.RLock()
+	defer t.modeMtx.RUnlock()
+
+	if t.mode.NoMetabase() {
+		return 0, ErrDegradedMode
+	}
+
+	var height uint64
+	var retErr error
+	err := t.db.View(func(tx *bbolt.Tx) error {
+		treeRoot := tx.Bucket(bucketName(cid, treeID))
+		if treeRoot != nil {
+			k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
+			height = binary.BigEndian.Uint64(k)
+		} else {
+			retErr = ErrTreeNotFound
+		}
+		return nil
+	})
+	if err == nil {
+		err = retErr
+	}
+	return height, err
+}
+
 // TreeExists implements the Forest interface.
 func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
 	t.modeMtx.RLock()


@@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
 	return res, nil
 }
 
+func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
+	fullID := cid.EncodeToString() + "/" + treeID
+	tree, ok := f.treeMap[fullID]
+	if !ok {
+		return 0, ErrTreeNotFound
+	}
+	return tree.operations[len(tree.operations)-1].Time, nil
+}
+
 // TreeExists implements the pilorama.Forest interface.
 func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
 	fullID := cid.EncodeToString() + "/" + treeID


@@ -527,10 +527,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
 		checkExists(t, false, cid, treeID)
 	})
 
-	require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
+	require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
 	checkExists(t, true, cid, treeID)
+
+	height, err := s.TreeHeight(cid, treeID)
+	require.NoError(t, err)
+	require.EqualValues(t, 11, height)
+
 	checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
-	checkExists(t, false, cid, "another tree")  // same CID, different tree
+
+	_, err = s.TreeHeight(cidtest.ID(), treeID)
+	require.ErrorIs(t, err, ErrTreeNotFound)
+
+	checkExists(t, false, cid, "another tree") // same CID, different tree
 
 	t.Run("can be removed", func(t *testing.T) {
 		require.NoError(t, s.TreeDrop(cid, treeID))


@@ -48,6 +48,8 @@ type Forest interface {
 	TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
 	// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
 	TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
+	// TreeHeight returns current tree height.
+	TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
 }
 
 type ForestStorage interface {


@@ -258,6 +258,9 @@ func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
 }
 
 func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
+	s.log.Debug("starting expired objects collecting")
+	defer s.log.Debug("expired objects collecting completed")
+
 	workersCount, batchSize := s.getExpiredObjectsParameters()
 
 	errGroup, egCtx := errgroup.WithContext(ctx)
@@ -313,6 +316,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
 		return
 	}
 
+	expired, err := s.getPhysicalAddresses(expired)
+	if err != nil {
+		s.log.Warn("failed to get physical expired objects", zap.Error(err))
+		return
+	}
+
 	var inhumePrm meta.InhumePrm
 
 	inhumePrm.SetAddresses(expired...)
@@ -338,11 +347,26 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
 	}
 }
 
+func (s *Shard) getPhysicalAddresses(source []oid.Address) ([]oid.Address, error) {
+	result := make([]oid.Address, 0, len(source))
+	parentToChildren, err := s.metaBase.GetChildren(source)
+	if err != nil {
+		return nil, err
+	}
+	for parent, children := range parentToChildren {
+		result = append(result, parent)
+		result = append(result, children...)
+	}
+
+	return result, nil
+}
+
 func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 	epoch := e.(newEpoch).epoch
 	log := s.log.With(zap.Uint64("epoch", epoch))
 
 	log.Debug("started expired tombstones handling")
+	defer log.Debug("finished expired tombstones handling")
 
 	const tssDeleteBatch = 50
 	tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
@@ -399,11 +423,12 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 		tss = tss[:0]
 		tssExp = tssExp[:0]
 	}
-
-	log.Debug("finished expired tombstones handling")
 }
 
 func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
+	s.log.Debug("starting expired locks collecting")
+	defer s.log.Debug("expired locks collecting completed")
+
 	workersCount, batchSize := s.getExpiredObjectsParameters()
 
 	errGroup, egCtx := errgroup.WithContext(ctx)


@@ -155,6 +155,13 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
 	return s.pilorama.TreeList(cid)
 }
 
+func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+	if s.pilorama == nil {
+		return 0, ErrPiloramaDisabled
+	}
+	return s.pilorama.TreeHeight(cid, treeID)
+}
+
 // TreeExists implements the pilorama.Forest interface.
 func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
 	if s.pilorama == nil {


@@ -40,6 +40,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
 	return p
 }
 
+func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
+	if p != nil {
+		p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
+	}
+
+	return p
+}
+
 func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
 	if p != nil {
 		p.relay = f


@@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
 			object.NewFromV2(oV2),
 		).
 		WithRelay(s.relayRequest).
-		WithCommonPrm(commonPrm), nil
+		WithCommonPrm(commonPrm).
+		WithCopyNumber(part.GetCopiesNumber()), nil
 }
 
 func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {


@@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")
 
 func (c *clientCache) init() {
 	l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
-		_ = value.cc.Close()
+		if conn := value.cc; conn != nil {
+			_ = conn.Close()
+		}
 	})
 	c.LRU = *l
 }


@@ -13,6 +13,7 @@ import (
 	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"github.com/panjf2000/ants/v2"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
@@ -31,6 +32,8 @@ type Service struct {
 	syncChan chan struct{}
 	syncPool *ants.Pool
 
+	initialSyncDone atomic.Bool
+
 	// cnrMap contains existing (used) container IDs.
 	cnrMap map[cidSDK.ID]struct{}
 	// cnrMapMtx protects cnrMap
@@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
 }
 
 func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
 }
 
 func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
 }
 
 func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
 
 // Move applies client operation to the specified tree and pushes in queue
 // for replication on other nodes.
 func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
 }
 
 func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
 }
 
 func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
+	if !s.initialSyncDone.Load() {
+		return ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
 }
 
 func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
+	if !s.initialSyncDone.Load() {
+		return ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 	}
 
 	h := b.GetHeight()
+	lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
+	if err != nil {
+		return err
+	}
 	for {
 		lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
-		if err != nil || lm.Time == 0 {
+		if err != nil || lm.Time == 0 || lastHeight < lm.Time {
 			return err
 		}
 
@@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 }
 
 func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	var cid cidSDK.ID
 
 	err := cid.Decode(req.GetBody().GetContainerId())
@@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
 }
 
 func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	return new(HealthcheckResponse), nil
 }


@@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
 	rawCID := make([]byte, sha256.Size)
 	cid.Encode(rawCID)
 
+	errG, ctx := errgroup.WithContext(ctx)
+	errG.SetLimit(1024)
+
+	var heightMtx sync.Mutex
+
 	for {
 		newHeight := height
 		req := &GetOpLogRequest{
@@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
 			},
 		}
 		if err := SignMessage(req, s.key); err != nil {
+			_ = errG.Wait()
 			return newHeight, err
 		}
 
 		c, err := treeClient.GetOpLog(ctx, req)
 		if err != nil {
+			_ = errG.Wait()
 			return newHeight, fmt.Errorf("can't initialize client: %w", err)
 		}
@@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
 				Child:  lm.ChildId,
 			}
 			if err := m.Meta.FromBytes(lm.Meta); err != nil {
+				_ = errG.Wait()
 				return newHeight, err
 			}
-			if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
-				return newHeight, err
-			}
-			if m.Time > newHeight {
-				newHeight = m.Time + 1
-			} else {
-				newHeight++
-			}
+			errG.Go(func() error {
+				err := s.forest.TreeApply(cid, treeID, m, true)
+
+				heightMtx.Lock()
+				defer heightMtx.Unlock()
+
+				if err != nil {
+					if newHeight > height {
+						height = newHeight
+					}
+					return err
+				}
+
+				if m.Time > newHeight {
+					newHeight = m.Time + 1
+				} else {
+					newHeight++
+				}
+				return nil
+			})
 		}
 
+		applyErr := errG.Wait()
+		if err == nil {
+			err = applyErr
+		}
+
+		heightMtx.Lock()
 		if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
+			heightMtx.Unlock()
 			return newHeight, err
 		}
 		height = newHeight
+		heightMtx.Unlock()
 	}
 }
@@ -288,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
 			cnrs, err := s.cfg.cnrSource.List()
 			if err != nil {
 				s.log.Error("could not fetch containers", zap.Error(err))
-				continue
+				break
 			}
 
 			newMap, cnrsToSync := s.containersToSync(cnrs)
@@ -299,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {
 
 			s.log.Debug("trees have been synchronized")
 		}
+		s.initialSyncDone.Store(true)
 	}
 }
} }