Compare commits
9 commits: master...fix/panic-

Commits (SHA1):
c60029d3b0
0beb7ccf5c
0fe5e34fb0
bcf3f0f517
79d59e4ed2
364b4ac572
f7679a8168
2dc2fe8780
21412ef24a

17 changed files with 278 additions and 28 deletions
@@ -6,6 +6,9 @@ Changelog for FrostFS Node

 ### Added

 ### Changed

 ### Fixed
+- Copy number was not used for `PUT` requests (#284)
+- Tree service panic in its internal client cache (#323)

 ### Removed

 ### Updated

 ### Updating from v0.36.0
@@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
 	e error
 }

+type locker struct {
+	mtx     *sync.Mutex
+	waiters int // not protected by mtx; use the outer mutex to update concurrently
+}
+
+type keyLocker[K comparable] struct {
+	lockers    map[K]*locker
+	lockersMtx *sync.Mutex
+}
+
+func newKeyLocker[K comparable]() *keyLocker[K] {
+	return &keyLocker[K]{
+		lockers:    make(map[K]*locker),
+		lockersMtx: &sync.Mutex{},
+	}
+}
+
+func (l *keyLocker[K]) LockKey(key K) {
+	l.lockersMtx.Lock()
+
+	if locker, found := l.lockers[key]; found {
+		locker.waiters++
+		l.lockersMtx.Unlock()
+
+		locker.mtx.Lock()
+		return
+	}
+
+	locker := &locker{
+		mtx:     &sync.Mutex{},
+		waiters: 1,
+	}
+	locker.mtx.Lock()
+
+	l.lockers[key] = locker
+	l.lockersMtx.Unlock()
+}
+
+func (l *keyLocker[K]) UnlockKey(key K) {
+	l.lockersMtx.Lock()
+	defer l.lockersMtx.Unlock()
+
+	locker, found := l.lockers[key]
+	if !found {
+		return
+	}
+
+	if locker.waiters == 1 {
+		delete(l.lockers, key)
+	}
+	locker.waiters--
+
+	locker.mtx.Unlock()
+}
+
 // entity that provides TTL cache interface.
 type ttlNetCache[K comparable, V any] struct {
 	ttl time.Duration
@@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
 	cache *lru.Cache[K, *valueWithTime[V]]

 	netRdr netValueReader[K, V]
+
+	keyLocker *keyLocker[K]
 }

 // complicates netValueReader with TTL caching mechanism.
@@ -41,10 +98,11 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 	fatalOnErr(err)

 	return &ttlNetCache[K, V]{
-		ttl:    ttl,
-		sz:     sz,
-		cache:  cache,
-		netRdr: netRdr,
+		ttl:       ttl,
+		sz:        sz,
+		cache:     cache,
+		netRdr:    netRdr,
+		keyLocker: newKeyLocker[K](),
 	}
 }

@@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 // returned value should not be modified.
 func (c *ttlNetCache[K, V]) get(key K) (V, error) {
 	val, ok := c.cache.Peek(key)
-	if ok {
-		if time.Since(val.t) < c.ttl {
-			return val.v, val.e
-		}
-
-		c.cache.Remove(key)
+	if ok && time.Since(val.t) < c.ttl {
+		return val.v, val.e
 	}

+	c.keyLocker.LockKey(key)
+	defer c.keyLocker.UnlockKey(key)
+
+	val, ok = c.cache.Peek(key)
+	if ok && time.Since(val.t) < c.ttl {
+		return val.v, val.e
+	}
+
 	v, err := c.netRdr(key)

-	c.set(key, v, err)
+	c.cache.Add(key, &valueWithTime[V]{
+		v: v,
+		t: time.Now(),
+		e: err,
+	})

 	return v, err
 }

 func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
+	c.keyLocker.LockKey(k)
+	defer c.keyLocker.UnlockKey(k)
+
 	c.cache.Add(k, &valueWithTime[V]{
 		v: v,
 		t: time.Now(),
@@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
 }

 func (c *ttlNetCache[K, V]) remove(key K) {
+	c.keyLocker.LockKey(key)
+	defer c.keyLocker.UnlockKey(key)
+
 	c.cache.Remove(key)
 }
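What the keyLocker and the second Peek in get() buy is classic double-checked locking: concurrent misses on one key perform the slow network read once, while other keys stay unblocked. Below is a minimal self-contained sketch of that idea, not part of this diff; it simplifies to one global mutex instead of a per-key one, and the netCache/slowLoad names are illustrative only.

package main

import (
	"fmt"
	"sync"
)

type netCache struct {
	mu    sync.Mutex // stands in for keyLocker.LockKey(key)
	vals  map[string]int
	loads int
}

func (c *netCache) get(key string, slowLoad func() int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	// double check under the lock: a racing goroutine may have
	// already filled the entry while we waited
	if v, ok := c.vals[key]; ok {
		return v
	}
	c.loads++ // only the first goroutine reaches the "network"
	v := slowLoad()
	c.vals[key] = v
	return v
}

func main() {
	c := &netCache{vals: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.get("epoch", func() int { return 42 })
		}()
	}
	wg.Wait()
	fmt.Println("loads:", c.loads) // loads: 1, despite 10 concurrent misses
}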
cmd/frostfs-node/cache_test.go (new file, 32 lines)
@@ -0,0 +1,32 @@
+package main
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+)
+
+func TestKeyLocker(t *testing.T) {
+	taken := false
+	eg, _ := errgroup.WithContext(context.Background())
+	keyLocker := newKeyLocker[int]()
+	for i := 0; i < 100; i++ {
+		eg.Go(func() error {
+			keyLocker.LockKey(0)
+			defer keyLocker.UnlockKey(0)
+
+			require.False(t, taken)
+			taken = true
+			require.True(t, taken)
+			time.Sleep(10 * time.Millisecond)
+			taken = false
+			require.False(t, taken)
+
+			return nil
+		})
+	}
+	require.NoError(t, eg.Wait())
+}
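Since all 100 goroutines contend on the single key 0, the unsynchronized taken flag doubles as a race probe: if LockKey/UnlockKey lost mutual exclusion, the require.False assertions would fire, and running the package under the standard go test -race flag would presumably flag the conflicting writes as well.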
@@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
 			// TODO: use owner directly from the event after neofs-contract#256 will become resolved
 			//       but don't forget about the profit of reading the new container and caching it:
 			//       creation success are most commonly tracked by polling GET op.
-			cnr, err := cachedContainerStorage.Get(ev.ID)
+			cnr, err := cnrSrc.Get(ev.ID)
 			if err == nil {
 				cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
+				cachedContainerStorage.set(ev.ID, cnr, nil)
 			} else {
 				// unlike removal, we expect successful receive of the container
 				// after successful creation, so logging can be useful
go.mod (2 changed lines)

@@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
 go 1.18

 require (
-	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0
+	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418075311-1d691fed5700
 	git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
 	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
 	git.frostfs.info/TrueCloudLab/hrw v1.2.0

go.sum (4 changed lines)

@@ -36,8 +36,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
-git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0 h1:oZ0/KiaFeveXRLi5VVEpuLSHczeFyWx4HDl9wTJUtsE=
-git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0/go.mod h1:sPyITTmQT662ZI38ud2aoE1SUCAr1mO5xV8P4nzLkKI=
+git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418075311-1d691fed5700 h1:2pnUstGQAb+kU78UFg52E1otvL/k2R7pRpMTKYdE19g=
+git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418075311-1d691fed5700/go.mod h1:sPyITTmQT662ZI38ud2aoE1SUCAr1mO5xV8P4nzLkKI=
 git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb h1:S/TrbOOu9qEXZRZ9/Ddw7crnxbBUQLo68PSzQWYrc9M=
 git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb/go.mod h1:nkR5gaGeez3Zv2SE7aceP0YwxG2FzIB5cGKpQO2vV2o=
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
@@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
 	return err == nil, err
 }

+func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+	index, lst, err := e.getTreeShard(cid, treeID)
+	if err != nil {
+		return 0, nil
+	}
+	return lst[index].TreeHeight(cid, treeID)
+}
+
 // TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
 func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
 	index, lst, err := e.getTreeShard(cid, treeID)
@@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
 	})
 }

+func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+	t.modeMtx.RLock()
+	defer t.modeMtx.RUnlock()
+
+	if t.mode.NoMetabase() {
+		return 0, ErrDegradedMode
+	}
+
+	var height uint64
+	var retErr error
+	err := t.db.View(func(tx *bbolt.Tx) error {
+		treeRoot := tx.Bucket(bucketName(cid, treeID))
+		if treeRoot != nil {
+			k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
+			height = binary.BigEndian.Uint64(k)
+		} else {
+			retErr = ErrTreeNotFound
+		}
+		return nil
+	})
+	if err == nil {
+		err = retErr
+	}
+	return height, err
+}
+
 // TreeExists implements the Forest interface.
 func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
 	t.modeMtx.RLock()
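Why Cursor().Last() yields the tree height: bbolt keeps bucket keys in byte-sorted order, and the code above implies the log bucket keys are 8-byte big-endian heights, for which byte order equals numeric order, so the last key is the maximum height. A minimal self-contained illustration of that encoding property (not part of the diff):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	a := make([]byte, 8)
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(a, 5)
	binary.BigEndian.PutUint64(b, 260)
	// fixed-width big-endian keys compare like the numbers they encode,
	// so the lexicographically last key is the highest height
	fmt.Println(bytes.Compare(a, b) < 0) // true: 5 < 260 holds byte-wise too
}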
@@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
 	return res, nil
 }

+func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
+	fullID := cid.EncodeToString() + "/" + treeID
+	tree, ok := f.treeMap[fullID]
+	if !ok {
+		return 0, ErrTreeNotFound
+	}
+	return tree.operations[len(tree.operations)-1].Time, nil
+}
+
 // TreeExists implements the pilorama.Forest interface.
 func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
 	fullID := cid.EncodeToString() + "/" + treeID
@@ -527,10 +527,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
 		checkExists(t, false, cid, treeID)
 	})

-	require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
+	require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
 	checkExists(t, true, cid, treeID)
+
+	height, err := s.TreeHeight(cid, treeID)
+	require.NoError(t, err)
+	require.EqualValues(t, 11, height)
+
 	checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
-	checkExists(t, false, cid, "another tree")  // same CID, different tree
+
+	_, err = s.TreeHeight(cidtest.ID(), treeID)
+	require.ErrorIs(t, err, ErrTreeNotFound)
+
+	checkExists(t, false, cid, "another tree") // same CID, different tree

 	t.Run("can be removed", func(t *testing.T) {
 		require.NoError(t, s.TreeDrop(cid, treeID))
@@ -48,6 +48,8 @@ type Forest interface {
 	TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
 	// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
 	TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
+	// TreeHeight returns current tree height.
+	TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
 }

 type ForestStorage interface {
@@ -155,6 +155,13 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
 	return s.pilorama.TreeList(cid)
 }

+func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+	if s.pilorama == nil {
+		return 0, ErrPiloramaDisabled
+	}
+	return s.pilorama.TreeHeight(cid, treeID)
+}
+
 // TreeExists implements the pilorama.Forest interface.
 func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
 	if s.pilorama == nil {
|
|||
return p
|
||||
}
|
||||
|
||||
func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
|
||||
if p != nil {
|
||||
p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
|
||||
if p != nil {
|
||||
p.relay = f
|
||||
|
|
|
@@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
 			object.NewFromV2(oV2),
 		).
 		WithRelay(s.relayRequest).
-		WithCommonPrm(commonPrm), nil
+		WithCommonPrm(commonPrm).
+		WithCopyNumber(part.GetCopiesNumber()), nil
 }

 func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {
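This is the #284 fix: the client's requested copies number is now threaded into the placement traverser via placement.SuccessAfter, where before it was silently dropped. A rough sketch of the "success after N" idea that this option presumably configures (the real traverser lives in frostfs-node's placement package; the counter below is only an illustration):

package main

import (
	"fmt"
	"sync/atomic"
)

type successAfter struct {
	need int32 // stop once this many replicas are stored
	done int32
}

func (s *successAfter) submitOK() bool {
	// returns true once enough nodes have acknowledged the object
	return atomic.AddInt32(&s.done, 1) >= s.need
}

func main() {
	tr := &successAfter{need: 2} // client asked for 2 copies
	for node := 0; node < 4; node++ {
		if tr.submitOK() {
			fmt.Println("placed enough copies after node", node)
			break
		}
	}
}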
@@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")

 func (c *clientCache) init() {
 	l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
-		_ = value.cc.Close()
+		if conn := value.cc; conn != nil {
+			_ = conn.Close()
+		}
 	})
 	c.LRU = *l
 }
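This is the panic fixed by #323: the eviction callback closed value.cc unconditionally, but cache entries apparently can hold a nil connection (the surrounding errRecentlyFailed suggests failed dials are cached too), and calling a field-dereferencing method on a nil pointer panics. A minimal reproduction of the pattern, with illustrative types rather than the service's real ones:

package main

import "fmt"

type conn struct{ open bool }

func (c *conn) Close() error {
	c.open = false // a nil receiver panics on this dereference
	return nil
}

type cacheItem struct{ cc *conn }

func main() {
	item := cacheItem{} // e.g. an entry cached after a failed dial
	if item.cc != nil { // the guard added by the fix
		_ = item.cc.Close()
	}
	fmt.Println("no panic")
}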
@@ -13,6 +13,7 @@ import (
 	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"github.com/panjf2000/ants/v2"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )

@@ -31,6 +32,8 @@ type Service struct {
 	syncChan chan struct{}
 	syncPool *ants.Pool

+	initialSyncDone atomic.Bool
+
 	// cnrMap contains existing (used) container IDs.
 	cnrMap map[cidSDK.ID]struct{}
 	// cnrMapMtx protects cnrMap
@@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
 }

 func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()

 	var cid cidSDK.ID
@@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
 }

 func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()

 	var cid cidSDK.ID
@@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
 }

 func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()

 	var cid cidSDK.ID
@@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
 // Move applies client operation to the specified tree and pushes in queue
 // for replication on other nodes.
 func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()

 	var cid cidSDK.ID
@@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
 }

 func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()

 	var cid cidSDK.ID
@@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
 }

 func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
+	if !s.initialSyncDone.Load() {
+		return ErrAlreadySyncing
+	}
+
 	b := req.GetBody()

 	var cid cidSDK.ID
@@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
 }

 func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
+	if !s.initialSyncDone.Load() {
+		return ErrAlreadySyncing
+	}
+
 	b := req.GetBody()

 	var cid cidSDK.ID
@@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 	}

 	h := b.GetHeight()
+	lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
+	if err != nil {
+		return err
+	}
 	for {
 		lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
-		if err != nil || lm.Time == 0 {
+		if err != nil || lm.Time == 0 || lastHeight < lm.Time {
 			return err
 		}
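The snapshotted lastHeight appears to bound the op-log stream: once an entry's Time exceeds the tree height recorded before streaming began, the server returns instead of chasing operations that land concurrently (for example, from the parallel sync introduced below), which is what the new TreeHeight method exists for.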
@@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 }

 func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	var cid cidSDK.ID

 	err := cid.Decode(req.GetBody().GetContainerId())
@@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
 }

 func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	return new(HealthcheckResponse), nil
 }
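Every handler now fails fast with ErrAlreadySyncing until the background sync flips initialSyncDone. The same gating pattern in isolation, using go.uber.org/atomic like the import added above; the svc/errSyncing names are illustrative, not the service's:

package main

import (
	"errors"
	"fmt"

	"go.uber.org/atomic"
)

var errSyncing = errors.New("tree service is syncing")

type svc struct{ ready atomic.Bool }

func (s *svc) handle() error {
	if !s.ready.Load() {
		return errSyncing // mirrors the ErrAlreadySyncing early returns
	}
	return nil
}

func main() {
	s := &svc{}
	fmt.Println(s.handle()) // error: initial sync not finished
	s.ready.Store(true)     // done by syncLoop after the first pass
	fmt.Println(s.handle()) // <nil>
}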
@@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
 	rawCID := make([]byte, sha256.Size)
 	cid.Encode(rawCID)

+	errG, ctx := errgroup.WithContext(ctx)
+	errG.SetLimit(1024)
+
+	var heightMtx sync.Mutex
+
 	for {
 		newHeight := height
 		req := &GetOpLogRequest{
@@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
 			},
 		}
 		if err := SignMessage(req, s.key); err != nil {
+			_ = errG.Wait()
 			return newHeight, err
 		}

 		c, err := treeClient.GetOpLog(ctx, req)
 		if err != nil {
+			_ = errG.Wait()
 			return newHeight, fmt.Errorf("can't initialize client: %w", err)
 		}

@@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
 				Child:  lm.ChildId,
 			}
 			if err := m.Meta.FromBytes(lm.Meta); err != nil {
+				_ = errG.Wait()
 				return newHeight, err
 			}
-			if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
-				return newHeight, err
-			}
-			if m.Time > newHeight {
-				newHeight = m.Time + 1
-			} else {
-				newHeight++
-			}
+			errG.Go(func() error {
+				err := s.forest.TreeApply(cid, treeID, m, true)
+				heightMtx.Lock()
+				defer heightMtx.Unlock()
+				if err != nil {
+					if newHeight > height {
+						height = newHeight
+					}
+					return err
+				}
+				if m.Time > newHeight {
+					newHeight = m.Time + 1
+				} else {
+					newHeight++
+				}
+				return nil
+			})
 		}

+		applyErr := errG.Wait()
+		if err == nil {
+			err = applyErr
+		}
+
+		heightMtx.Lock()
 		if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
+			heightMtx.Unlock()
 			return newHeight, err
 		}
 		height = newHeight
+		heightMtx.Unlock()
 	}
 }
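Log operations are now applied through an errgroup capped at 1024 in-flight goroutines, with heightMtx serializing the shared height bookkeeping. The bounded-parallelism pattern in isolation (the workload below is only a stand-in for TreeApply): errgroup.SetLimit(n) makes Go block once n goroutines are running, and Wait returns the first error.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group
	eg.SetLimit(4) // the PR uses 1024; 4 keeps the example small

	results := make(chan int, 16) // buffered so workers never block on send
	for i := 0; i < 16; i++ {
		i := i
		eg.Go(func() error {
			results <- i // pretend this is an expensive apply
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("apply failed:", err)
		return
	}
	close(results)
	total := 0
	for v := range results {
		total += v
	}
	fmt.Println(total) // 120
}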
@@ -288,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
 			cnrs, err := s.cfg.cnrSource.List()
 			if err != nil {
 				s.log.Error("could not fetch containers", zap.Error(err))
-				continue
+				break
 			}

 			newMap, cnrsToSync := s.containersToSync(cnrs)
@@ -299,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {

 			s.log.Debug("trees have been synchronized")
 		}
+		s.initialSyncDone.Store(true)
 	}
 }