Compare commits

...

9 commits

Author SHA1 Message Date
Pavel Karpy
c60029d3b0 [#323] node: Fix tree svc panic
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
If a connection has not been established earlier, it stores `nil` in LRU
cache. Cache eviction tries to close every connection (even a `nil` one) and
panics but not crash the app because we are using pools.
That ugly bug also leads to a deadlock where `Unlock` is not called via
`defer` func (and that is the way I found it).

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-04 20:04:30 +03:00
Pavel Karpy
0beb7ccf5c [#284] node: Use copy_number on server side
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-04-26 10:57:34 +03:00
0fe5e34fb0 [#231] node: Fix race condition in TTL cache
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Use key locker to lock by key.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-25 16:50:27 +03:00
bcf3f0f517 [#231] node: Invalidate container cache on PutSuccess event
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
For example: frostfs-cli creates container and makes polling
GetContainer requests. These requests go through container cache,
so not found error stores in container cache.
So container cache can contain not found error when PutSuccess event received.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-20 17:34:00 +03:00
79d59e4ed2 [#266] services/tree: Do not accept requests until initial sync is finished
Some checks failed
ci/woodpecker/pr/pre-commit Pipeline failed
ci/woodpecker/push/pre-commit Pipeline failed
`Apply` is deliberately left out -- we don't want to miss anything new.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
364b4ac572 [#266] services/tree: Batch operations on the service level
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
f7679a8168 [#266] services/tree: Return operation log up to some height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
2dc2fe8780 [#266] pilorama: Allow to get current tree height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
21412ef24a [#263] node: Up api-go version
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Fix panic in tracing.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-18 11:45:42 +03:00
17 changed files with 278 additions and 28 deletions

View file

@ -6,6 +6,9 @@ Changelog for FrostFS Node
### Added
### Changed
### Fixed
- Copy number was not used for `PUT` requests (#284)
- Tree service panic in its internal client cache (#323)
### Removed
### Updated
### Updating from v0.36.0

View file

@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
e error
}
type locker struct {
mtx *sync.Mutex
waiters int // not protected by mtx, must used outer mutex to update concurrently
}
type keyLocker[K comparable] struct {
lockers map[K]*locker
lockersMtx *sync.Mutex
}
func newKeyLocker[K comparable]() *keyLocker[K] {
return &keyLocker[K]{
lockers: make(map[K]*locker),
lockersMtx: &sync.Mutex{},
}
}
func (l *keyLocker[K]) LockKey(key K) {
l.lockersMtx.Lock()
if locker, found := l.lockers[key]; found {
locker.waiters++
l.lockersMtx.Unlock()
locker.mtx.Lock()
return
}
locker := &locker{
mtx: &sync.Mutex{},
waiters: 1,
}
locker.mtx.Lock()
l.lockers[key] = locker
l.lockersMtx.Unlock()
}
func (l *keyLocker[K]) UnlockKey(key K) {
l.lockersMtx.Lock()
defer l.lockersMtx.Unlock()
locker, found := l.lockers[key]
if !found {
return
}
if locker.waiters == 1 {
delete(l.lockers, key)
}
locker.waiters--
locker.mtx.Unlock()
}
// entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
ttl time.Duration
@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
cache *lru.Cache[K, *valueWithTime[V]]
netRdr netValueReader[K, V]
keyLocker *keyLocker[K]
}
// complicates netValueReader with TTL caching mechanism.
@ -41,10 +98,11 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
fatalOnErr(err)
return &ttlNetCache[K, V]{
ttl: ttl,
sz: sz,
cache: cache,
netRdr: netRdr,
ttl: ttl,
sz: sz,
cache: cache,
netRdr: netRdr,
keyLocker: newKeyLocker[K](),
}
}
@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
// returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
val, ok := c.cache.Peek(key)
if ok {
if time.Since(val.t) < c.ttl {
return val.v, val.e
}
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
}
c.cache.Remove(key)
c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
val, ok = c.cache.Peek(key)
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
}
v, err := c.netRdr(key)
c.set(key, v, err)
c.cache.Add(key, &valueWithTime[V]{
v: v,
t: time.Now(),
e: err,
})
return v, err
}
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
c.keyLocker.LockKey(k)
defer c.keyLocker.UnlockKey(k)
c.cache.Add(k, &valueWithTime[V]{
v: v,
t: time.Now(),
@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
}
func (c *ttlNetCache[K, V]) remove(key K) {
c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
c.cache.Remove(key)
}

View file

@ -0,0 +1,32 @@
package main
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestKeyLocker(t *testing.T) {
taken := false
eg, _ := errgroup.WithContext(context.Background())
keyLocker := newKeyLocker[int]()
for i := 0; i < 100; i++ {
eg.Go(func() error {
keyLocker.LockKey(0)
defer keyLocker.UnlockKey(0)
require.False(t, taken)
taken = true
require.True(t, taken)
time.Sleep(10 * time.Millisecond)
taken = false
require.False(t, taken)
return nil
})
}
require.NoError(t, eg.Wait())
}

View file

@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
// but don't forget about the profit of reading the new container and caching it:
// creation success are most commonly tracked by polling GET op.
cnr, err := cachedContainerStorage.Get(ev.ID)
cnr, err := cnrSrc.Get(ev.ID)
if err == nil {
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
cachedContainerStorage.set(ev.ID, cnr, nil)
} else {
// unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful

2
go.mod
View file

@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.18
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418075311-1d691fed5700
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
git.frostfs.info/TrueCloudLab/hrw v1.2.0

4
go.sum
View file

@ -36,8 +36,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0 h1:oZ0/KiaFeveXRLi5VVEpuLSHczeFyWx4HDl9wTJUtsE=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0/go.mod h1:sPyITTmQT662ZI38ud2aoE1SUCAr1mO5xV8P4nzLkKI=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418075311-1d691fed5700 h1:2pnUstGQAb+kU78UFg52E1otvL/k2R7pRpMTKYdE19g=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418075311-1d691fed5700/go.mod h1:sPyITTmQT662ZI38ud2aoE1SUCAr1mO5xV8P4nzLkKI=
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb h1:S/TrbOOu9qEXZRZ9/Ddw7crnxbBUQLo68PSzQWYrc9M=
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb/go.mod h1:nkR5gaGeez3Zv2SE7aceP0YwxG2FzIB5cGKpQO2vV2o=
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=

View file

@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
return err == nil, err
}
func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
index, lst, err := e.getTreeShard(cid, treeID)
if err != nil {
return 0, nil
}
return lst[index].TreeHeight(cid, treeID)
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
index, lst, err := e.getTreeShard(cid, treeID)

View file

@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
})
}
func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return 0, ErrDegradedMode
}
var height uint64
var retErr error
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot != nil {
k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
height = binary.BigEndian.Uint64(k)
} else {
retErr = ErrTreeNotFound
}
return nil
})
if err == nil {
err = retErr
}
return height, err
}
// TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
t.modeMtx.RLock()

View file

@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
return res, nil
}
func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
fullID := cid.EncodeToString() + "/" + treeID
tree, ok := f.treeMap[fullID]
if !ok {
return 0, ErrTreeNotFound
}
return tree.operations[len(tree.operations)-1].Time, nil
}
// TreeExists implements the pilorama.Forest interface.
func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
fullID := cid.EncodeToString() + "/" + treeID

View file

@ -527,10 +527,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
checkExists(t, false, cid, treeID)
})
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
checkExists(t, true, cid, treeID)
height, err := s.TreeHeight(cid, treeID)
require.NoError(t, err)
require.EqualValues(t, 11, height)
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
checkExists(t, false, cid, "another tree") // same CID, different tree
_, err = s.TreeHeight(cidtest.ID(), treeID)
require.ErrorIs(t, err, ErrTreeNotFound)
checkExists(t, false, cid, "another tree") // same CID, different tree
t.Run("can be removed", func(t *testing.T) {
require.NoError(t, s.TreeDrop(cid, treeID))

View file

@ -48,6 +48,8 @@ type Forest interface {
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
// TreeHeight returns current tree height.
TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
}
type ForestStorage interface {

View file

@ -155,6 +155,13 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
return s.pilorama.TreeList(cid)
}
func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
return s.pilorama.TreeHeight(cid, treeID)
}
// TreeExists implements the pilorama.Forest interface.
func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
if s.pilorama == nil {

View file

@ -40,6 +40,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
return p
}
func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
if p != nil {
p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
}
return p
}
func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
if p != nil {
p.relay = f

View file

@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
object.NewFromV2(oV2),
).
WithRelay(s.relayRequest).
WithCommonPrm(commonPrm), nil
WithCommonPrm(commonPrm).
WithCopyNumber(part.GetCopiesNumber()), nil
}
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {

View file

@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")
func (c *clientCache) init() {
l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
_ = value.cc.Close()
if conn := value.cc; conn != nil {
_ = conn.Close()
}
})
c.LRU = *l
}

View file

@ -13,6 +13,7 @@ import (
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -31,6 +32,8 @@ type Service struct {
syncChan chan struct{}
syncPool *ants.Pool
initialSyncDone atomic.Bool
// cnrMap contains existing (used) container IDs.
cnrMap map[cidSDK.ID]struct{}
// cnrMapMtx protects cnrMap
@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
}
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
}
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
}
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
// Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
}
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
}
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
}
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
}
h := b.GetHeight()
lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
if err != nil {
return err
}
for {
lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
if err != nil || lm.Time == 0 {
if err != nil || lm.Time == 0 || lastHeight < lm.Time {
return err
}
@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
}
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
var cid cidSDK.ID
err := cid.Decode(req.GetBody().GetContainerId())
@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
}
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
return new(HealthcheckResponse), nil
}

View file

@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
errG, ctx := errgroup.WithContext(ctx)
errG.SetLimit(1024)
var heightMtx sync.Mutex
for {
newHeight := height
req := &GetOpLogRequest{
@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
},
}
if err := SignMessage(req, s.key); err != nil {
_ = errG.Wait()
return newHeight, err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
_ = errG.Wait()
return newHeight, fmt.Errorf("can't initialize client: %w", err)
}
@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
Child: lm.ChildId,
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
_ = errG.Wait()
return newHeight, err
}
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
return newHeight, err
}
if m.Time > newHeight {
newHeight = m.Time + 1
} else {
newHeight++
}
errG.Go(func() error {
err := s.forest.TreeApply(cid, treeID, m, true)
heightMtx.Lock()
defer heightMtx.Unlock()
if err != nil {
if newHeight > height {
height = newHeight
}
return err
}
if m.Time > newHeight {
newHeight = m.Time + 1
} else {
newHeight++
}
return nil
})
}
applyErr := errG.Wait()
if err == nil {
err = applyErr
}
heightMtx.Lock()
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
heightMtx.Unlock()
return newHeight, err
}
height = newHeight
heightMtx.Unlock()
}
}
@ -288,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
cnrs, err := s.cfg.cnrSource.List()
if err != nil {
s.log.Error("could not fetch containers", zap.Error(err))
continue
break
}
newMap, cnrsToSync := s.containersToSync(cnrs)
@ -299,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {
s.log.Debug("trees have been synchronized")
}
s.initialSyncDone.Store(true)
}
}