Compare commits

...

22 commits

Author SHA1 Message Date
2360cf263b [#392] shard: Create tombstone source when reload
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-25 15:52:59 +03:00
f866ec1399 [#392] gc: Use defer to mark handler done
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-25 15:52:49 +03:00
a506da97d6 [#384] shard: Add unit test
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Add test to check that oject not found error will be returned,
if object doesn't exist in blobstore.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-25 09:55:16 +03:00
1dd84eef77 [#384] shard: Cancel GC if change mode requested
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-25 09:55:16 +03:00
1501f11e4d [#351] cli: Support copies number parameter in object put
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-05-18 15:52:51 +00:00
4f55417914 [#351] Fix end of files
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-05-18 15:52:51 +00:00
9bda6e0b8b [#332] gc: Fix expired complex object deletion
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-18 12:51:34 +00:00
ceb9deb7f1 [#337] morph: Move subscription logic to subscriber
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-18 11:28:50 +03:00
4148590668 [#365] go.mod: Update api-go
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-18 09:51:07 +03:00
493cafc62a [#355] Increase tree svc client cache size to test hypotheses
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-05-17 15:17:35 +03:00
3711976dfc [#314] writecache: remove objects right after they are flushed
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-16 14:16:25 +03:00
Pavel Karpy
c3f5045842 [#314] wc: Do not lose small objects on disk errors
Do return error if an object could not been stored on WC's disk.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-15 16:27:44 +03:00
Pavel Karpy
ab65063d6d [#314] wc: Simplify background workers naming
Also, drop not used arg.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-15 16:27:42 +03:00
Pavel Karpy
c60029d3b0 [#323] node: Fix tree svc panic
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
If a connection has not been established earlier, it stores `nil` in LRU
cache. Cache eviction tries to close every connection (even a `nil` one) and
panics but not crash the app because we are using pools.
That ugly bug also leads to a deadlock where `Unlock` is not called via
`defer` func (and that is the way I found it).

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-04 20:04:30 +03:00
Pavel Karpy
0beb7ccf5c [#284] node: Use copy_number on server side
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-04-26 10:57:34 +03:00
0fe5e34fb0 [#231] node: Fix race condition in TTL cache
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Use key locker to lock by key.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-25 16:50:27 +03:00
bcf3f0f517 [#231] node: Invalidate container cache on PutSuccess event
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
For example: frostfs-cli creates container and makes polling
GetContainer requests. These requests go through container cache,
so not found error stores in container cache.
So container cache can contain not found error when PutSuccess event received.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-20 17:34:00 +03:00
79d59e4ed2 [#266] services/tree: Do not accept requests until initial sync is finished
Some checks failed
ci/woodpecker/pr/pre-commit Pipeline failed
ci/woodpecker/push/pre-commit Pipeline failed
`Apply` is deliberately left out -- we don't want to miss anything new.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
364b4ac572 [#266] services/tree: Batch operations on the service level
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
f7679a8168 [#266] services/tree: Return operation log up to some height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
2dc2fe8780 [#266] pilorama: Allow to get current tree height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
21412ef24a [#263] node: Up api-go version
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Fix panic in tracing.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-18 11:45:42 +03:00
53 changed files with 1062 additions and 1087 deletions

View file

@ -4,8 +4,13 @@ Changelog for FrostFS Node
## [Unreleased]
### Added
- Support copies number parameter in `frostfs-cli object put` (#351)
### Changed
### Fixed
- Copy number was not used for `PUT` requests (#284)
- Tree service panic in its internal client cache (#323)
### Removed
### Updated
### Updating from v0.36.0
@ -64,6 +69,7 @@ Changelog for FrostFS Node
- Iterating over just removed files by FSTree (#98)
- Parts of a locked object could not be removed anymore (#141)
- Non-alphabet nodes do not try to handle alphabet events (#181)
- Delete complex objects with GC (#332)
### Removed
### Updated

View file

@ -329,6 +329,8 @@ func CreateSession(prm CreateSessionPrm) (res CreateSessionRes, err error) {
type PutObjectPrm struct {
commonObjectPrm
copyNum uint32
hdr *object.Object
rdr io.Reader
@ -352,6 +354,11 @@ func (x *PutObjectPrm) SetHeaderCallback(f func(*object.Object)) {
x.headerCallback = f
}
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
func (x *PutObjectPrm) SetCopiesNumber(copiesNumbers uint32) {
x.copyNum = copiesNumbers
}
// PutObjectRes groups the resulting values of PutObject operation.
type PutObjectRes struct {
id oid.ID
@ -381,6 +388,7 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
}
putPrm.WithXHeaders(prm.xHeaders...)
putPrm.SetCopiesNumber(prm.copyNum)
wrt, err := prm.cli.ObjectPutInit(context.Background(), putPrm)
if err != nil {

View file

@ -25,6 +25,7 @@ import (
const (
noProgressFlag = "no-progress"
notificationFlag = "notify"
copiesNumberFlag = "copies-number"
)
var putExpiredOn uint64
@ -56,6 +57,8 @@ func initObjectPutCmd() {
flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
flags.Uint32(copiesNumberFlag, 0, "Number of copies of the object to store within the RPC call")
}
func putObject(cmd *cobra.Command, _ []string) {
@ -116,6 +119,12 @@ func putObject(cmd *cobra.Command, _ []string) {
}
}
copyNum, err := cmd.Flags().GetUint32(copiesNumberFlag)
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
if copyNum > 0 {
prm.SetCopiesNumber(copyNum)
}
res, err := internalclient.PutObject(prm)
if p != nil {
p.Finish()

View file

@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
e error
}
type locker struct {
mtx *sync.Mutex
waiters int // not protected by mtx, must used outer mutex to update concurrently
}
type keyLocker[K comparable] struct {
lockers map[K]*locker
lockersMtx *sync.Mutex
}
func newKeyLocker[K comparable]() *keyLocker[K] {
return &keyLocker[K]{
lockers: make(map[K]*locker),
lockersMtx: &sync.Mutex{},
}
}
func (l *keyLocker[K]) LockKey(key K) {
l.lockersMtx.Lock()
if locker, found := l.lockers[key]; found {
locker.waiters++
l.lockersMtx.Unlock()
locker.mtx.Lock()
return
}
locker := &locker{
mtx: &sync.Mutex{},
waiters: 1,
}
locker.mtx.Lock()
l.lockers[key] = locker
l.lockersMtx.Unlock()
}
func (l *keyLocker[K]) UnlockKey(key K) {
l.lockersMtx.Lock()
defer l.lockersMtx.Unlock()
locker, found := l.lockers[key]
if !found {
return
}
if locker.waiters == 1 {
delete(l.lockers, key)
}
locker.waiters--
locker.mtx.Unlock()
}
// entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
ttl time.Duration
@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
cache *lru.Cache[K, *valueWithTime[V]]
netRdr netValueReader[K, V]
keyLocker *keyLocker[K]
}
// complicates netValueReader with TTL caching mechanism.
@ -41,10 +98,11 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
fatalOnErr(err)
return &ttlNetCache[K, V]{
ttl: ttl,
sz: sz,
cache: cache,
netRdr: netRdr,
ttl: ttl,
sz: sz,
cache: cache,
netRdr: netRdr,
keyLocker: newKeyLocker[K](),
}
}
@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
// returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
val, ok := c.cache.Peek(key)
if ok {
if time.Since(val.t) < c.ttl {
return val.v, val.e
}
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
}
c.cache.Remove(key)
c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
val, ok = c.cache.Peek(key)
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
}
v, err := c.netRdr(key)
c.set(key, v, err)
c.cache.Add(key, &valueWithTime[V]{
v: v,
t: time.Now(),
e: err,
})
return v, err
}
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
c.keyLocker.LockKey(k)
defer c.keyLocker.UnlockKey(k)
c.cache.Add(k, &valueWithTime[V]{
v: v,
t: time.Now(),
@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
}
func (c *ttlNetCache[K, V]) remove(key K) {
c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
c.cache.Remove(key)
}

View file

@ -0,0 +1,32 @@
package main
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestKeyLocker(t *testing.T) {
taken := false
eg, _ := errgroup.WithContext(context.Background())
keyLocker := newKeyLocker[int]()
for i := 0; i < 100; i++ {
eg.Go(func() error {
keyLocker.LockKey(0)
defer keyLocker.UnlockKey(0)
require.False(t, taken)
taken = true
require.True(t, taken)
time.Sleep(10 * time.Millisecond)
taken = false
require.False(t, taken)
return nil
})
}
require.NoError(t, eg.Wait())
}

View file

@ -864,22 +864,13 @@ func initLocalStorage(c *cfg) {
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
})
// allocate memory for the service;
// allocate memory for the service to create tombstone source;
// service will be created later
c.cfgObject.getSvc = new(getsvc.Service)
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)
tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)
var shardsAttached int
for _, optsWithMeta := range c.shardOpts() {
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...)
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
if err != nil {
c.log.Error("failed to attach shard to engine", zap.Error(err))
} else {
@ -1080,7 +1071,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
}
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
@ -1101,6 +1092,18 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info("configuration has been reloaded successfully")
}
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)
tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)
return tombstoneSource
}
func (c *cfg) shutdown() {
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)

View file

@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
// but don't forget about the profit of reading the new container and caching it:
// creation success are most commonly tracked by polling GET op.
cnr, err := cachedContainerStorage.Get(ev.ID)
cnr, err := cnrSrc.Get(ev.ID)
if err == nil {
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
cachedContainerStorage.set(ev.ID, cnr, nil)
} else {
// unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful

View file

@ -219,4 +219,3 @@ tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost"

2
go.mod
View file

@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.18
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230516125015-c3f61e7c8595
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
git.frostfs.info/TrueCloudLab/hrw v1.2.0

BIN
go.sum

Binary file not shown.

View file

@ -128,7 +128,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
var deletePrm shard.DeletePrm
deletePrm.SetAddresses(addr)
_, err = shards[i].Delete(deletePrm)
_, err = shards[i].Delete(ctx, deletePrm)
if err != nil {
return err
}

View file

@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
return err == nil, err
}
func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
index, lst, err := e.getTreeShard(cid, treeID)
if err != nil {
return 0, nil
}
return lst[index].TreeHeight(cid, treeID)
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
index, lst, err := e.getTreeShard(cid, treeID)

View file

@ -0,0 +1,57 @@
package meta
import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
// GetChildren returns parent -> children map.
// If an object has no children, then map will contain addr -> empty slice value.
func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
result := make(map[oid.Address][]oid.Address, len(addresses))
buffer := make([]byte, bucketKeySize)
err := db.boltDB.View(func(tx *bbolt.Tx) error {
for _, addr := range addresses {
if _, found := result[addr]; found {
continue
}
result[addr] = []oid.Address{}
bkt := tx.Bucket(parentBucketName(addr.Container(), buffer))
if bkt == nil {
continue
}
binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer)))
if err != nil {
return err
}
for _, binObjID := range binObjIDs {
var id oid.ID
if err = id.Decode(binObjID); err != nil {
return err
}
var resultAddress oid.Address
resultAddress.SetContainer(addr.Container())
resultAddress.SetObject(id)
result[addr] = append(result[addr], resultAddress)
}
}
return nil
})
if err != nil {
return nil, err
}
return result, nil
}

View file

@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
})
}
func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return 0, ErrDegradedMode
}
var height uint64
var retErr error
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot != nil {
k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
height = binary.BigEndian.Uint64(k)
} else {
retErr = ErrTreeNotFound
}
return nil
})
if err == nil {
err = retErr
}
return height, err
}
// TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
t.modeMtx.RLock()

View file

@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
return res, nil
}
func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
fullID := cid.EncodeToString() + "/" + treeID
tree, ok := f.treeMap[fullID]
if !ok {
return 0, ErrTreeNotFound
}
return tree.operations[len(tree.operations)-1].Time, nil
}
// TreeExists implements the pilorama.Forest interface.
func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
fullID := cid.EncodeToString() + "/" + treeID

View file

@ -527,10 +527,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
checkExists(t, false, cid, treeID)
})
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
checkExists(t, true, cid, treeID)
height, err := s.TreeHeight(cid, treeID)
require.NoError(t, err)
require.EqualValues(t, 11, height)
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
checkExists(t, false, cid, "another tree") // same CID, different tree
_, err = s.TreeHeight(cidtest.ID(), treeID)
require.ErrorIs(t, err, ErrTreeNotFound)
checkExists(t, false, cid, "another tree") // same CID, different tree
t.Run("can be removed", func(t *testing.T) {
require.NoError(t, s.TreeDrop(cid, treeID))

View file

@ -48,6 +48,8 @@ type Forest interface {
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
// TreeHeight returns current tree height.
TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
}
type ForestStorage interface {

View file

@ -296,8 +296,8 @@ func (s *Shard) Reload(opts ...Option) error {
opts[i](&c)
}
s.m.Lock()
defer s.m.Unlock()
unlock := s.lockExclusive()
defer unlock()
ok, err := s.metaBase.Reload(c.metaOpts...)
if err != nil {
@ -327,3 +327,15 @@ func (s *Shard) Reload(opts ...Option) error {
s.log.Info("trying to restore read-write mode")
return s.setMode(mode.ReadWrite)
}
func (s *Shard) lockExclusive() func() {
s.setModeRequested.Store(true)
val := s.gcCancel.Load()
if val != nil {
cancelGC := val.(context.CancelFunc)
cancelGC()
}
s.m.Lock()
s.setModeRequested.Store(false)
return s.m.Unlock
}

View file

@ -1,6 +1,7 @@
package shard
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -27,84 +28,88 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
// Delete removes data from the shard's writeCache, metaBase and
// blobStor.
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
s.m.RLock()
defer s.m.RUnlock()
return s.delete(prm)
return s.delete(ctx, prm)
}
func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) {
func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
if s.info.Mode.ReadOnly() {
return DeleteRes{}, ErrReadOnlyMode
} else if s.info.Mode.NoMetabase() {
return DeleteRes{}, ErrDegradedMode
}
ln := len(prm.addr)
smalls := make(map[oid.Address][]byte, ln)
for i := range prm.addr {
if s.hasWriteCache() {
err := s.writeCache.Delete(prm.addr[i])
if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
s.log.Warn("can't delete object from write cache", zap.String("error", err.Error()))
}
for _, addr := range prm.addr {
select {
case <-ctx.Done():
return DeleteRes{}, ctx.Err()
default:
}
var sPrm meta.StorageIDPrm
sPrm.SetAddress(prm.addr[i])
s.deleteObjectFromWriteCacheSafe(addr)
res, err := s.metaBase.StorageID(sPrm)
if err != nil {
s.log.Debug("can't get storage ID from metabase",
zap.Stringer("object", prm.addr[i]),
zap.String("error", err.Error()))
s.deleteFromBlobstorSafe(addr)
continue
}
if res.StorageID() != nil {
smalls[prm.addr[i]] = res.StorageID()
if err := s.deleteFromMetabase(addr); err != nil {
return DeleteRes{}, err // stop on metabase error ?
}
}
return DeleteRes{}, nil
}
func (s *Shard) deleteObjectFromWriteCacheSafe(addr oid.Address) {
if s.hasWriteCache() {
err := s.writeCache.Delete(addr)
if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
s.log.Warn("can't delete object from write cache", zap.String("error", err.Error()))
}
}
}
func (s *Shard) deleteFromMetabase(addr oid.Address) error {
var delPrm meta.DeletePrm
delPrm.SetAddresses(prm.addr...)
delPrm.SetAddresses(addr)
res, err := s.metaBase.Delete(delPrm)
if err != nil {
return DeleteRes{}, err // stop on metabase error ?
return err
}
var totalRemovedPayload uint64
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
for i := range prm.addr {
removedPayload := res.RemovedPhysicalObjectSizes()[i]
totalRemovedPayload += removedPayload
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[i]
if logicalRemovedPayload > 0 {
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
}
removedPayload := res.RemovedPhysicalObjectSizes()[0]
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
if logicalRemovedPayload > 0 {
s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload))
}
s.addToPayloadSize(-int64(totalRemovedPayload))
s.addToPayloadSize(-int64(removedPayload))
for i := range prm.addr {
var delPrm common.DeletePrm
delPrm.Address = prm.addr[i]
id := smalls[prm.addr[i]]
delPrm.StorageID = id
_, err = s.blobStor.Delete(delPrm)
if err != nil {
s.log.Debug("can't remove object from blobStor",
zap.Stringer("object_address", prm.addr[i]),
zap.String("error", err.Error()))
}
}
return DeleteRes{}, nil
return nil
}
func (s *Shard) deleteFromBlobstorSafe(addr oid.Address) {
var sPrm meta.StorageIDPrm
sPrm.SetAddress(addr)
res, err := s.metaBase.StorageID(sPrm)
if err != nil {
s.log.Debug("can't get storage ID from metabase",
zap.Stringer("object", addr),
zap.String("error", err.Error()))
}
storageID := res.StorageID()
var delPrm common.DeletePrm
delPrm.Address = addr
delPrm.StorageID = storageID
_, err = s.blobStor.Delete(delPrm)
if err != nil {
s.log.Debug("can't remove object from blobStor",
zap.Stringer("object_address", addr),
zap.String("error", err.Error()))
}
}

View file

@ -49,7 +49,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
_, err = sh.Delete(delPrm)
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
_, err = sh.Get(context.Background(), getPrm)
@ -73,7 +73,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err)
_, err = sh.Delete(delPrm)
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
_, err = sh.Get(context.Background(), getPrm)

View file

@ -57,6 +57,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
writecache.WithMaxObjectSize(wcBigObjectSize),
writecache.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
},
nil,
nil)
}
defer releaseShard(sh, t)
@ -188,7 +189,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
require.Error(t, err)
t.Run("skip errors", func(t *testing.T) {
sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil)
sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil, nil)
t.Cleanup(func() { require.NoError(t, sh.Close()) })
var restorePrm shard.RestorePrm
@ -219,10 +220,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
}
func TestStream(t *testing.T) {
sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil)
sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil, nil)
defer releaseShard(sh1, t)
sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil)
sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil, nil)
defer releaseShard(sh2, t)
const objCount = 5
@ -323,7 +324,7 @@ func TestDumpIgnoreErrors(t *testing.T) {
writecache.WithSmallObjectSize(wcSmallObjectSize),
writecache.WithMaxObjectSize(wcBigObjectSize),
}
sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2))
sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2), nil)
objects := make([]*objectSDK.Object, objCount)
for i := 0; i < objCount; i++ {
@ -371,7 +372,7 @@ func TestDumpIgnoreErrors(t *testing.T) {
require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0))
}
sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3))
sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3), nil)
require.NoError(t, sh.SetMode(mode.ReadOnly))
{

View file

@ -145,8 +145,8 @@ func (gc *gc) listenEvents(ctx context.Context) {
h := v.handlers[i]
err := gc.workerPool.Submit(func() {
defer v.prevGroup.Done()
h(runCtx, event)
v.prevGroup.Done()
})
if err != nil {
gc.log.Warn("could not submit GC job to worker pool",
@ -196,6 +196,14 @@ func (gc *gc) stop() {
// with GC-marked graves.
// Does nothing if shard is in "read-only" mode.
func (s *Shard) removeGarbage() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.gcCancel.Store(cancel)
if s.setModeRequested.Load() {
return
}
s.m.RLock()
defer s.m.RUnlock()
@ -207,6 +215,12 @@ func (s *Shard) removeGarbage() {
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize {
@ -233,7 +247,7 @@ func (s *Shard) removeGarbage() {
deletePrm.SetAddresses(buf...)
// delete accumulated objects
_, err = s.delete(deletePrm)
_, err = s.delete(ctx, deletePrm)
if err != nil {
s.log.Warn("could not delete the objects",
zap.String("error", err.Error()),
@ -313,6 +327,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
expired, err := s.getExpiredWithLinked(expired)
if err != nil {
s.log.Warn("failed to get expired objects with linked", zap.Error(err))
return
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...)
@ -338,6 +358,20 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
}
}
func (s *Shard) getExpiredWithLinked(source []oid.Address) ([]oid.Address, error) {
result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(source)
if err != nil {
return nil, err
}
for parent, children := range parentToChildren {
result = append(result, parent)
result = append(result, children...)
}
return result, nil
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))

View file

@ -0,0 +1,144 @@
package shard
import (
"context"
"path/filepath"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
t.Parallel()
rootPath := t.TempDir()
var sh *Shard
l := &logger.Logger{Logger: zaptest.NewLogger(t)}
blobOpts := []blobstor.Option{
blobstor.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(1),
blobovniczatree.WithBlobovniczaShallowWidth(1)),
Policy: func(_ *object.Object, data []byte) bool {
return len(data) <= 1<<20
},
},
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(rootPath, "blob"))),
},
}),
}
opts := []Option{
WithID(NewIDFromBytes([]byte{})),
WithLogger(l),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epochState{}),
),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
WithGCRemoverSleepInterval(1 * time.Second),
}
sh = New(opts...)
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
t.Cleanup(func() {
require.NoError(t, sh.Close())
})
cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr)
objID, _ := obj.ID()
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(objID)
var putPrm PutPrm
putPrm.SetObject(obj)
_, err := sh.Put(putPrm)
require.NoError(t, err)
var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj))
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err, "failed to get")
//inhume
var inhumePrm InhumePrm
inhumePrm.MarkAsGarbage(addr)
_, err = sh.Inhume(context.Background(), inhumePrm)
require.NoError(t, err, "failed to inhume")
_, err = sh.Get(context.Background(), getPrm)
require.Error(t, err, "get returned error")
require.True(t, IsErrNotFound(err), "invalid error type")
//storageID
var metaStIDPrm meta.StorageIDPrm
metaStIDPrm.SetAddress(addr)
storageID, err := sh.metaBase.StorageID(metaStIDPrm)
require.NoError(t, err, "failed to get storage ID")
//check existance in blobstore
var bsExisted common.ExistsPrm
bsExisted.Address = addr
bsExisted.StorageID = storageID.StorageID()
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existance")
require.True(t, exRes.Exists, "invalid blobstore existance result")
//drop from blobstor
var bsDeletePrm common.DeletePrm
bsDeletePrm.Address = addr
bsDeletePrm.StorageID = storageID.StorageID()
_, err = sh.blobStor.Delete(bsDeletePrm)
require.NoError(t, err, "failed to delete from blobstore")
//check existance in blobstore
exRes, err = sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existance")
require.False(t, exRes.Exists, "invalid blobstore existance result")
//get should return object not found
_, err = sh.Get(context.Background(), getPrm)
require.Error(t, err, "get returned no error")
require.True(t, IsErrNotFound(err), "invalid error type")
}

View file

@ -2,77 +2,30 @@ package shard_test
import (
"context"
"path/filepath"
"errors"
"testing"
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func Test_GCDropsLockedExpiredObject(t *testing.T) {
var sh *shard.Shard
func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
t.Parallel()
epoch := &epochState{
Value: 100,
}
rootPath := t.TempDir()
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(2),
blobovniczatree.WithBlobovniczaShallowWidth(2)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= 1<<20
},
},
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(rootPath, "blob"))),
},
}),
),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epoch),
),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
}
sh = shard.New(opts...)
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
t.Cleanup(func() {
releaseShard(sh, t)
@ -120,3 +73,97 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired object must be deleted")
}
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
t.Parallel()
epoch := &epochState{
Value: 100,
}
cnr := cidtest.ID()
parentID := oidtest.ID()
splitID := objectSDK.NewSplitID()
var objExpirationAttr objectSDK.Attribute
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
objExpirationAttr.SetValue("101")
var lockExpirationAttr objectSDK.Attribute
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
lockExpirationAttr.SetValue("103")
parent := testutil.GenerateObjectWithCID(cnr)
parent.SetID(parentID)
parent.SetPayload(nil)
parent.SetAttributes(objExpirationAttr)
const childCount = 10
children := make([]*objectSDK.Object, childCount)
childIDs := make([]oid.ID, childCount)
for i := range children {
children[i] = testutil.GenerateObjectWithCID(cnr)
if i != 0 {
children[i].SetPreviousID(childIDs[i-1])
}
if i == len(children)-1 {
children[i].SetParent(parent)
}
children[i].SetSplitID(splitID)
children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})
childIDs[i], _ = children[i].ID()
}
link := testutil.GenerateObjectWithCID(cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
linkID, _ := link.ID()
sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
t.Cleanup(func() {
releaseShard(sh, t)
})
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
lock.SetAttributes(lockExpirationAttr)
lockID, _ := lock.ID()
var putPrm shard.PutPrm
for _, child := range children {
putPrm.SetObject(child)
_, err := sh.Put(putPrm)
require.NoError(t, err)
}
putPrm.SetObject(link)
_, err := sh.Put(putPrm)
require.NoError(t, err)
err = sh.Lock(cnr, lockID, append(childIDs, parentID, linkID))
require.NoError(t, err)
putPrm.SetObject(lock)
_, err = sh.Put(putPrm)
require.NoError(t, err)
var getPrm shard.GetPrm
getPrm.SetAddress(objectCore.AddressOf(parent))
_, err = sh.Get(context.Background(), getPrm)
var splitInfoError *objectSDK.SplitInfoError
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
epoch.Value = 105
sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires")
}

View file

@ -168,7 +168,7 @@ func TestCounters(t *testing.T) {
deletedNumber := int(phy / 4)
prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
_, err := sh.Delete(prm)
_, err := sh.Delete(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, phy-uint64(deletedNumber), mm.objCounters[physical])

View file

@ -18,8 +18,8 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode")
// Returns any error encountered that did not allow
// setting shard mode.
func (s *Shard) SetMode(m mode.Mode) error {
s.m.Lock()
defer s.m.Unlock()
unlock := s.lockExclusive()
defer unlock()
return s.setMode(m)
}

View file

@ -84,7 +84,8 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
Storage: fstree.New(
fstree.WithPath(filepath.Join(t.TempDir(), "blob"))),
},
})})
})},
nil)
defer releaseShard(sh, t)
for _, tc := range testCases {

View file

@ -3,6 +3,7 @@ package shard
import (
"context"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -31,6 +32,9 @@ type Shard struct {
metaBase *meta.DB
tsSource TombstoneSource
gcCancel atomic.Value
setModeRequested atomic.Bool
}
// Option represents Shard's constructor option.
@ -209,12 +213,12 @@ func WithWriteCache(use bool) Option {
}
// hasWriteCache returns bool if write cache exists on shards.
func (s Shard) hasWriteCache() bool {
func (s *Shard) hasWriteCache() bool {
return s.cfg.useWriteCache
}
// needRefillMetabase returns true if metabase is needed to be refilled.
func (s Shard) needRefillMetabase() bool {
func (s *Shard) needRefillMetabase() bool {
return s.cfg.refillMetabase
}

View file

@ -4,6 +4,7 @@ import (
"context"
"path/filepath"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
@ -12,8 +13,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
@ -29,11 +33,13 @@ func (s epochState) CurrentEpoch() uint64 {
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
return newCustomShard(t, t.TempDir(), enableWriteCache,
nil,
nil,
nil)
}
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard {
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option, metaOptions []meta.Option) *shard.Shard {
var sh *shard.Shard
if enableWriteCache {
rootPath = filepath.Join(rootPath, "wc")
} else {
@ -67,8 +73,9 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions(bsOpts...),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epochState{}),
append([]meta.Option{
meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epochState{})},
metaOptions...)...,
),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
shard.WithWriteCache(enableWriteCache),
@ -77,9 +84,21 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
[]writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))},
wcOpts...)...,
),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithGCRemoverSleepInterval(1 * time.Millisecond),
}
sh := shard.New(opts...)
sh = shard.New(opts...)
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))

View file

@ -37,7 +37,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
writecache.WithSmallObjectSize(smallSize),
writecache.WithMaxObjectSize(smallSize * 2)}
sh := newCustomShard(t, dir, true, wcOpts, nil)
sh := newCustomShard(t, dir, true, wcOpts, nil, nil)
var putPrm shard.PutPrm
@ -48,7 +48,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
}
require.NoError(t, sh.Close())
sh = newCustomShard(t, dir, true, wcOpts, nil)
sh = newCustomShard(t, dir, true, wcOpts, nil, nil)
defer releaseShard(sh, t)
var getPrm shard.GetPrm

View file

@ -155,6 +155,13 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
return s.pilorama.TreeList(cid)
}
func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
return s.pilorama.TreeHeight(cid, treeID)
}
// TreeExists implements the pilorama.Forest interface.
func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
if s.pilorama == nil {

View file

@ -32,11 +32,11 @@ const (
func (c *cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.flushWorker(i)
go c.workerFlushSmall()
}
c.wg.Add(1)
go c.flushBigObjects()
go c.workerFlushBig()
c.wg.Add(1)
go func() {
@ -48,7 +48,7 @@ func (c *cache) runFlushLoop() {
for {
select {
case <-tt.C:
c.flushDB()
c.flushSmallObjects()
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
return
@ -57,7 +57,7 @@ func (c *cache) runFlushLoop() {
}()
}
func (c *cache) flushDB() {
func (c *cache) flushSmallObjects() {
var lastKey []byte
var m []objectInfo
for {
@ -70,7 +70,7 @@ func (c *cache) flushDB() {
m = m[:0]
c.modeMtx.RLock()
if c.readOnly() || !c.initialized.Load() {
if c.readOnly() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
@ -109,10 +109,6 @@ func (c *cache) flushDB() {
var count int
for i := range m {
if c.flushed.Contains(m[i].addr) {
continue
}
obj := object.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
@ -140,7 +136,7 @@ func (c *cache) flushDB() {
}
}
func (c *cache) flushBigObjects() {
func (c *cache) workerFlushBig() {
defer c.wg.Done()
tick := time.NewTicker(defaultFlushInterval * 10)
@ -151,9 +147,6 @@ func (c *cache) flushBigObjects() {
if c.readOnly() {
c.modeMtx.RUnlock()
break
} else if !c.initialized.Load() {
c.modeMtx.RUnlock()
continue
}
_ = c.flushFSTree(true)
@ -181,10 +174,6 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString()
if _, ok := c.store.flushed.Peek(sAddr); ok {
return nil
}
data, err := f()
if err != nil {
c.reportFlushError("can't read a file", sAddr, err)
@ -212,9 +201,7 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
return err
}
// mark object as flushed
c.flushed.Add(sAddr, false)
c.deleteFromDisk([]string{sAddr})
return nil
}
@ -222,8 +209,8 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
return err
}
// flushWorker writes objects to the main storage.
func (c *cache) flushWorker(_ int) {
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
defer c.wg.Done()
var obj *object.Object
@ -236,9 +223,12 @@ func (c *cache) flushWorker(_ int) {
}
err := c.flushObject(obj, nil)
if err == nil {
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
}
}
@ -294,10 +284,6 @@ func (c *cache) flush(ignoreErrors bool) error {
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k)
if _, ok := c.flushed.Peek(sa); ok {
continue
}
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, err)
if ignoreErrors {

View file

@ -5,7 +5,6 @@ import (
"os"
"path/filepath"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -15,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.Flush(false))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
check(t, mb, bs, objects)
})
t.Run("flush on moving to degraded mode", func(t *testing.T) {
@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.SetMode(mode.Degraded))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
check(t, mb, bs, objects)
})
t.Run("ignore errors", func(t *testing.T) {
@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
})
})
})
t.Run("on init", func(t *testing.T) {
wc, bs, mb := newCache(t)
objects := []objectPair{
// removed
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// not found
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// ok
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
}
require.NoError(t, wc.Close())
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
for i := range objects {
var prm meta.PutPrm
prm.SetObject(objects[i].obj)
_, err := mb.Put(prm)
require.NoError(t, err)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
inhumePrm.SetTombstoneAddress(oidtest.Address())
_, err := mb.Inhume(inhumePrm)
require.NoError(t, err)
var deletePrm meta.DeletePrm
deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
_, err = mb.Delete(deletePrm)
require.NoError(t, err)
require.NoError(t, bs.SetMode(mode.ReadOnly))
require.NoError(t, mb.SetMode(mode.ReadOnly))
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
require.NoError(t, err, i)
}
require.NoError(t, wc.Close())
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {
require.NoError(t, err, i)
}
}
})
}
func putObject(t *testing.T, c Cache, size int) objectPair {
@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
func initWC(t *testing.T, wc Cache) {
require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
rawWc := wc.(*cache)
return rawWc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
}
type dummyEpoch struct{}

View file

@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
c.flushed.Get(saddr)
return obj, obj.Unmarshal(value)
}
@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
c.flushed.Get(saddr)
return res.Object, nil
}

View file

@ -1,183 +0,0 @@
package writecache
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
func (c *cache) initFlushMarks() {
var localWG sync.WaitGroup
localWG.Add(1)
go func() {
defer localWG.Done()
c.fsTreeFlushMarkUpdate()
}()
localWG.Add(1)
go func() {
defer localWG.Done()
c.dbFlushMarkUpdate()
}()
c.initWG.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer c.initWG.Done()
localWG.Wait()
select {
case <-c.stopInitCh:
return
case <-c.closeCh:
return
default:
}
c.initialized.Store(true)
}()
}
var errStopIter = errors.New("stop iteration")
func (c *cache) fsTreeFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in FSTree")
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
select {
case <-c.closeCh:
return errStopIter
case <-c.stopInitCh:
return errStopIter
default:
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
var prm common.DeletePrm
prm.Address = addr
_, err := c.fsTree.Delete(prm)
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
}
}
}
return nil
}
_, _ = c.fsTree.Iterate(prm)
c.log.Info("finished updating FSTree flush marks")
}
func (c *cache) dbFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in database")
var m []string
var indices []int
var lastKey []byte
var batchSize = flushBatchSize
for {
select {
case <-c.closeCh:
return
case <-c.stopInitCh:
return
default:
}
m = m[:0]
indices = indices[:0]
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
m = append(m, string(k))
}
return nil
})
var addr oid.Address
for i := range m {
if err := addr.DecodeString(m[i]); err != nil {
continue
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
indices = append(indices, i)
}
}
}
if len(m) == 0 {
break
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for _, j := range indices {
if err := b.Delete([]byte(m[j])); err != nil {
return err
}
}
return nil
})
if err == nil {
for _, j := range indices {
storagelog.Write(c.log,
zap.String("address", m[j]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
}
}
lastKey = append([]byte(m[len(m)-1]), 0)
}
c.log.Info("finished updating flush marks")
}
// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(addr)
_, err := c.metabase.Exists(existsPrm)
if err != nil {
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
return needRemove, needRemove
}
var prm meta.StorageIDPrm
prm.SetAddress(addr)
mRes, _ := c.metabase.StorageID(prm)
res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists, false
}

View file

@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error {
if _, ok := c.flushed.Peek(string(k)); ok {
return nil
}
return prm.handler(data)
})
})
@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
return nil
}
data, err := f()
if err != nil {
if prm.ignoreErrors {

View file

@ -36,19 +36,6 @@ func (c *cache) setMode(m mode.Mode) error {
}
}
if !c.initialized.Load() {
close(c.stopInitCh)
c.initWG.Wait()
c.stopInitCh = make(chan struct{})
defer func() {
if err == nil && !turnOffMeta {
c.initFlushMarks()
}
}()
}
if c.db != nil {
if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)

View file

@ -26,8 +26,6 @@ func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
} else if !c.initialized.Load() {
return common.PutRes{}, ErrNotInitialized
}
sz := uint64(len(prm.RawData))
@ -67,7 +65,7 @@ func (c *cache) putSmall(obj objectInfo) error {
)
c.objCounters.IncDB()
}
return nil
return err
}
// putBig writes object to FSTree and pushes it to the flush workers queue.

View file

@ -11,8 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
@ -20,19 +18,7 @@ import (
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
maxFlushedMarksCount int
maxRemoveBatchSize int
// flushed contains addresses of objects that were already flushed to the main storage.
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
// frequently read ones.
// MUST NOT be used inside bolt db transaction because it's eviction handler
// removes untracked items from the database.
flushed simplelru.LRUCache[string, bool]
db *bbolt.DB
dbKeysToRemove []string
fsKeysToRemove []string
db *bbolt.DB
}
const dbName = "small.bolt"
@ -71,35 +57,9 @@ func (c *cache) openStore(readOnly bool) error {
return fmt.Errorf("could not open FSTree: %w", err)
}
// Write-cache can be opened multiple times during `SetMode`.
// flushed map must not be re-created in this case.
if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
}
c.initialized.Store(false)
return nil
}
// removeFlushed removes an object from the writecache.
// To minimize interference with the client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an evict callback to LRU cache.
func (c *cache) removeFlushed(key string, value bool) {
fromDatabase := value
if fromDatabase {
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
} else {
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
}
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
}
}
func (c *cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 {
return keys

View file

@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -51,11 +50,8 @@ type cache struct {
// mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex
mode mode.Mode
initialized atomic.Bool
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
initWG sync.WaitGroup // for initialisation routines only
modeMtx sync.RWMutex
mode mode.Mode
modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed.
@ -95,9 +91,8 @@ var (
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
stopInitCh: make(chan struct{}),
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
compressFlags: make(map[string]struct{}),
options: options{
@ -116,12 +111,6 @@ func New(opts ...Option) Cache {
opts[i](&c.options)
}
// Make the LRU cache contain which take approximately 3/4 of the maximum space.
// Assume small and big objects are stored in 50-50 proportion.
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
return c
}
@ -152,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {
// Init runs necessary services.
func (c *cache) Init() error {
c.initFlushMarks()
c.runFlushLoop()
return nil
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error {
// We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// Finish all in-progress operations.
if err := c.setMode(mode.ReadOnly); err != nil {
return err
}
if c.closeCh != nil {
close(c.closeCh)
}
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
c.modeMtx.Unlock()
c.wg.Wait()
if c.closeCh != nil {
c.closeCh = nil
}
c.initialized.Store(false)
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
c.closeCh = nil
var err error
if c.db != nil {
err = c.db.Close()

View file

@ -57,8 +57,6 @@ type Client struct {
acc *wallet.Account // neo account
accAddr util.Uint160 // account's address
signer *transaction.Signer
notary *notaryInfo
cfg cfg
@ -70,9 +68,6 @@ type Client struct {
// on every normal call.
switchLock *sync.RWMutex
notifications chan rpcclient.Notification
subsInfo // protected with switchLock
// channel for internal stop
closeChan chan struct{}
@ -567,26 +562,11 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (val
// NotificationChannel returns channel than receives subscribed
// notification from the connected RPC node.
// Channel is closed when connection to the RPC node has been
// lost without the possibility of recovery.
// Channel is closed when connection to the RPC node is lost.
func (c *Client) NotificationChannel() <-chan rpcclient.Notification {
return c.notifications
}
// inactiveMode switches Client to an inactive mode:
// - notification channel is closed;
// - all the new RPC request would return ErrConnectionLost;
// - inactiveModeCb is called if not nil.
func (c *Client) inactiveMode() {
c.switchLock.Lock()
defer c.switchLock.Unlock()
close(c.notifications)
c.inactive = true
if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
c.switchLock.RLock()
defer c.switchLock.RUnlock()
return c.client.Notifications //lint:ignore SA1019 waits for neo-go v0.102.0 https://github.com/nspcc-dev/neo-go/pull/2980
}
func (c *Client) setActor(act *actor.Actor) {

View file

@ -9,11 +9,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -101,22 +98,13 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
}
cli := &Client{
cache: newClientCache(),
logger: cfg.logger,
acc: acc,
accAddr: accAddr,
signer: cfg.signer,
cfg: *cfg,
switchLock: &sync.RWMutex{},
notifications: make(chan rpcclient.Notification),
subsInfo: subsInfo{
blockRcv: make(chan *block.Block),
notificationRcv: make(chan *state.ContainedNotificationEvent),
notaryReqRcv: make(chan *result.NotaryRequestEvent),
subscribedEvents: make(map[util.Uint160]string),
subscribedNotaryEvents: make(map[util.Uint160]string),
},
closeChan: make(chan struct{}),
cache: newClientCache(),
logger: cfg.logger,
acc: acc,
accAddr: accAddr,
cfg: *cfg,
switchLock: &sync.RWMutex{},
closeChan: make(chan struct{}),
}
cli.endpoints.init(cfg.endpoints)
@ -145,7 +133,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
}
cli.setActor(act)
go cli.notificationLoop(ctx)
go cli.closeWaiter(ctx)
return cli, nil
}

View file

@ -5,11 +5,6 @@ import (
"sort"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap"
)
@ -33,7 +28,8 @@ func (e *endpoints) init(ee []Endpoint) {
e.list = ee
}
func (c *Client) switchRPC(ctx context.Context) bool {
// SwitchRPC performs reconnection and returns true if it was successful.
func (c *Client) SwitchRPC(ctx context.Context) bool {
c.switchLock.Lock()
defer c.switchLock.Unlock()
@ -57,20 +53,8 @@ func (c *Client) switchRPC(ctx context.Context) bool {
c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint))
subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false)
if !ok {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
// closing connection to RPC node
// to switch to another one
cli.Close()
continue
}
c.client = cli
c.setActor(act)
c.subsInfo = subs
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@ -81,97 +65,21 @@ func (c *Client) switchRPC(ctx context.Context) bool {
return true
}
c.inactive = true
if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
return false
}
func (c *Client) notificationLoop(ctx context.Context) {
var e any
var ok bool
for {
c.switchLock.RLock()
bChan := c.blockRcv
nChan := c.notificationRcv
nrChan := c.notaryReqRcv
c.switchLock.RUnlock()
select {
case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
return
case e, ok = <-bChan:
case e, ok = <-nChan:
case e, ok = <-nrChan:
}
if ok {
c.routeEvent(ctx, e)
continue
}
if !c.reconnect(ctx) {
return
}
}
}
func (c *Client) routeEvent(ctx context.Context, e any) {
typedNotification := rpcclient.Notification{Value: e}
switch e.(type) {
case *block.Block:
typedNotification.Type = neorpc.BlockEventID
case *state.ContainedNotificationEvent:
typedNotification.Type = neorpc.NotificationEventID
case *result.NotaryRequestEvent:
typedNotification.Type = neorpc.NotaryRequestEventID
}
func (c *Client) closeWaiter(ctx context.Context) {
select {
case c.notifications <- typedNotification:
case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
}
}
func (c *Client) reconnect(ctx context.Context) bool {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method, that happens only when a client has
// switched to the more prioritized RPC
return true
}
if !c.switchRPC(ctx) {
c.logger.Error("could not establish connection to any RPC node")
// could not connect to all endpoints =>
// switch client to inactive mode
c.inactiveMode()
return false
}
// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost
return true
_ = c.UnsubscribeAll()
c.close()
}
func (c *Client) switchToMostPrioritized(ctx context.Context) {
@ -217,36 +125,28 @@ mainLoop:
continue
}
if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok {
c.switchLock.Lock()
// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()
return
}
c.client.Close()
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.subsInfo = subs
c.endpoints.curr = i
c.switchLock.Lock()
// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()
c.logger.Info("switched to the higher priority RPC",
zap.String("endpoint", tryE))
return
}
c.logger.Warn("could not restore side chain subscriptions using node",
zap.String("endpoint", tryE),
zap.Error(err),
)
c.client.Close()
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.endpoints.curr = i
c.switchLock.Unlock()
c.logger.Info("switched to the higher priority RPC",
zap.String("endpoint", tryE))
return
}
}
}
@ -254,6 +154,7 @@ mainLoop:
// close closes notification channel and wrapped WS client.
func (c *Client) close() {
close(c.notifications)
c.switchLock.RLock()
defer c.switchLock.RUnlock()
c.client.Close()
}

View file

@ -208,8 +208,8 @@ func (c *Client) SetGroupSignerScope() error {
return err
}
c.signer.Scopes = transaction.CustomGroups
c.signer.AllowedGroups = []*keys.PublicKey{pub}
c.cfg.signer.Scopes = transaction.CustomGroups
c.cfg.signer.AllowedGroups = []*keys.PublicKey{pub}
return nil
}

View file

@ -596,18 +596,18 @@ func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, comm
s = append(s, transaction.Signer{
Account: hash.Hash160(multisigScript),
Scopes: c.signer.Scopes,
AllowedContracts: c.signer.AllowedContracts,
AllowedGroups: c.signer.AllowedGroups,
Scopes: c.cfg.signer.Scopes,
AllowedContracts: c.cfg.signer.AllowedContracts,
AllowedGroups: c.cfg.signer.AllowedGroups,
})
if !invokedByAlpha {
// then we have invoker signature
s = append(s, transaction.Signer{
Account: hash.Hash160(c.acc.GetVerificationScript()),
Scopes: c.signer.Scopes,
AllowedContracts: c.signer.AllowedContracts,
AllowedGroups: c.signer.AllowedGroups,
Scopes: c.cfg.signer.Scopes,
AllowedContracts: c.cfg.signer.AllowedContracts,
AllowedGroups: c.cfg.signer.AllowedGroups,
})
}

View file

@ -1,15 +1,11 @@
package client
import (
"context"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
// Close closes connection to the remote side making
@ -23,71 +19,46 @@ func (c *Client) Close() {
close(c.closeChan)
}
// SubscribeForExecutionNotifications adds subscription for notifications
// generated during contract transaction execution to this instance of client.
// ReceiveExecutionNotifications performs subscription for notifications
// generated during contract execution. Events are sent to the specified channel.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error {
func (c *Client) ReceiveExecutionNotifications(contract util.Uint160, ch chan<- *state.ContainedNotificationEvent) (string, error) {
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
return "", ErrConnectionLost
}
_, subscribed := c.subscribedEvents[contract]
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
if err != nil {
return err
}
c.subscribedEvents[contract] = id
return nil
return c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ch)
}
// SubscribeForNewBlocks adds subscription for new block events to this
// instance of client.
// ReceiveBlocks performs subscription for new block events. Events are sent
// to the specified channel.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNewBlocks() error {
func (c *Client) ReceiveBlocks(ch chan<- *block.Block) (string, error) {
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
return "", ErrConnectionLost
}
if c.subscribedToBlocks {
// no need to subscribe one more time
return nil
}
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
if err != nil {
return err
}
c.subscribedToBlocks = true
return nil
return c.client.ReceiveBlocks(nil, ch)
}
// SubscribeForNotaryRequests adds subscription for notary request payloads
// ReceiveNotaryRequests performsn subscription for notary request payloads
// addition or removal events to this instance of client. Passed txSigner is
// used as filter: subscription is only for the notary requests that must be
// signed by txSigner.
// signed by txSigner. Events are sent to the specified channel.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160, ch chan<- *result.NotaryRequestEvent) (string, error) {
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
@ -96,30 +67,17 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
return "", ErrConnectionLost
}
_, subscribed := c.subscribedNotaryEvents[txSigner]
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
if err != nil {
return err
}
c.subscribedNotaryEvents[txSigner] = id
return nil
return c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, ch)
}
// UnsubscribeContract removes subscription for given contract event stream.
// Unsubscribe performs unsubscription for the given subscription ID.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeContract(contract util.Uint160) error {
func (c *Client) Unsubscribe(subID string) error {
c.switchLock.Lock()
defer c.switchLock.Unlock()
@ -127,55 +85,7 @@ func (c *Client) UnsubscribeContract(contract util.Uint160) error {
return ErrConnectionLost
}
_, subscribed := c.subscribedEvents[contract]
if !subscribed {
// no need to unsubscribe contract
// without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedEvents[contract])
if err != nil {
return err
}
delete(c.subscribedEvents, contract)
return nil
}
// UnsubscribeNotaryRequest removes subscription for given notary requests
// signer.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error {
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
_, subscribed := c.subscribedNotaryEvents[signer]
if !subscribed {
// no need to unsubscribe signer's
// requests without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer])
if err != nil {
return err
}
delete(c.subscribedNotaryEvents, signer)
return nil
return c.client.Unsubscribe(subID)
}
// UnsubscribeAll removes all active subscriptions of current client.
@ -190,163 +100,10 @@ func (c *Client) UnsubscribeAll() error {
return ErrConnectionLost
}
// no need to unsubscribe if there are
// no active subscriptions
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
!c.subscribedToBlocks {
return nil
}
err := c.client.UnsubscribeAll()
if err != nil {
return err
}
c.subscribedEvents = make(map[util.Uint160]string)
c.subscribedNotaryEvents = make(map[util.Uint160]string)
c.subscribedToBlocks = false
return nil
}
// subsInfo includes channels for ws notifications;
// cached subscription information.
type subsInfo struct {
blockRcv chan *block.Block
notificationRcv chan *state.ContainedNotificationEvent
notaryReqRcv chan *result.NotaryRequestEvent
subscribedToBlocks bool
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
}
// restoreSubscriptions restores subscriptions according to cached
// information about them.
//
// If it is NOT a background operation switchLock MUST be held.
// Returns a pair: the second is a restoration status and the first
// one contains subscription information applied to the passed cli
// and receivers for the updated subscriptions.
// Does not change Client instance.
func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
var (
err error
id string
)
stopCh := make(chan struct{})
defer close(stopCh)
blockRcv := make(chan *block.Block)
notificationRcv := make(chan *state.ContainedNotificationEvent)
notaryReqRcv := make(chan *result.NotaryRequestEvent)
c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background)
if background {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
}
si.subscribedToBlocks = c.subscribedToBlocks
si.subscribedEvents = copySubsMap(c.subscribedEvents)
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
si.blockRcv = blockRcv
si.notificationRcv = notificationRcv
si.notaryReqRcv = notaryReqRcv
// new block events restoration
if si.subscribedToBlocks {
_, err = cli.ReceiveBlocks(nil, blockRcv)
if err != nil {
c.logger.Error("could not restore block subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
}
// notification events restoration
for contract := range si.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
if err != nil {
c.logger.Error("could not restore notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
si.subscribedEvents[contract] = id
}
// notary notification events restoration
if c.notary != nil {
for signer := range si.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
if err != nil {
c.logger.Error("could not restore notary notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
si.subscribedNotaryEvents[signer] = id
}
}
return si, true
}
func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block,
notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) {
// neo-go WS client says to _always_ read notifications
// from its channel. Subscribing to any notification
// while not reading them in another goroutine may
// lead to a dead-lock, thus that async side notification
// listening while restoring subscriptions
go func() {
var e any
var ok bool
for {
select {
case <-stopCh:
return
case e, ok = <-blockRcv:
case e, ok = <-notificationRcv:
case e, ok = <-notaryReqRcv:
}
if !ok {
return
}
if background {
// background client (test) switch, no need to send
// any notification, just preventing dead-lock
continue
}
c.routeEvent(ctx, e)
}
}()
}
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
newM := make(map[util.Uint160]string, len(m))
for k, v := range m {
newM[k] = v
}
return newM
}

View file

@ -10,8 +10,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
@ -35,16 +35,27 @@ type (
Close()
}
subChannels struct {
NotifyChan chan *state.ContainedNotificationEvent
BlockChan chan *block.Block
NotaryChan chan *result.NotaryRequestEvent
}
subscriber struct {
*sync.RWMutex
log *logger.Logger
client *client.Client
notifyChan chan *state.ContainedNotificationEvent
blockChan chan *block.Block
blockChan chan *block.Block
notaryChan chan *result.NotaryRequestEvent
current subChannels
// cached subscription information
subscribedEvents map[util.Uint160]bool
subscribedNotaryEvents map[util.Uint160]bool
subscribedToNewBlocks bool
}
// Params is a group of Subscriber constructor parameters.
@ -75,22 +86,28 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
s.Lock()
defer s.Unlock()
notifyIDs := make(map[util.Uint160]struct{}, len(contracts))
notifyIDs := make([]string, 0, len(contracts))
for i := range contracts {
if s.subscribedEvents[contracts[i]] {
continue
}
// subscribe to contract notifications
err := s.client.SubscribeForExecutionNotifications(contracts[i])
id, err := s.client.ReceiveExecutionNotifications(contracts[i], s.current.NotifyChan)
if err != nil {
// if there is some error, undo all subscriptions and return error
for hash := range notifyIDs {
_ = s.client.UnsubscribeContract(hash)
for _, id := range notifyIDs {
_ = s.client.Unsubscribe(id)
}
return err
}
// save notification id
notifyIDs[contracts[i]] = struct{}{}
notifyIDs = append(notifyIDs, id)
}
for i := range contracts {
s.subscribedEvents[contracts[i]] = true
}
return nil
@ -109,82 +126,34 @@ func (s *subscriber) Close() {
}
func (s *subscriber) BlockNotifications() error {
if err := s.client.SubscribeForNewBlocks(); err != nil {
s.Lock()
defer s.Unlock()
if s.subscribedToNewBlocks {
return nil
}
if _, err := s.client.ReceiveBlocks(s.current.BlockChan); err != nil {
return fmt.Errorf("could not subscribe for new block events: %w", err)
}
s.subscribedToNewBlocks = true
return nil
}
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil {
s.Lock()
defer s.Unlock()
if s.subscribedNotaryEvents[mainTXSigner] {
return nil
}
if _, err := s.client.ReceiveNotaryRequests(mainTXSigner, s.current.NotaryChan); err != nil {
return fmt.Errorf("could not subscribe for notary request events: %w", err)
}
s.subscribedNotaryEvents[mainTXSigner] = true
return nil
}
func (s *subscriber) routeNotifications(ctx context.Context) {
notificationChan := s.client.NotificationChannel()
for {
select {
case <-ctx.Done():
return
case notification, ok := <-notificationChan:
if !ok {
s.log.Warn("remote notification channel has been closed")
close(s.notifyChan)
close(s.blockChan)
close(s.notaryChan)
return
}
switch notification.Type {
case neorpc.NotificationEventID:
notifyEvent, ok := notification.Value.(*state.ContainedNotificationEvent)
if !ok {
s.log.Error("can't cast notify event value to the notify struct",
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.log.Debug("new notification event from sidechain",
zap.String("name", notifyEvent.Name),
)
s.notifyChan <- notifyEvent
case neorpc.BlockEventID:
b, ok := notification.Value.(*block.Block)
if !ok {
s.log.Error("can't cast block event value to block",
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.blockChan <- b
case neorpc.NotaryRequestEventID:
notaryRequest, ok := notification.Value.(*result.NotaryRequestEvent)
if !ok {
s.log.Error("can't cast notify event value to the notary request struct",
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.notaryChan <- notaryRequest
default:
s.log.Debug("unsupported notification from the chain",
zap.Uint8("type", uint8(notification.Type)),
)
}
}
}
}
// New is a constructs Neo:Morph event listener and returns Subscriber interface.
func New(ctx context.Context, p *Params) (Subscriber, error) {
switch {
@ -208,16 +177,170 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
notifyChan: make(chan *state.ContainedNotificationEvent),
blockChan: make(chan *block.Block),
notaryChan: make(chan *result.NotaryRequestEvent),
}
// Worker listens all events from neo-go websocket and puts them
// into corresponding channel. It may be notifications, transactions,
// new blocks. For now only notifications.
current: newSubChannels(),
subscribedEvents: make(map[util.Uint160]bool),
subscribedNotaryEvents: make(map[util.Uint160]bool),
}
// Worker listens all events from temporary NeoGo channel and puts them
// into corresponding permanent channels.
go sub.routeNotifications(ctx)
return sub, nil
}
func (s *subscriber) routeNotifications(ctx context.Context) {
var (
// TODO: not needed after nspcc-dev/neo-go#2980.
cliCh = s.client.NotificationChannel()
restoreCh = make(chan bool)
restoreInProgress bool
)
routeloop:
for {
var connLost bool
s.RLock()
curr := s.current
s.RUnlock()
select {
case <-ctx.Done():
break routeloop
case ev, ok := <-curr.NotifyChan:
if ok {
s.notifyChan <- ev
} else {
connLost = true
}
case ev, ok := <-curr.BlockChan:
if ok {
s.blockChan <- ev
} else {
connLost = true
}
case ev, ok := <-curr.NotaryChan:
if ok {
s.notaryChan <- ev
} else {
connLost = true
}
case _, ok := <-cliCh:
connLost = !ok
case ok := <-restoreCh:
restoreInProgress = false
if !ok {
connLost = true
}
}
if connLost {
if !restoreInProgress {
restoreInProgress, cliCh = s.switchEndpoint(ctx, restoreCh)
if !restoreInProgress {
break routeloop
}
curr.drain()
} else { // Avoid getting additional !ok events.
s.Lock()
s.current.NotifyChan = nil
s.current.BlockChan = nil
s.current.NotaryChan = nil
s.Unlock()
}
}
}
close(s.notifyChan)
close(s.blockChan)
close(s.notaryChan)
}
func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (bool, <-chan rpcclient.Notification) {
s.log.Info("RPC connection lost, attempting reconnect")
if !s.client.SwitchRPC(ctx) {
s.log.Error("can't switch RPC node")
return false, nil
}
cliCh := s.client.NotificationChannel()
s.Lock()
chs := newSubChannels()
go func() {
finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
}()
s.current = chs
s.Unlock()
return true, cliCh
}
func newSubChannels() subChannels {
return subChannels{
NotifyChan: make(chan *state.ContainedNotificationEvent),
BlockChan: make(chan *block.Block),
NotaryChan: make(chan *result.NotaryRequestEvent),
}
}
func (s *subChannels) drain() {
drainloop:
for {
select {
case _, ok := <-s.NotifyChan:
if !ok {
s.NotifyChan = nil
}
case _, ok := <-s.BlockChan:
if !ok {
s.BlockChan = nil
}
case _, ok := <-s.NotaryChan:
if !ok {
s.NotaryChan = nil
}
default:
break drainloop
}
}
}
// restoreSubscriptions restores subscriptions according to
// cached information about them.
func (s *subscriber) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent,
blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent) bool {
var err error
// new block events restoration
if s.subscribedToNewBlocks {
_, err = s.client.ReceiveBlocks(blCh)
if err != nil {
s.log.Error("could not restore block subscription after RPC switch", zap.Error(err))
return false
}
}
// notification events restoration
for contract := range s.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
_, err = s.client.ReceiveExecutionNotifications(contract, notifCh)
if err != nil {
s.log.Error("could not restore notification subscription after RPC switch", zap.Error(err))
return false
}
}
// notary notification events restoration
for signer := range s.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
_, err = s.client.ReceiveNotaryRequests(signer, notaryCh)
if err != nil {
s.log.Error("could not restore notary notification subscription after RPC switch", zap.Error(err))
return false
}
}
return true
}
// awaitHeight checks if remote client has least expected block height and
// returns error if it is not reached that height after timeout duration.
// This function is required to avoid connections to unsynced RPC nodes, because

View file

@ -40,6 +40,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
return p
}
func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
if p != nil {
p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
}
return p
}
func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
if p != nil {
p.relay = f

View file

@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
object.NewFromV2(oV2),
).
WithRelay(s.relayRequest).
WithCommonPrm(commonPrm), nil
WithCommonPrm(commonPrm).
WithCopyNumber(part.GetCopiesNumber()), nil
}
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {

View file

@ -27,7 +27,7 @@ type cacheItem struct {
}
const (
defaultClientCacheSize = 10
defaultClientCacheSize = 32
defaultClientConnectTimeout = time.Second * 2
defaultReconnectInterval = time.Second * 15
)
@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")
func (c *clientCache) init() {
l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
_ = value.cc.Close()
if conn := value.cc; conn != nil {
_ = conn.Close()
}
})
c.LRU = *l
}

View file

@ -13,6 +13,7 @@ import (
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -31,6 +32,8 @@ type Service struct {
syncChan chan struct{}
syncPool *ants.Pool
initialSyncDone atomic.Bool
// cnrMap contains existing (used) container IDs.
cnrMap map[cidSDK.ID]struct{}
// cnrMapMtx protects cnrMap
@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
}
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
}
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
}
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
// Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
}
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
}
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
}
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
}
h := b.GetHeight()
lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
if err != nil {
return err
}
for {
lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
if err != nil || lm.Time == 0 {
if err != nil || lm.Time == 0 || lastHeight < lm.Time {
return err
}
@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
}
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
var cid cidSDK.ID
err := cid.Decode(req.GetBody().GetContainerId())
@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
}
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
return new(HealthcheckResponse), nil
}

View file

@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
errG, ctx := errgroup.WithContext(ctx)
errG.SetLimit(1024)
var heightMtx sync.Mutex
for {
newHeight := height
req := &GetOpLogRequest{
@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
},
}
if err := SignMessage(req, s.key); err != nil {
_ = errG.Wait()
return newHeight, err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
_ = errG.Wait()
return newHeight, fmt.Errorf("can't initialize client: %w", err)
}
@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
Child: lm.ChildId,
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
_ = errG.Wait()
return newHeight, err
}
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
return newHeight, err
}
if m.Time > newHeight {
newHeight = m.Time + 1
} else {
newHeight++
}
errG.Go(func() error {
err := s.forest.TreeApply(cid, treeID, m, true)
heightMtx.Lock()
defer heightMtx.Unlock()
if err != nil {
if newHeight > height {
height = newHeight
}
return err
}
if m.Time > newHeight {
newHeight = m.Time + 1
} else {
newHeight++
}
return nil
})
}
applyErr := errG.Wait()
if err == nil {
err = applyErr
}
heightMtx.Lock()
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
heightMtx.Unlock()
return newHeight, err
}
height = newHeight
heightMtx.Unlock()
}
}
@ -288,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
cnrs, err := s.cfg.cnrSource.List()
if err != nil {
s.log.Error("could not fetch containers", zap.Error(err))
continue
break
}
newMap, cnrsToSync := s.containersToSync(cnrs)
@ -299,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {
s.log.Debug("trees have been synchronized")
}
s.initialSyncDone.Store(true)
}
}