Compare commits
10 commits
master
...
fix/comple
Author | SHA1 | Date | |
---|---|---|---|
ae4dab4bd8 | |||
|
c60029d3b0 | ||
|
0beb7ccf5c | ||
0fe5e34fb0 | |||
bcf3f0f517 | |||
79d59e4ed2 | |||
364b4ac572 | |||
f7679a8168 | |||
2dc2fe8780 | |||
21412ef24a |
19 changed files with 361 additions and 28 deletions
|
@ -6,6 +6,9 @@ Changelog for FrostFS Node
|
||||||
### Added
|
### Added
|
||||||
### Changed
|
### Changed
|
||||||
### Fixed
|
### Fixed
|
||||||
|
- Copy number was not used for `PUT` requests (#284)
|
||||||
|
- Tree service panic in its internal client cache (#323)
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
### Updated
|
### Updated
|
||||||
### Updating from v0.36.0
|
### Updating from v0.36.0
|
||||||
|
|
|
@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
|
||||||
e error
|
e error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type locker struct {
|
||||||
|
mtx *sync.Mutex
|
||||||
|
waiters int // not protected by mtx, must used outer mutex to update concurrently
|
||||||
|
}
|
||||||
|
|
||||||
|
type keyLocker[K comparable] struct {
|
||||||
|
lockers map[K]*locker
|
||||||
|
lockersMtx *sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newKeyLocker[K comparable]() *keyLocker[K] {
|
||||||
|
return &keyLocker[K]{
|
||||||
|
lockers: make(map[K]*locker),
|
||||||
|
lockersMtx: &sync.Mutex{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *keyLocker[K]) LockKey(key K) {
|
||||||
|
l.lockersMtx.Lock()
|
||||||
|
|
||||||
|
if locker, found := l.lockers[key]; found {
|
||||||
|
locker.waiters++
|
||||||
|
l.lockersMtx.Unlock()
|
||||||
|
|
||||||
|
locker.mtx.Lock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
locker := &locker{
|
||||||
|
mtx: &sync.Mutex{},
|
||||||
|
waiters: 1,
|
||||||
|
}
|
||||||
|
locker.mtx.Lock()
|
||||||
|
|
||||||
|
l.lockers[key] = locker
|
||||||
|
l.lockersMtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *keyLocker[K]) UnlockKey(key K) {
|
||||||
|
l.lockersMtx.Lock()
|
||||||
|
defer l.lockersMtx.Unlock()
|
||||||
|
|
||||||
|
locker, found := l.lockers[key]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if locker.waiters == 1 {
|
||||||
|
delete(l.lockers, key)
|
||||||
|
}
|
||||||
|
locker.waiters--
|
||||||
|
|
||||||
|
locker.mtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// entity that provides TTL cache interface.
|
// entity that provides TTL cache interface.
|
||||||
type ttlNetCache[K comparable, V any] struct {
|
type ttlNetCache[K comparable, V any] struct {
|
||||||
ttl time.Duration
|
ttl time.Duration
|
||||||
|
@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
|
||||||
cache *lru.Cache[K, *valueWithTime[V]]
|
cache *lru.Cache[K, *valueWithTime[V]]
|
||||||
|
|
||||||
netRdr netValueReader[K, V]
|
netRdr netValueReader[K, V]
|
||||||
|
|
||||||
|
keyLocker *keyLocker[K]
|
||||||
}
|
}
|
||||||
|
|
||||||
// complicates netValueReader with TTL caching mechanism.
|
// complicates netValueReader with TTL caching mechanism.
|
||||||
|
@ -41,10 +98,11 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
return &ttlNetCache[K, V]{
|
return &ttlNetCache[K, V]{
|
||||||
ttl: ttl,
|
ttl: ttl,
|
||||||
sz: sz,
|
sz: sz,
|
||||||
cache: cache,
|
cache: cache,
|
||||||
netRdr: netRdr,
|
netRdr: netRdr,
|
||||||
|
keyLocker: newKeyLocker[K](),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
|
||||||
// returned value should not be modified.
|
// returned value should not be modified.
|
||||||
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
|
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
|
||||||
val, ok := c.cache.Peek(key)
|
val, ok := c.cache.Peek(key)
|
||||||
if ok {
|
if ok && time.Since(val.t) < c.ttl {
|
||||||
if time.Since(val.t) < c.ttl {
|
return val.v, val.e
|
||||||
return val.v, val.e
|
}
|
||||||
}
|
|
||||||
|
|
||||||
c.cache.Remove(key)
|
c.keyLocker.LockKey(key)
|
||||||
|
defer c.keyLocker.UnlockKey(key)
|
||||||
|
|
||||||
|
val, ok = c.cache.Peek(key)
|
||||||
|
if ok && time.Since(val.t) < c.ttl {
|
||||||
|
return val.v, val.e
|
||||||
}
|
}
|
||||||
|
|
||||||
v, err := c.netRdr(key)
|
v, err := c.netRdr(key)
|
||||||
|
|
||||||
c.set(key, v, err)
|
c.cache.Add(key, &valueWithTime[V]{
|
||||||
|
v: v,
|
||||||
|
t: time.Now(),
|
||||||
|
e: err,
|
||||||
|
})
|
||||||
|
|
||||||
return v, err
|
return v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
||||||
|
c.keyLocker.LockKey(k)
|
||||||
|
defer c.keyLocker.UnlockKey(k)
|
||||||
|
|
||||||
c.cache.Add(k, &valueWithTime[V]{
|
c.cache.Add(k, &valueWithTime[V]{
|
||||||
v: v,
|
v: v,
|
||||||
t: time.Now(),
|
t: time.Now(),
|
||||||
|
@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ttlNetCache[K, V]) remove(key K) {
|
func (c *ttlNetCache[K, V]) remove(key K) {
|
||||||
|
c.keyLocker.LockKey(key)
|
||||||
|
defer c.keyLocker.UnlockKey(key)
|
||||||
|
|
||||||
c.cache.Remove(key)
|
c.cache.Remove(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
32
cmd/frostfs-node/cache_test.go
Normal file
32
cmd/frostfs-node/cache_test.go
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestKeyLocker(t *testing.T) {
|
||||||
|
taken := false
|
||||||
|
eg, _ := errgroup.WithContext(context.Background())
|
||||||
|
keyLocker := newKeyLocker[int]()
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
eg.Go(func() error {
|
||||||
|
keyLocker.LockKey(0)
|
||||||
|
defer keyLocker.UnlockKey(0)
|
||||||
|
|
||||||
|
require.False(t, taken)
|
||||||
|
taken = true
|
||||||
|
require.True(t, taken)
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
taken = false
|
||||||
|
require.False(t, taken)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
}
|
|
@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
||||||
// but don't forget about the profit of reading the new container and caching it:
|
// but don't forget about the profit of reading the new container and caching it:
|
||||||
// creation success are most commonly tracked by polling GET op.
|
// creation success are most commonly tracked by polling GET op.
|
||||||
cnr, err := cachedContainerStorage.Get(ev.ID)
|
cnr, err := cnrSrc.Get(ev.ID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
||||||
|
cachedContainerStorage.set(ev.ID, cnr, nil)
|
||||||
} else {
|
} else {
|
||||||
// unlike removal, we expect successful receive of the container
|
// unlike removal, we expect successful receive of the container
|
||||||
// after successful creation, so logging can be useful
|
// after successful creation, so logging can be useful
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
|
||||||
go 1.18
|
go 1.18
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418075311-1d691fed5700
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.0
|
git.frostfs.info/TrueCloudLab/hrw v1.2.0
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
||||||
return err == nil, err
|
return err == nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||||
|
index, lst, err := e.getTreeShard(cid, treeID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return lst[index].TreeHeight(cid, treeID)
|
||||||
|
}
|
||||||
|
|
||||||
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||||
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||||
index, lst, err := e.getTreeShard(cid, treeID)
|
index, lst, err := e.getTreeShard(cid, treeID)
|
||||||
|
|
58
pkg/local_object_storage/metabase/children.go
Normal file
58
pkg/local_object_storage/metabase/children.go
Normal file
|
@ -0,0 +1,58 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetChildren returns parent -> children map.
|
||||||
|
// If an object has no children, then map will contain addr -> nil value.
|
||||||
|
func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return nil, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make(map[oid.Address][]oid.Address, len(addresses))
|
||||||
|
|
||||||
|
buffer := make([]byte, bucketKeySize)
|
||||||
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
for _, addr := range addresses {
|
||||||
|
bkt := tx.Bucket(parentBucketName(addr.Container(), buffer[:]))
|
||||||
|
if bkt == nil {
|
||||||
|
result[addr] = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer[:])))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(binObjIDs) == 0 {
|
||||||
|
result[addr] = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, binObjID := range binObjIDs {
|
||||||
|
var id oid.ID
|
||||||
|
if err = id.Decode(binObjID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var resultAddress oid.Address
|
||||||
|
resultAddress.SetContainer(addr.Container())
|
||||||
|
resultAddress.SetObject(id)
|
||||||
|
result[addr] = append(result[addr], resultAddress)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
|
@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||||
|
t.modeMtx.RLock()
|
||||||
|
defer t.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if t.mode.NoMetabase() {
|
||||||
|
return 0, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
var height uint64
|
||||||
|
var retErr error
|
||||||
|
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
treeRoot := tx.Bucket(bucketName(cid, treeID))
|
||||||
|
if treeRoot != nil {
|
||||||
|
k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
|
||||||
|
height = binary.BigEndian.Uint64(k)
|
||||||
|
} else {
|
||||||
|
retErr = ErrTreeNotFound
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
err = retErr
|
||||||
|
}
|
||||||
|
return height, err
|
||||||
|
}
|
||||||
|
|
||||||
// TreeExists implements the Forest interface.
|
// TreeExists implements the Forest interface.
|
||||||
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
||||||
t.modeMtx.RLock()
|
t.modeMtx.RLock()
|
||||||
|
|
|
@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
|
||||||
|
fullID := cid.EncodeToString() + "/" + treeID
|
||||||
|
tree, ok := f.treeMap[fullID]
|
||||||
|
if !ok {
|
||||||
|
return 0, ErrTreeNotFound
|
||||||
|
}
|
||||||
|
return tree.operations[len(tree.operations)-1].Time, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TreeExists implements the pilorama.Forest interface.
|
// TreeExists implements the pilorama.Forest interface.
|
||||||
func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
|
func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
|
||||||
fullID := cid.EncodeToString() + "/" + treeID
|
fullID := cid.EncodeToString() + "/" + treeID
|
||||||
|
|
|
@ -527,10 +527,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
|
||||||
checkExists(t, false, cid, treeID)
|
checkExists(t, false, cid, treeID)
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
|
require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
|
||||||
checkExists(t, true, cid, treeID)
|
checkExists(t, true, cid, treeID)
|
||||||
|
|
||||||
|
height, err := s.TreeHeight(cid, treeID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 11, height)
|
||||||
|
|
||||||
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
|
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
|
||||||
checkExists(t, false, cid, "another tree") // same CID, different tree
|
|
||||||
|
_, err = s.TreeHeight(cidtest.ID(), treeID)
|
||||||
|
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||||
|
|
||||||
|
checkExists(t, false, cid, "another tree") // same CID, different tree
|
||||||
|
|
||||||
t.Run("can be removed", func(t *testing.T) {
|
t.Run("can be removed", func(t *testing.T) {
|
||||||
require.NoError(t, s.TreeDrop(cid, treeID))
|
require.NoError(t, s.TreeDrop(cid, treeID))
|
||||||
|
|
|
@ -48,6 +48,8 @@ type Forest interface {
|
||||||
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
|
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
|
||||||
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
|
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
|
||||||
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
|
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
|
||||||
|
// TreeHeight returns current tree height.
|
||||||
|
TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ForestStorage interface {
|
type ForestStorage interface {
|
||||||
|
|
|
@ -258,6 +258,9 @@ func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
||||||
|
s.log.Debug("staring expired objects collecting")
|
||||||
|
defer s.log.Debug("expired objects collecting completed")
|
||||||
|
|
||||||
workersCount, batchSize := s.getExpiredObjectsParameters()
|
workersCount, batchSize := s.getExpiredObjectsParameters()
|
||||||
|
|
||||||
errGroup, egCtx := errgroup.WithContext(ctx)
|
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||||
|
@ -313,6 +316,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expired, err := s.getPhysicalAddresses(expired)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn("failure to get physical expired objects", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var inhumePrm meta.InhumePrm
|
var inhumePrm meta.InhumePrm
|
||||||
|
|
||||||
inhumePrm.SetAddresses(expired...)
|
inhumePrm.SetAddresses(expired...)
|
||||||
|
@ -338,11 +347,26 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Shard) getPhysicalAddresses(source []oid.Address) ([]oid.Address, error) {
|
||||||
|
result := make([]oid.Address, 0, len(source))
|
||||||
|
parentToChildren, err := s.metaBase.GetChildren(source)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for parent, children := range parentToChildren {
|
||||||
|
result = append(result, parent)
|
||||||
|
result = append(result, children...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
epoch := e.(newEpoch).epoch
|
epoch := e.(newEpoch).epoch
|
||||||
log := s.log.With(zap.Uint64("epoch", epoch))
|
log := s.log.With(zap.Uint64("epoch", epoch))
|
||||||
|
|
||||||
log.Debug("started expired tombstones handling")
|
log.Debug("started expired tombstones handling")
|
||||||
|
defer log.Debug("finished expired tombstones handling")
|
||||||
|
|
||||||
const tssDeleteBatch = 50
|
const tssDeleteBatch = 50
|
||||||
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||||
|
@ -399,11 +423,12 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
tss = tss[:0]
|
tss = tss[:0]
|
||||||
tssExp = tssExp[:0]
|
tssExp = tssExp[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("finished expired tombstones handling")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
||||||
|
s.log.Debug("staring expired locks collecting")
|
||||||
|
defer s.log.Debug("expired locks collecting completed")
|
||||||
|
|
||||||
workersCount, batchSize := s.getExpiredObjectsParameters()
|
workersCount, batchSize := s.getExpiredObjectsParameters()
|
||||||
|
|
||||||
errGroup, egCtx := errgroup.WithContext(ctx)
|
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||||
|
|
|
@ -155,6 +155,13 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
|
||||||
return s.pilorama.TreeList(cid)
|
return s.pilorama.TreeList(cid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||||
|
if s.pilorama == nil {
|
||||||
|
return 0, ErrPiloramaDisabled
|
||||||
|
}
|
||||||
|
return s.pilorama.TreeHeight(cid, treeID)
|
||||||
|
}
|
||||||
|
|
||||||
// TreeExists implements the pilorama.Forest interface.
|
// TreeExists implements the pilorama.Forest interface.
|
||||||
func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
|
|
|
@ -40,6 +40,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
|
||||||
|
if p != nil {
|
||||||
|
p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
|
func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.relay = f
|
p.relay = f
|
||||||
|
|
|
@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
|
||||||
object.NewFromV2(oV2),
|
object.NewFromV2(oV2),
|
||||||
).
|
).
|
||||||
WithRelay(s.relayRequest).
|
WithRelay(s.relayRequest).
|
||||||
WithCommonPrm(commonPrm), nil
|
WithCommonPrm(commonPrm).
|
||||||
|
WithCopyNumber(part.GetCopiesNumber()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {
|
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {
|
||||||
|
|
|
@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")
|
||||||
|
|
||||||
func (c *clientCache) init() {
|
func (c *clientCache) init() {
|
||||||
l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
|
l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
|
||||||
_ = value.cc.Close()
|
if conn := value.cc; conn != nil {
|
||||||
|
_ = conn.Close()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
c.LRU = *l
|
c.LRU = *l
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -31,6 +32,8 @@ type Service struct {
|
||||||
syncChan chan struct{}
|
syncChan chan struct{}
|
||||||
syncPool *ants.Pool
|
syncPool *ants.Pool
|
||||||
|
|
||||||
|
initialSyncDone atomic.Bool
|
||||||
|
|
||||||
// cnrMap contains existing (used) container IDs.
|
// cnrMap contains existing (used) container IDs.
|
||||||
cnrMap map[cidSDK.ID]struct{}
|
cnrMap map[cidSDK.ID]struct{}
|
||||||
// cnrMapMtx protects cnrMap
|
// cnrMapMtx protects cnrMap
|
||||||
|
@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
// Move applies client operation to the specified tree and pushes in queue
|
// Move applies client operation to the specified tree and pushes in queue
|
||||||
// for replication on other nodes.
|
// for replication on other nodes.
|
||||||
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
|
||||||
}
|
}
|
||||||
|
|
||||||
h := b.GetHeight()
|
h := b.GetHeight()
|
||||||
|
lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
|
lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
|
||||||
if err != nil || lm.Time == 0 {
|
if err != nil || lm.Time == 0 || lastHeight < lm.Time {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
|
||||||
err := cid.Decode(req.GetBody().GetContainerId())
|
err := cid.Decode(req.GetBody().GetContainerId())
|
||||||
|
@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
|
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
return new(HealthcheckResponse), nil
|
return new(HealthcheckResponse), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
rawCID := make([]byte, sha256.Size)
|
rawCID := make([]byte, sha256.Size)
|
||||||
cid.Encode(rawCID)
|
cid.Encode(rawCID)
|
||||||
|
|
||||||
|
errG, ctx := errgroup.WithContext(ctx)
|
||||||
|
errG.SetLimit(1024)
|
||||||
|
|
||||||
|
var heightMtx sync.Mutex
|
||||||
|
|
||||||
for {
|
for {
|
||||||
newHeight := height
|
newHeight := height
|
||||||
req := &GetOpLogRequest{
|
req := &GetOpLogRequest{
|
||||||
|
@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := SignMessage(req, s.key); err != nil {
|
if err := SignMessage(req, s.key); err != nil {
|
||||||
|
_ = errG.Wait()
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := treeClient.GetOpLog(ctx, req)
|
c, err := treeClient.GetOpLog(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
_ = errG.Wait()
|
||||||
return newHeight, fmt.Errorf("can't initialize client: %w", err)
|
return newHeight, fmt.Errorf("can't initialize client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
Child: lm.ChildId,
|
Child: lm.ChildId,
|
||||||
}
|
}
|
||||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||||
|
_ = errG.Wait()
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
|
errG.Go(func() error {
|
||||||
return newHeight, err
|
err := s.forest.TreeApply(cid, treeID, m, true)
|
||||||
}
|
heightMtx.Lock()
|
||||||
if m.Time > newHeight {
|
defer heightMtx.Unlock()
|
||||||
newHeight = m.Time + 1
|
if err != nil {
|
||||||
} else {
|
if newHeight > height {
|
||||||
newHeight++
|
height = newHeight
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if m.Time > newHeight {
|
||||||
|
newHeight = m.Time + 1
|
||||||
|
} else {
|
||||||
|
newHeight++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
applyErr := errG.Wait()
|
||||||
|
if err == nil {
|
||||||
|
err = applyErr
|
||||||
|
}
|
||||||
|
|
||||||
|
heightMtx.Lock()
|
||||||
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
||||||
|
heightMtx.Unlock()
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
height = newHeight
|
height = newHeight
|
||||||
|
heightMtx.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
cnrs, err := s.cfg.cnrSource.List()
|
cnrs, err := s.cfg.cnrSource.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error("could not fetch containers", zap.Error(err))
|
s.log.Error("could not fetch containers", zap.Error(err))
|
||||||
continue
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
newMap, cnrsToSync := s.containersToSync(cnrs)
|
newMap, cnrsToSync := s.containersToSync(cnrs)
|
||||||
|
@ -299,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
|
|
||||||
s.log.Debug("trees have been synchronized")
|
s.log.Debug("trees have been synchronized")
|
||||||
}
|
}
|
||||||
|
s.initialSyncDone.Store(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue