Compare commits

...

22 commits

Author SHA1 Message Date
2360cf263b [#392] shard: Create tombstone source when reload
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-25 15:52:59 +03:00
f866ec1399 [#392] gc: Use defer to mark handler done
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-25 15:52:49 +03:00
a506da97d6 [#384] shard: Add unit test
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Add test to check that oject not found error will be returned,
if object doesn't exist in blobstore.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-25 09:55:16 +03:00
1dd84eef77 [#384] shard: Cancel GC if change mode requested
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-25 09:55:16 +03:00
1501f11e4d [#351] cli: Support copies number parameter in object put
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-05-18 15:52:51 +00:00
4f55417914 [#351] Fix end of files
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-05-18 15:52:51 +00:00
9bda6e0b8b [#332] gc: Fix expired complex object deletion
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-05-18 12:51:34 +00:00
ceb9deb7f1 [#337] morph: Move subscription logic to subscriber
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-18 11:28:50 +03:00
4148590668 [#365] go.mod: Update api-go
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-18 09:51:07 +03:00
493cafc62a [#355] Increase tree svc client cache size to test hypotheses
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-05-17 15:17:35 +03:00
3711976dfc [#314] writecache: remove objects right after they are flushed
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-16 14:16:25 +03:00
Pavel Karpy
c3f5045842 [#314] wc: Do not lose small objects on disk errors
Do return error if an object could not been stored on WC's disk.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-15 16:27:44 +03:00
Pavel Karpy
ab65063d6d [#314] wc: Simplify background workers naming
Also, drop not used arg.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-15 16:27:42 +03:00
Pavel Karpy
c60029d3b0 [#323] node: Fix tree svc panic
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
If a connection has not been established earlier, it stores `nil` in LRU
cache. Cache eviction tries to close every connection (even a `nil` one) and
panics but not crash the app because we are using pools.
That ugly bug also leads to a deadlock where `Unlock` is not called via
`defer` func (and that is the way I found it).

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-04 20:04:30 +03:00
Pavel Karpy
0beb7ccf5c [#284] node: Use copy_number on server side
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-04-26 10:57:34 +03:00
0fe5e34fb0 [#231] node: Fix race condition in TTL cache
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Use key locker to lock by key.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-25 16:50:27 +03:00
bcf3f0f517 [#231] node: Invalidate container cache on PutSuccess event
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
For example: frostfs-cli creates container and makes polling
GetContainer requests. These requests go through container cache,
so not found error stores in container cache.
So container cache can contain not found error when PutSuccess event received.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-20 17:34:00 +03:00
79d59e4ed2 [#266] services/tree: Do not accept requests until initial sync is finished
Some checks failed
ci/woodpecker/pr/pre-commit Pipeline failed
ci/woodpecker/push/pre-commit Pipeline failed
`Apply` is deliberately left out -- we don't want to miss anything new.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
364b4ac572 [#266] services/tree: Batch operations on the service level
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
f7679a8168 [#266] services/tree: Return operation log up to some height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
2dc2fe8780 [#266] pilorama: Allow to get current tree height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
21412ef24a [#263] node: Up api-go version
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
Fix panic in tracing.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-18 11:45:42 +03:00
53 changed files with 1062 additions and 1087 deletions

View file

@ -4,8 +4,13 @@ Changelog for FrostFS Node
## [Unreleased] ## [Unreleased]
### Added ### Added
- Support copies number parameter in `frostfs-cli object put` (#351)
### Changed ### Changed
### Fixed ### Fixed
- Copy number was not used for `PUT` requests (#284)
- Tree service panic in its internal client cache (#323)
### Removed ### Removed
### Updated ### Updated
### Updating from v0.36.0 ### Updating from v0.36.0
@ -64,6 +69,7 @@ Changelog for FrostFS Node
- Iterating over just removed files by FSTree (#98) - Iterating over just removed files by FSTree (#98)
- Parts of a locked object could not be removed anymore (#141) - Parts of a locked object could not be removed anymore (#141)
- Non-alphabet nodes do not try to handle alphabet events (#181) - Non-alphabet nodes do not try to handle alphabet events (#181)
- Delete complex objects with GC (#332)
### Removed ### Removed
### Updated ### Updated

View file

@ -329,6 +329,8 @@ func CreateSession(prm CreateSessionPrm) (res CreateSessionRes, err error) {
type PutObjectPrm struct { type PutObjectPrm struct {
commonObjectPrm commonObjectPrm
copyNum uint32
hdr *object.Object hdr *object.Object
rdr io.Reader rdr io.Reader
@ -352,6 +354,11 @@ func (x *PutObjectPrm) SetHeaderCallback(f func(*object.Object)) {
x.headerCallback = f x.headerCallback = f
} }
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
func (x *PutObjectPrm) SetCopiesNumber(copiesNumbers uint32) {
x.copyNum = copiesNumbers
}
// PutObjectRes groups the resulting values of PutObject operation. // PutObjectRes groups the resulting values of PutObject operation.
type PutObjectRes struct { type PutObjectRes struct {
id oid.ID id oid.ID
@ -381,6 +388,7 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
} }
putPrm.WithXHeaders(prm.xHeaders...) putPrm.WithXHeaders(prm.xHeaders...)
putPrm.SetCopiesNumber(prm.copyNum)
wrt, err := prm.cli.ObjectPutInit(context.Background(), putPrm) wrt, err := prm.cli.ObjectPutInit(context.Background(), putPrm)
if err != nil { if err != nil {

View file

@ -25,6 +25,7 @@ import (
const ( const (
noProgressFlag = "no-progress" noProgressFlag = "no-progress"
notificationFlag = "notify" notificationFlag = "notify"
copiesNumberFlag = "copies-number"
) )
var putExpiredOn uint64 var putExpiredOn uint64
@ -56,6 +57,8 @@ func initObjectPutCmd() {
flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default") flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.") flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
flags.Uint32(copiesNumberFlag, 0, "Number of copies of the object to store within the RPC call")
} }
func putObject(cmd *cobra.Command, _ []string) { func putObject(cmd *cobra.Command, _ []string) {
@ -116,6 +119,12 @@ func putObject(cmd *cobra.Command, _ []string) {
} }
} }
copyNum, err := cmd.Flags().GetUint32(copiesNumberFlag)
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
if copyNum > 0 {
prm.SetCopiesNumber(copyNum)
}
res, err := internalclient.PutObject(prm) res, err := internalclient.PutObject(prm)
if p != nil { if p != nil {
p.Finish() p.Finish()

View file

@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
e error e error
} }
type locker struct {
mtx *sync.Mutex
waiters int // not protected by mtx, must used outer mutex to update concurrently
}
type keyLocker[K comparable] struct {
lockers map[K]*locker
lockersMtx *sync.Mutex
}
func newKeyLocker[K comparable]() *keyLocker[K] {
return &keyLocker[K]{
lockers: make(map[K]*locker),
lockersMtx: &sync.Mutex{},
}
}
func (l *keyLocker[K]) LockKey(key K) {
l.lockersMtx.Lock()
if locker, found := l.lockers[key]; found {
locker.waiters++
l.lockersMtx.Unlock()
locker.mtx.Lock()
return
}
locker := &locker{
mtx: &sync.Mutex{},
waiters: 1,
}
locker.mtx.Lock()
l.lockers[key] = locker
l.lockersMtx.Unlock()
}
func (l *keyLocker[K]) UnlockKey(key K) {
l.lockersMtx.Lock()
defer l.lockersMtx.Unlock()
locker, found := l.lockers[key]
if !found {
return
}
if locker.waiters == 1 {
delete(l.lockers, key)
}
locker.waiters--
locker.mtx.Unlock()
}
// entity that provides TTL cache interface. // entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct { type ttlNetCache[K comparable, V any] struct {
ttl time.Duration ttl time.Duration
@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
cache *lru.Cache[K, *valueWithTime[V]] cache *lru.Cache[K, *valueWithTime[V]]
netRdr netValueReader[K, V] netRdr netValueReader[K, V]
keyLocker *keyLocker[K]
} }
// complicates netValueReader with TTL caching mechanism. // complicates netValueReader with TTL caching mechanism.
@ -45,6 +102,7 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
sz: sz, sz: sz,
cache: cache, cache: cache,
netRdr: netRdr, netRdr: netRdr,
keyLocker: newKeyLocker[K](),
} }
} }
@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
// returned value should not be modified. // returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) { func (c *ttlNetCache[K, V]) get(key K) (V, error) {
val, ok := c.cache.Peek(key) val, ok := c.cache.Peek(key)
if ok { if ok && time.Since(val.t) < c.ttl {
if time.Since(val.t) < c.ttl {
return val.v, val.e return val.v, val.e
} }
c.cache.Remove(key) c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
val, ok = c.cache.Peek(key)
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
} }
v, err := c.netRdr(key) v, err := c.netRdr(key)
c.set(key, v, err) c.cache.Add(key, &valueWithTime[V]{
v: v,
t: time.Now(),
e: err,
})
return v, err return v, err
} }
func (c *ttlNetCache[K, V]) set(k K, v V, e error) { func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
c.keyLocker.LockKey(k)
defer c.keyLocker.UnlockKey(k)
c.cache.Add(k, &valueWithTime[V]{ c.cache.Add(k, &valueWithTime[V]{
v: v, v: v,
t: time.Now(), t: time.Now(),
@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
} }
func (c *ttlNetCache[K, V]) remove(key K) { func (c *ttlNetCache[K, V]) remove(key K) {
c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
c.cache.Remove(key) c.cache.Remove(key)
} }

View file

@ -0,0 +1,32 @@
package main
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestKeyLocker(t *testing.T) {
taken := false
eg, _ := errgroup.WithContext(context.Background())
keyLocker := newKeyLocker[int]()
for i := 0; i < 100; i++ {
eg.Go(func() error {
keyLocker.LockKey(0)
defer keyLocker.UnlockKey(0)
require.False(t, taken)
taken = true
require.True(t, taken)
time.Sleep(10 * time.Millisecond)
taken = false
require.False(t, taken)
return nil
})
}
require.NoError(t, eg.Wait())
}

View file

@ -864,22 +864,13 @@ func initLocalStorage(c *cfg) {
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
}) })
// allocate memory for the service; // allocate memory for the service to create tombstone source;
// service will be created later // service will be created later
c.cfgObject.getSvc = new(getsvc.Service) c.cfgObject.getSvc = new(getsvc.Service)
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)
tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)
var shardsAttached int var shardsAttached int
for _, optsWithMeta := range c.shardOpts() { for _, optsWithMeta := range c.shardOpts() {
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...) id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
if err != nil { if err != nil {
c.log.Error("failed to attach shard to engine", zap.Error(err)) c.log.Error("failed to attach shard to engine", zap.Error(err))
} else { } else {
@ -1080,7 +1071,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
var rcfg engine.ReConfiguration var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts() { for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts) rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
} }
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg) err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
@ -1101,6 +1092,18 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info("configuration has been reloaded successfully") c.log.Info("configuration has been reloaded successfully")
} }
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)
tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)
return tombstoneSource
}
func (c *cfg) shutdown() { func (c *cfg) shutdown() {
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN) c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)

View file

@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
// TODO: use owner directly from the event after neofs-contract#256 will become resolved // TODO: use owner directly from the event after neofs-contract#256 will become resolved
// but don't forget about the profit of reading the new container and caching it: // but don't forget about the profit of reading the new container and caching it:
// creation success are most commonly tracked by polling GET op. // creation success are most commonly tracked by polling GET op.
cnr, err := cachedContainerStorage.Get(ev.ID) cnr, err := cnrSrc.Get(ev.ID)
if err == nil { if err == nil {
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true) cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
cachedContainerStorage.set(ev.ID, cnr, nil)
} else { } else {
// unlike removal, we expect successful receive of the container // unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful // after successful creation, so logging can be useful

View file

@ -219,4 +219,3 @@ tracing:
enabled: true enabled: true
exporter: "otlp_grpc" exporter: "otlp_grpc"
endpoint: "localhost" endpoint: "localhost"

2
go.mod
View file

@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.18 go 1.18
require ( require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0 git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230516125015-c3f61e7c8595
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
git.frostfs.info/TrueCloudLab/hrw v1.2.0 git.frostfs.info/TrueCloudLab/hrw v1.2.0

BIN
go.sum

Binary file not shown.

View file

@ -128,7 +128,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
var deletePrm shard.DeletePrm var deletePrm shard.DeletePrm
deletePrm.SetAddresses(addr) deletePrm.SetAddresses(addr)
_, err = shards[i].Delete(deletePrm) _, err = shards[i].Delete(ctx, deletePrm)
if err != nil { if err != nil {
return err return err
} }

View file

@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
return err == nil, err return err == nil, err
} }
func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
index, lst, err := e.getTreeShard(cid, treeID)
if err != nil {
return 0, nil
}
return lst[index].TreeHeight(cid, treeID)
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface. // TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error { func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
index, lst, err := e.getTreeShard(cid, treeID) index, lst, err := e.getTreeShard(cid, treeID)

View file

@ -0,0 +1,57 @@
package meta
import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
// GetChildren returns parent -> children map.
// If an object has no children, then map will contain addr -> empty slice value.
func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
result := make(map[oid.Address][]oid.Address, len(addresses))
buffer := make([]byte, bucketKeySize)
err := db.boltDB.View(func(tx *bbolt.Tx) error {
for _, addr := range addresses {
if _, found := result[addr]; found {
continue
}
result[addr] = []oid.Address{}
bkt := tx.Bucket(parentBucketName(addr.Container(), buffer))
if bkt == nil {
continue
}
binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer)))
if err != nil {
return err
}
for _, binObjID := range binObjIDs {
var id oid.ID
if err = id.Decode(binObjID); err != nil {
return err
}
var resultAddress oid.Address
resultAddress.SetContainer(addr.Container())
resultAddress.SetObject(id)
result[addr] = append(result[addr], resultAddress)
}
}
return nil
})
if err != nil {
return nil, err
}
return result, nil
}

View file

@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
}) })
} }
func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return 0, ErrDegradedMode
}
var height uint64
var retErr error
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot != nil {
k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
height = binary.BigEndian.Uint64(k)
} else {
retErr = ErrTreeNotFound
}
return nil
})
if err == nil {
err = retErr
}
return height, err
}
// TreeExists implements the Forest interface. // TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) { func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
t.modeMtx.RLock() t.modeMtx.RLock()

View file

@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
return res, nil return res, nil
} }
func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
fullID := cid.EncodeToString() + "/" + treeID
tree, ok := f.treeMap[fullID]
if !ok {
return 0, ErrTreeNotFound
}
return tree.operations[len(tree.operations)-1].Time, nil
}
// TreeExists implements the pilorama.Forest interface. // TreeExists implements the pilorama.Forest interface.
func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) { func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
fullID := cid.EncodeToString() + "/" + treeID fullID := cid.EncodeToString() + "/" + treeID

View file

@ -527,9 +527,18 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
checkExists(t, false, cid, treeID) checkExists(t, false, cid, treeID)
}) })
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false)) require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
checkExists(t, true, cid, treeID) checkExists(t, true, cid, treeID)
height, err := s.TreeHeight(cid, treeID)
require.NoError(t, err)
require.EqualValues(t, 11, height)
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
_, err = s.TreeHeight(cidtest.ID(), treeID)
require.ErrorIs(t, err, ErrTreeNotFound)
checkExists(t, false, cid, "another tree") // same CID, different tree checkExists(t, false, cid, "another tree") // same CID, different tree
t.Run("can be removed", func(t *testing.T) { t.Run("can be removed", func(t *testing.T) {

View file

@ -48,6 +48,8 @@ type Forest interface {
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes. // TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
// TreeHeight returns current tree height.
TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
} }
type ForestStorage interface { type ForestStorage interface {

View file

@ -296,8 +296,8 @@ func (s *Shard) Reload(opts ...Option) error {
opts[i](&c) opts[i](&c)
} }
s.m.Lock() unlock := s.lockExclusive()
defer s.m.Unlock() defer unlock()
ok, err := s.metaBase.Reload(c.metaOpts...) ok, err := s.metaBase.Reload(c.metaOpts...)
if err != nil { if err != nil {
@ -327,3 +327,15 @@ func (s *Shard) Reload(opts ...Option) error {
s.log.Info("trying to restore read-write mode") s.log.Info("trying to restore read-write mode")
return s.setMode(mode.ReadWrite) return s.setMode(mode.ReadWrite)
} }
func (s *Shard) lockExclusive() func() {
s.setModeRequested.Store(true)
val := s.gcCancel.Load()
if val != nil {
cancelGC := val.(context.CancelFunc)
cancelGC()
}
s.m.Lock()
s.setModeRequested.Store(false)
return s.m.Unlock
}

View file

@ -1,6 +1,7 @@
package shard package shard
import ( import (
"context"
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -27,84 +28,88 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
// Delete removes data from the shard's writeCache, metaBase and // Delete removes data from the shard's writeCache, metaBase and
// blobStor. // blobStor.
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) { func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() defer s.m.RUnlock()
return s.delete(prm) return s.delete(ctx, prm)
} }
func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) { func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
if s.info.Mode.ReadOnly() { if s.info.Mode.ReadOnly() {
return DeleteRes{}, ErrReadOnlyMode return DeleteRes{}, ErrReadOnlyMode
} else if s.info.Mode.NoMetabase() { } else if s.info.Mode.NoMetabase() {
return DeleteRes{}, ErrDegradedMode return DeleteRes{}, ErrDegradedMode
} }
ln := len(prm.addr) for _, addr := range prm.addr {
select {
case <-ctx.Done():
return DeleteRes{}, ctx.Err()
default:
}
smalls := make(map[oid.Address][]byte, ln) s.deleteObjectFromWriteCacheSafe(addr)
for i := range prm.addr { s.deleteFromBlobstorSafe(addr)
if err := s.deleteFromMetabase(addr); err != nil {
return DeleteRes{}, err // stop on metabase error ?
}
}
return DeleteRes{}, nil
}
func (s *Shard) deleteObjectFromWriteCacheSafe(addr oid.Address) {
if s.hasWriteCache() { if s.hasWriteCache() {
err := s.writeCache.Delete(prm.addr[i]) err := s.writeCache.Delete(addr)
if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) { if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
s.log.Warn("can't delete object from write cache", zap.String("error", err.Error())) s.log.Warn("can't delete object from write cache", zap.String("error", err.Error()))
} }
} }
}
func (s *Shard) deleteFromMetabase(addr oid.Address) error {
var delPrm meta.DeletePrm
delPrm.SetAddresses(addr)
res, err := s.metaBase.Delete(delPrm)
if err != nil {
return err
}
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
removedPayload := res.RemovedPhysicalObjectSizes()[0]
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
if logicalRemovedPayload > 0 {
s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload))
}
s.addToPayloadSize(-int64(removedPayload))
return nil
}
func (s *Shard) deleteFromBlobstorSafe(addr oid.Address) {
var sPrm meta.StorageIDPrm var sPrm meta.StorageIDPrm
sPrm.SetAddress(prm.addr[i]) sPrm.SetAddress(addr)
res, err := s.metaBase.StorageID(sPrm) res, err := s.metaBase.StorageID(sPrm)
if err != nil { if err != nil {
s.log.Debug("can't get storage ID from metabase", s.log.Debug("can't get storage ID from metabase",
zap.Stringer("object", prm.addr[i]), zap.Stringer("object", addr),
zap.String("error", err.Error())) zap.String("error", err.Error()))
continue
} }
storageID := res.StorageID()
if res.StorageID() != nil {
smalls[prm.addr[i]] = res.StorageID()
}
}
var delPrm meta.DeletePrm
delPrm.SetAddresses(prm.addr...)
res, err := s.metaBase.Delete(delPrm)
if err != nil {
return DeleteRes{}, err // stop on metabase error ?
}
var totalRemovedPayload uint64
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
for i := range prm.addr {
removedPayload := res.RemovedPhysicalObjectSizes()[i]
totalRemovedPayload += removedPayload
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[i]
if logicalRemovedPayload > 0 {
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
}
}
s.addToPayloadSize(-int64(totalRemovedPayload))
for i := range prm.addr {
var delPrm common.DeletePrm var delPrm common.DeletePrm
delPrm.Address = prm.addr[i] delPrm.Address = addr
id := smalls[prm.addr[i]] delPrm.StorageID = storageID
delPrm.StorageID = id
_, err = s.blobStor.Delete(delPrm) _, err = s.blobStor.Delete(delPrm)
if err != nil { if err != nil {
s.log.Debug("can't remove object from blobStor", s.log.Debug("can't remove object from blobStor",
zap.Stringer("object_address", prm.addr[i]), zap.Stringer("object_address", addr),
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }
}
return DeleteRes{}, nil
} }

View file

@ -49,7 +49,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = testGet(t, sh, getPrm, hasWriteCache) _, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err) require.NoError(t, err)
_, err = sh.Delete(delPrm) _, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err) require.NoError(t, err)
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)
@ -73,7 +73,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err) require.NoError(t, err)
_, err = sh.Delete(delPrm) _, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err) require.NoError(t, err)
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)

View file

@ -57,6 +57,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
writecache.WithMaxObjectSize(wcBigObjectSize), writecache.WithMaxObjectSize(wcBigObjectSize),
writecache.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), writecache.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
}, },
nil,
nil) nil)
} }
defer releaseShard(sh, t) defer releaseShard(sh, t)
@ -188,7 +189,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
require.Error(t, err) require.Error(t, err)
t.Run("skip errors", func(t *testing.T) { t.Run("skip errors", func(t *testing.T) {
sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil) sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil, nil)
t.Cleanup(func() { require.NoError(t, sh.Close()) }) t.Cleanup(func() { require.NoError(t, sh.Close()) })
var restorePrm shard.RestorePrm var restorePrm shard.RestorePrm
@ -219,10 +220,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
} }
func TestStream(t *testing.T) { func TestStream(t *testing.T) {
sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil) sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil, nil)
defer releaseShard(sh1, t) defer releaseShard(sh1, t)
sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil) sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil, nil)
defer releaseShard(sh2, t) defer releaseShard(sh2, t)
const objCount = 5 const objCount = 5
@ -323,7 +324,7 @@ func TestDumpIgnoreErrors(t *testing.T) {
writecache.WithSmallObjectSize(wcSmallObjectSize), writecache.WithSmallObjectSize(wcSmallObjectSize),
writecache.WithMaxObjectSize(wcBigObjectSize), writecache.WithMaxObjectSize(wcBigObjectSize),
} }
sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2)) sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2), nil)
objects := make([]*objectSDK.Object, objCount) objects := make([]*objectSDK.Object, objCount)
for i := 0; i < objCount; i++ { for i := 0; i < objCount; i++ {
@ -371,7 +372,7 @@ func TestDumpIgnoreErrors(t *testing.T) {
require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0)) require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0))
} }
sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3)) sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3), nil)
require.NoError(t, sh.SetMode(mode.ReadOnly)) require.NoError(t, sh.SetMode(mode.ReadOnly))
{ {

View file

@ -145,8 +145,8 @@ func (gc *gc) listenEvents(ctx context.Context) {
h := v.handlers[i] h := v.handlers[i]
err := gc.workerPool.Submit(func() { err := gc.workerPool.Submit(func() {
defer v.prevGroup.Done()
h(runCtx, event) h(runCtx, event)
v.prevGroup.Done()
}) })
if err != nil { if err != nil {
gc.log.Warn("could not submit GC job to worker pool", gc.log.Warn("could not submit GC job to worker pool",
@ -196,6 +196,14 @@ func (gc *gc) stop() {
// with GC-marked graves. // with GC-marked graves.
// Does nothing if shard is in "read-only" mode. // Does nothing if shard is in "read-only" mode.
func (s *Shard) removeGarbage() { func (s *Shard) removeGarbage() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.gcCancel.Store(cancel)
if s.setModeRequested.Load() {
return
}
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() defer s.m.RUnlock()
@ -207,6 +215,12 @@ func (s *Shard) removeGarbage() {
var iterPrm meta.GarbageIterationPrm var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error { iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
buf = append(buf, g.Address()) buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize { if len(buf) == s.rmBatchSize {
@ -233,7 +247,7 @@ func (s *Shard) removeGarbage() {
deletePrm.SetAddresses(buf...) deletePrm.SetAddresses(buf...)
// delete accumulated objects // delete accumulated objects
_, err = s.delete(deletePrm) _, err = s.delete(ctx, deletePrm)
if err != nil { if err != nil {
s.log.Warn("could not delete the objects", s.log.Warn("could not delete the objects",
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -313,6 +327,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return return
} }
expired, err := s.getExpiredWithLinked(expired)
if err != nil {
s.log.Warn("failed to get expired objects with linked", zap.Error(err))
return
}
var inhumePrm meta.InhumePrm var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...) inhumePrm.SetAddresses(expired...)
@ -338,6 +358,20 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
} }
} }
func (s *Shard) getExpiredWithLinked(source []oid.Address) ([]oid.Address, error) {
result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(source)
if err != nil {
return nil, err
}
for parent, children := range parentToChildren {
result = append(result, parent)
result = append(result, children...)
}
return result, nil
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
epoch := e.(newEpoch).epoch epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch)) log := s.log.With(zap.Uint64("epoch", epoch))

View file

@ -0,0 +1,144 @@
package shard
import (
"context"
"path/filepath"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
t.Parallel()
rootPath := t.TempDir()
var sh *Shard
l := &logger.Logger{Logger: zaptest.NewLogger(t)}
blobOpts := []blobstor.Option{
blobstor.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(1),
blobovniczatree.WithBlobovniczaShallowWidth(1)),
Policy: func(_ *object.Object, data []byte) bool {
return len(data) <= 1<<20
},
},
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(rootPath, "blob"))),
},
}),
}
opts := []Option{
WithID(NewIDFromBytes([]byte{})),
WithLogger(l),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epochState{}),
),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
WithGCRemoverSleepInterval(1 * time.Second),
}
sh = New(opts...)
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
t.Cleanup(func() {
require.NoError(t, sh.Close())
})
cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr)
objID, _ := obj.ID()
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(objID)
var putPrm PutPrm
putPrm.SetObject(obj)
_, err := sh.Put(putPrm)
require.NoError(t, err)
var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj))
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err, "failed to get")
//inhume
var inhumePrm InhumePrm
inhumePrm.MarkAsGarbage(addr)
_, err = sh.Inhume(context.Background(), inhumePrm)
require.NoError(t, err, "failed to inhume")
_, err = sh.Get(context.Background(), getPrm)
require.Error(t, err, "get returned error")
require.True(t, IsErrNotFound(err), "invalid error type")
//storageID
var metaStIDPrm meta.StorageIDPrm
metaStIDPrm.SetAddress(addr)
storageID, err := sh.metaBase.StorageID(metaStIDPrm)
require.NoError(t, err, "failed to get storage ID")
//check existance in blobstore
var bsExisted common.ExistsPrm
bsExisted.Address = addr
bsExisted.StorageID = storageID.StorageID()
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existance")
require.True(t, exRes.Exists, "invalid blobstore existance result")
//drop from blobstor
var bsDeletePrm common.DeletePrm
bsDeletePrm.Address = addr
bsDeletePrm.StorageID = storageID.StorageID()
_, err = sh.blobStor.Delete(bsDeletePrm)
require.NoError(t, err, "failed to delete from blobstore")
//check existance in blobstore
exRes, err = sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existance")
require.False(t, exRes.Exists, "invalid blobstore existance result")
//get should return object not found
_, err = sh.Get(context.Background(), getPrm)
require.Error(t, err, "get returned no error")
require.True(t, IsErrNotFound(err), "invalid error type")
}

View file

@ -2,77 +2,30 @@ package shard_test
import ( import (
"context" "context"
"path/filepath" "errors"
"testing" "testing"
"time" "time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap"
) )
func Test_GCDropsLockedExpiredObject(t *testing.T) { func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
var sh *shard.Shard t.Parallel()
epoch := &epochState{ epoch := &epochState{
Value: 100, Value: 100,
} }
rootPath := t.TempDir() sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(2),
blobovniczatree.WithBlobovniczaShallowWidth(2)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= 1<<20
},
},
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(rootPath, "blob"))),
},
}),
),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epoch),
),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
}
sh = shard.New(opts...)
require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
releaseShard(sh, t) releaseShard(sh, t)
@ -120,3 +73,97 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
return shard.IsErrNotFound(err) return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired object must be deleted") }, 3*time.Second, 1*time.Second, "expired object must be deleted")
} }
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
t.Parallel()
epoch := &epochState{
Value: 100,
}
cnr := cidtest.ID()
parentID := oidtest.ID()
splitID := objectSDK.NewSplitID()
var objExpirationAttr objectSDK.Attribute
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
objExpirationAttr.SetValue("101")
var lockExpirationAttr objectSDK.Attribute
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
lockExpirationAttr.SetValue("103")
parent := testutil.GenerateObjectWithCID(cnr)
parent.SetID(parentID)
parent.SetPayload(nil)
parent.SetAttributes(objExpirationAttr)
const childCount = 10
children := make([]*objectSDK.Object, childCount)
childIDs := make([]oid.ID, childCount)
for i := range children {
children[i] = testutil.GenerateObjectWithCID(cnr)
if i != 0 {
children[i].SetPreviousID(childIDs[i-1])
}
if i == len(children)-1 {
children[i].SetParent(parent)
}
children[i].SetSplitID(splitID)
children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})
childIDs[i], _ = children[i].ID()
}
link := testutil.GenerateObjectWithCID(cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
linkID, _ := link.ID()
sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
t.Cleanup(func() {
releaseShard(sh, t)
})
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
lock.SetAttributes(lockExpirationAttr)
lockID, _ := lock.ID()
var putPrm shard.PutPrm
for _, child := range children {
putPrm.SetObject(child)
_, err := sh.Put(putPrm)
require.NoError(t, err)
}
putPrm.SetObject(link)
_, err := sh.Put(putPrm)
require.NoError(t, err)
err = sh.Lock(cnr, lockID, append(childIDs, parentID, linkID))
require.NoError(t, err)
putPrm.SetObject(lock)
_, err = sh.Put(putPrm)
require.NoError(t, err)
var getPrm shard.GetPrm
getPrm.SetAddress(objectCore.AddressOf(parent))
_, err = sh.Get(context.Background(), getPrm)
var splitInfoError *objectSDK.SplitInfoError
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
epoch.Value = 105
sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires")
}

View file

@ -168,7 +168,7 @@ func TestCounters(t *testing.T) {
deletedNumber := int(phy / 4) deletedNumber := int(phy / 4)
prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...) prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
_, err := sh.Delete(prm) _, err := sh.Delete(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, phy-uint64(deletedNumber), mm.objCounters[physical]) require.Equal(t, phy-uint64(deletedNumber), mm.objCounters[physical])

View file

@ -18,8 +18,8 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode")
// Returns any error encountered that did not allow // Returns any error encountered that did not allow
// setting shard mode. // setting shard mode.
func (s *Shard) SetMode(m mode.Mode) error { func (s *Shard) SetMode(m mode.Mode) error {
s.m.Lock() unlock := s.lockExclusive()
defer s.m.Unlock() defer unlock()
return s.setMode(m) return s.setMode(m)
} }

View file

@ -84,7 +84,8 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
Storage: fstree.New( Storage: fstree.New(
fstree.WithPath(filepath.Join(t.TempDir(), "blob"))), fstree.WithPath(filepath.Join(t.TempDir(), "blob"))),
}, },
})}) })},
nil)
defer releaseShard(sh, t) defer releaseShard(sh, t)
for _, tc := range testCases { for _, tc := range testCases {

View file

@ -3,6 +3,7 @@ package shard
import ( import (
"context" "context"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -31,6 +32,9 @@ type Shard struct {
metaBase *meta.DB metaBase *meta.DB
tsSource TombstoneSource tsSource TombstoneSource
gcCancel atomic.Value
setModeRequested atomic.Bool
} }
// Option represents Shard's constructor option. // Option represents Shard's constructor option.
@ -209,12 +213,12 @@ func WithWriteCache(use bool) Option {
} }
// hasWriteCache returns bool if write cache exists on shards. // hasWriteCache returns bool if write cache exists on shards.
func (s Shard) hasWriteCache() bool { func (s *Shard) hasWriteCache() bool {
return s.cfg.useWriteCache return s.cfg.useWriteCache
} }
// needRefillMetabase returns true if metabase is needed to be refilled. // needRefillMetabase returns true if metabase is needed to be refilled.
func (s Shard) needRefillMetabase() bool { func (s *Shard) needRefillMetabase() bool {
return s.cfg.refillMetabase return s.cfg.refillMetabase
} }

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
@ -12,8 +13,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
@ -29,11 +33,13 @@ func (s epochState) CurrentEpoch() uint64 {
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
return newCustomShard(t, t.TempDir(), enableWriteCache, return newCustomShard(t, t.TempDir(), enableWriteCache,
nil,
nil, nil,
nil) nil)
} }
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard { func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option, metaOptions []meta.Option) *shard.Shard {
var sh *shard.Shard
if enableWriteCache { if enableWriteCache {
rootPath = filepath.Join(rootPath, "wc") rootPath = filepath.Join(rootPath, "wc")
} else { } else {
@ -67,8 +73,9 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
shard.WithLogger(&logger.Logger{Logger: zap.L()}), shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions(bsOpts...), shard.WithBlobStorOptions(bsOpts...),
shard.WithMetaBaseOptions( shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")), append([]meta.Option{
meta.WithEpochState(epochState{}), meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epochState{})},
metaOptions...)...,
), ),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))), shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
shard.WithWriteCache(enableWriteCache), shard.WithWriteCache(enableWriteCache),
@ -77,9 +84,21 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
[]writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))}, []writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))},
wcOpts...)..., wcOpts...)...,
), ),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithGCRemoverSleepInterval(1 * time.Millisecond),
} }
sh := shard.New(opts...) sh = shard.New(opts...)
require.NoError(t, sh.Open()) require.NoError(t, sh.Open())
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))

View file

@ -37,7 +37,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
writecache.WithSmallObjectSize(smallSize), writecache.WithSmallObjectSize(smallSize),
writecache.WithMaxObjectSize(smallSize * 2)} writecache.WithMaxObjectSize(smallSize * 2)}
sh := newCustomShard(t, dir, true, wcOpts, nil) sh := newCustomShard(t, dir, true, wcOpts, nil, nil)
var putPrm shard.PutPrm var putPrm shard.PutPrm
@ -48,7 +48,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
} }
require.NoError(t, sh.Close()) require.NoError(t, sh.Close())
sh = newCustomShard(t, dir, true, wcOpts, nil) sh = newCustomShard(t, dir, true, wcOpts, nil, nil)
defer releaseShard(sh, t) defer releaseShard(sh, t)
var getPrm shard.GetPrm var getPrm shard.GetPrm

View file

@ -155,6 +155,13 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
return s.pilorama.TreeList(cid) return s.pilorama.TreeList(cid)
} }
func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
return s.pilorama.TreeHeight(cid, treeID)
}
// TreeExists implements the pilorama.Forest interface. // TreeExists implements the pilorama.Forest interface.
func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) { func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
if s.pilorama == nil { if s.pilorama == nil {

View file

@ -32,11 +32,11 @@ const (
func (c *cache) runFlushLoop() { func (c *cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ { for i := 0; i < c.workersCount; i++ {
c.wg.Add(1) c.wg.Add(1)
go c.flushWorker(i) go c.workerFlushSmall()
} }
c.wg.Add(1) c.wg.Add(1)
go c.flushBigObjects() go c.workerFlushBig()
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
@ -48,7 +48,7 @@ func (c *cache) runFlushLoop() {
for { for {
select { select {
case <-tt.C: case <-tt.C:
c.flushDB() c.flushSmallObjects()
tt.Reset(defaultFlushInterval) tt.Reset(defaultFlushInterval)
case <-c.closeCh: case <-c.closeCh:
return return
@ -57,7 +57,7 @@ func (c *cache) runFlushLoop() {
}() }()
} }
func (c *cache) flushDB() { func (c *cache) flushSmallObjects() {
var lastKey []byte var lastKey []byte
var m []objectInfo var m []objectInfo
for { for {
@ -70,7 +70,7 @@ func (c *cache) flushDB() {
m = m[:0] m = m[:0]
c.modeMtx.RLock() c.modeMtx.RLock()
if c.readOnly() || !c.initialized.Load() { if c.readOnly() {
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
@ -109,10 +109,6 @@ func (c *cache) flushDB() {
var count int var count int
for i := range m { for i := range m {
if c.flushed.Contains(m[i].addr) {
continue
}
obj := object.New() obj := object.New()
if err := obj.Unmarshal(m[i].data); err != nil { if err := obj.Unmarshal(m[i].data); err != nil {
continue continue
@ -140,7 +136,7 @@ func (c *cache) flushDB() {
} }
} }
func (c *cache) flushBigObjects() { func (c *cache) workerFlushBig() {
defer c.wg.Done() defer c.wg.Done()
tick := time.NewTicker(defaultFlushInterval * 10) tick := time.NewTicker(defaultFlushInterval * 10)
@ -151,9 +147,6 @@ func (c *cache) flushBigObjects() {
if c.readOnly() { if c.readOnly() {
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
break break
} else if !c.initialized.Load() {
c.modeMtx.RUnlock()
continue
} }
_ = c.flushFSTree(true) _ = c.flushFSTree(true)
@ -181,10 +174,6 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString() sAddr := addr.EncodeToString()
if _, ok := c.store.flushed.Peek(sAddr); ok {
return nil
}
data, err := f() data, err := f()
if err != nil { if err != nil {
c.reportFlushError("can't read a file", sAddr, err) c.reportFlushError("can't read a file", sAddr, err)
@ -212,9 +201,7 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
return err return err
} }
// mark object as flushed c.deleteFromDisk([]string{sAddr})
c.flushed.Add(sAddr, false)
return nil return nil
} }
@ -222,8 +209,8 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
return err return err
} }
// flushWorker writes objects to the main storage. // workerFlushSmall writes small objects to the main storage.
func (c *cache) flushWorker(_ int) { func (c *cache) workerFlushSmall() {
defer c.wg.Done() defer c.wg.Done()
var obj *object.Object var obj *object.Object
@ -236,9 +223,12 @@ func (c *cache) flushWorker(_ int) {
} }
err := c.flushObject(obj, nil) err := c.flushObject(obj, nil)
if err == nil { if err != nil {
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true) // Error is handled in flushObject.
continue
} }
c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
} }
} }
@ -294,10 +284,6 @@ func (c *cache) flush(ignoreErrors bool) error {
cs := b.Cursor() cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() { for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k) sa := string(k)
if _, ok := c.flushed.Peek(sa); ok {
continue
}
if err := addr.DecodeString(sa); err != nil { if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, err) c.reportFlushError("can't decode object address from the DB", sa, err)
if ignoreErrors { if ignoreErrors {

View file

@ -5,7 +5,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -15,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test" checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.Flush(false)) require.NoError(t, wc.Flush(false))
for i := 0; i < 2; i++ { check(t, mb, bs, objects)
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
}) })
t.Run("flush on moving to degraded mode", func(t *testing.T) { t.Run("flush on moving to degraded mode", func(t *testing.T) {
@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.SetMode(mode.ReadOnly)) require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.SetMode(mode.Degraded)) require.NoError(t, wc.SetMode(mode.Degraded))
for i := 0; i < 2; i++ { check(t, mb, bs, objects)
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
}) })
t.Run("ignore errors", func(t *testing.T) { t.Run("ignore errors", func(t *testing.T) {
@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
}) })
}) })
}) })
t.Run("on init", func(t *testing.T) {
wc, bs, mb := newCache(t)
objects := []objectPair{
// removed
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// not found
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// ok
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
}
require.NoError(t, wc.Close())
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
for i := range objects {
var prm meta.PutPrm
prm.SetObject(objects[i].obj)
_, err := mb.Put(prm)
require.NoError(t, err)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
inhumePrm.SetTombstoneAddress(oidtest.Address())
_, err := mb.Inhume(inhumePrm)
require.NoError(t, err)
var deletePrm meta.DeletePrm
deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
_, err = mb.Delete(deletePrm)
require.NoError(t, err)
require.NoError(t, bs.SetMode(mode.ReadOnly))
require.NoError(t, mb.SetMode(mode.ReadOnly))
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
require.NoError(t, err, i)
}
require.NoError(t, wc.Close())
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {
require.NoError(t, err, i)
}
}
})
} }
func putObject(t *testing.T, c Cache, size int) objectPair { func putObject(t *testing.T, c Cache, size int) objectPair {
@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
func initWC(t *testing.T, wc Cache) { func initWC(t *testing.T, wc Cache) {
require.NoError(t, wc.Init()) require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
rawWc := wc.(*cache)
return rawWc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
} }
type dummyEpoch struct{} type dummyEpoch struct{}

View file

@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
value, err := Get(c.db, []byte(saddr)) value, err := Get(c.db, []byte(saddr))
if err == nil { if err == nil {
obj := objectSDK.New() obj := objectSDK.New()
c.flushed.Get(saddr)
return obj, obj.Unmarshal(value) return obj, obj.Unmarshal(value)
} }
@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
} }
c.flushed.Get(saddr)
return res.Object, nil return res.Object, nil
} }

View file

@ -1,183 +0,0 @@
package writecache
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
func (c *cache) initFlushMarks() {
var localWG sync.WaitGroup
localWG.Add(1)
go func() {
defer localWG.Done()
c.fsTreeFlushMarkUpdate()
}()
localWG.Add(1)
go func() {
defer localWG.Done()
c.dbFlushMarkUpdate()
}()
c.initWG.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer c.initWG.Done()
localWG.Wait()
select {
case <-c.stopInitCh:
return
case <-c.closeCh:
return
default:
}
c.initialized.Store(true)
}()
}
var errStopIter = errors.New("stop iteration")
func (c *cache) fsTreeFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in FSTree")
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
select {
case <-c.closeCh:
return errStopIter
case <-c.stopInitCh:
return errStopIter
default:
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
var prm common.DeletePrm
prm.Address = addr
_, err := c.fsTree.Delete(prm)
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
}
}
}
return nil
}
_, _ = c.fsTree.Iterate(prm)
c.log.Info("finished updating FSTree flush marks")
}
func (c *cache) dbFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in database")
var m []string
var indices []int
var lastKey []byte
var batchSize = flushBatchSize
for {
select {
case <-c.closeCh:
return
case <-c.stopInitCh:
return
default:
}
m = m[:0]
indices = indices[:0]
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
m = append(m, string(k))
}
return nil
})
var addr oid.Address
for i := range m {
if err := addr.DecodeString(m[i]); err != nil {
continue
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
indices = append(indices, i)
}
}
}
if len(m) == 0 {
break
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for _, j := range indices {
if err := b.Delete([]byte(m[j])); err != nil {
return err
}
}
return nil
})
if err == nil {
for _, j := range indices {
storagelog.Write(c.log,
zap.String("address", m[j]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
}
}
lastKey = append([]byte(m[len(m)-1]), 0)
}
c.log.Info("finished updating flush marks")
}
// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(addr)
_, err := c.metabase.Exists(existsPrm)
if err != nil {
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
return needRemove, needRemove
}
var prm meta.StorageIDPrm
prm.SetAddress(addr)
mRes, _ := c.metabase.StorageID(prm)
res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists, false
}

View file

@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
err := c.db.View(func(tx *bbolt.Tx) error { err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error { return b.ForEach(func(k, data []byte) error {
if _, ok := c.flushed.Peek(string(k)); ok {
return nil
}
return prm.handler(data) return prm.handler(data)
}) })
}) })
@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
var fsPrm common.IteratePrm var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
return nil
}
data, err := f() data, err := f()
if err != nil { if err != nil {
if prm.ignoreErrors { if prm.ignoreErrors {

View file

@ -36,19 +36,6 @@ func (c *cache) setMode(m mode.Mode) error {
} }
} }
if !c.initialized.Load() {
close(c.stopInitCh)
c.initWG.Wait()
c.stopInitCh = make(chan struct{})
defer func() {
if err == nil && !turnOffMeta {
c.initFlushMarks()
}
}()
}
if c.db != nil { if c.db != nil {
if err = c.db.Close(); err != nil { if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err) return fmt.Errorf("can't close write-cache database: %w", err)

View file

@ -26,8 +26,6 @@ func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
defer c.modeMtx.RUnlock() defer c.modeMtx.RUnlock()
if c.readOnly() { if c.readOnly() {
return common.PutRes{}, ErrReadOnly return common.PutRes{}, ErrReadOnly
} else if !c.initialized.Load() {
return common.PutRes{}, ErrNotInitialized
} }
sz := uint64(len(prm.RawData)) sz := uint64(len(prm.RawData))
@ -67,7 +65,7 @@ func (c *cache) putSmall(obj objectInfo) error {
) )
c.objCounters.IncDB() c.objCounters.IncDB()
} }
return nil return err
} }
// putBig writes object to FSTree and pushes it to the flush workers queue. // putBig writes object to FSTree and pushes it to the flush workers queue.

View file

@ -11,8 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -20,19 +18,7 @@ import (
// store represents persistent storage with in-memory LRU cache // store represents persistent storage with in-memory LRU cache
// for flushed items on top of it. // for flushed items on top of it.
type store struct { type store struct {
maxFlushedMarksCount int
maxRemoveBatchSize int
// flushed contains addresses of objects that were already flushed to the main storage.
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
// frequently read ones.
// MUST NOT be used inside bolt db transaction because it's eviction handler
// removes untracked items from the database.
flushed simplelru.LRUCache[string, bool]
db *bbolt.DB db *bbolt.DB
dbKeysToRemove []string
fsKeysToRemove []string
} }
const dbName = "small.bolt" const dbName = "small.bolt"
@ -71,35 +57,9 @@ func (c *cache) openStore(readOnly bool) error {
return fmt.Errorf("could not open FSTree: %w", err) return fmt.Errorf("could not open FSTree: %w", err)
} }
// Write-cache can be opened multiple times during `SetMode`.
// flushed map must not be re-created in this case.
if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
}
c.initialized.Store(false)
return nil return nil
} }
// removeFlushed removes an object from the writecache.
// To minimize interference with the client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an evict callback to LRU cache.
func (c *cache) removeFlushed(key string, value bool) {
fromDatabase := value
if fromDatabase {
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
} else {
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
}
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
}
}
func (c *cache) deleteFromDB(keys []string) []string { func (c *cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 { if len(keys) == 0 {
return keys return keys

View file

@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -52,9 +51,6 @@ type cache struct {
mtx sync.RWMutex mtx sync.RWMutex
mode mode.Mode mode mode.Mode
initialized atomic.Bool
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
initWG sync.WaitGroup // for initialisation routines only
modeMtx sync.RWMutex modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating // compressFlags maps address of a big object to boolean value indicating
@ -97,7 +93,6 @@ func New(opts ...Option) Cache {
c := &cache{ c := &cache{
flushCh: make(chan *object.Object), flushCh: make(chan *object.Object),
mode: mode.ReadWrite, mode: mode.ReadWrite,
stopInitCh: make(chan struct{}),
compressFlags: make(map[string]struct{}), compressFlags: make(map[string]struct{}),
options: options{ options: options{
@ -116,12 +111,6 @@ func New(opts ...Option) Cache {
opts[i](&c.options) opts[i](&c.options)
} }
// Make the LRU cache contain which take approximately 3/4 of the maximum space.
// Assume small and big objects are stored in 50-50 proportion.
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
return c return c
} }
@ -152,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {
// Init runs necessary services. // Init runs necessary services.
func (c *cache) Init() error { func (c *cache) Init() error {
c.initFlushMarks()
c.runFlushLoop() c.runFlushLoop()
return nil return nil
} }
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error { func (c *cache) Close() error {
// We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock() c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// Finish all in-progress operations.
if err := c.setMode(mode.ReadOnly); err != nil {
return err
}
if c.closeCh != nil { if c.closeCh != nil {
close(c.closeCh) close(c.closeCh)
} }
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
c.modeMtx.Unlock()
c.wg.Wait() c.wg.Wait()
if c.closeCh != nil {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
c.closeCh = nil c.closeCh = nil
}
c.initialized.Store(false)
var err error var err error
if c.db != nil { if c.db != nil {
err = c.db.Close() err = c.db.Close()

View file

@ -57,8 +57,6 @@ type Client struct {
acc *wallet.Account // neo account acc *wallet.Account // neo account
accAddr util.Uint160 // account's address accAddr util.Uint160 // account's address
signer *transaction.Signer
notary *notaryInfo notary *notaryInfo
cfg cfg cfg cfg
@ -70,9 +68,6 @@ type Client struct {
// on every normal call. // on every normal call.
switchLock *sync.RWMutex switchLock *sync.RWMutex
notifications chan rpcclient.Notification
subsInfo // protected with switchLock
// channel for internal stop // channel for internal stop
closeChan chan struct{} closeChan chan struct{}
@ -567,26 +562,11 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (val
// NotificationChannel returns channel than receives subscribed // NotificationChannel returns channel than receives subscribed
// notification from the connected RPC node. // notification from the connected RPC node.
// Channel is closed when connection to the RPC node has been // Channel is closed when connection to the RPC node is lost.
// lost without the possibility of recovery.
func (c *Client) NotificationChannel() <-chan rpcclient.Notification { func (c *Client) NotificationChannel() <-chan rpcclient.Notification {
return c.notifications c.switchLock.RLock()
} defer c.switchLock.RUnlock()
return c.client.Notifications //lint:ignore SA1019 waits for neo-go v0.102.0 https://github.com/nspcc-dev/neo-go/pull/2980
// inactiveMode switches Client to an inactive mode:
// - notification channel is closed;
// - all the new RPC request would return ErrConnectionLost;
// - inactiveModeCb is called if not nil.
func (c *Client) inactiveMode() {
c.switchLock.Lock()
defer c.switchLock.Unlock()
close(c.notifications)
c.inactive = true
if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
} }
func (c *Client) setActor(act *actor.Actor) { func (c *Client) setActor(act *actor.Actor) {

View file

@ -9,11 +9,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -105,17 +102,8 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
logger: cfg.logger, logger: cfg.logger,
acc: acc, acc: acc,
accAddr: accAddr, accAddr: accAddr,
signer: cfg.signer,
cfg: *cfg, cfg: *cfg,
switchLock: &sync.RWMutex{}, switchLock: &sync.RWMutex{},
notifications: make(chan rpcclient.Notification),
subsInfo: subsInfo{
blockRcv: make(chan *block.Block),
notificationRcv: make(chan *state.ContainedNotificationEvent),
notaryReqRcv: make(chan *result.NotaryRequestEvent),
subscribedEvents: make(map[util.Uint160]string),
subscribedNotaryEvents: make(map[util.Uint160]string),
},
closeChan: make(chan struct{}), closeChan: make(chan struct{}),
} }
@ -145,7 +133,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
} }
cli.setActor(act) cli.setActor(act)
go cli.notificationLoop(ctx) go cli.closeWaiter(ctx)
return cli, nil return cli, nil
} }

View file

@ -5,11 +5,6 @@ import (
"sort" "sort"
"time" "time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -33,7 +28,8 @@ func (e *endpoints) init(ee []Endpoint) {
e.list = ee e.list = ee
} }
func (c *Client) switchRPC(ctx context.Context) bool { // SwitchRPC performs reconnection and returns true if it was successful.
func (c *Client) SwitchRPC(ctx context.Context) bool {
c.switchLock.Lock() c.switchLock.Lock()
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
@ -57,20 +53,8 @@ func (c *Client) switchRPC(ctx context.Context) bool {
c.logger.Info("connection to the new RPC node has been established", c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint)) zap.String("endpoint", newEndpoint))
subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false)
if !ok {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
// closing connection to RPC node
// to switch to another one
cli.Close()
continue
}
c.client = cli c.client = cli
c.setActor(act) c.setActor(act)
c.subsInfo = subs
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() && if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@ -81,97 +65,21 @@ func (c *Client) switchRPC(ctx context.Context) bool {
return true return true
} }
c.inactive = true
if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
return false return false
} }
func (c *Client) notificationLoop(ctx context.Context) { func (c *Client) closeWaiter(ctx context.Context) {
var e any
var ok bool
for {
c.switchLock.RLock()
bChan := c.blockRcv
nChan := c.notificationRcv
nrChan := c.notaryReqRcv
c.switchLock.RUnlock()
select { select {
case <-ctx.Done(): case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan: case <-c.closeChan:
}
_ = c.UnsubscribeAll() _ = c.UnsubscribeAll()
c.close() c.close()
return
case e, ok = <-bChan:
case e, ok = <-nChan:
case e, ok = <-nrChan:
}
if ok {
c.routeEvent(ctx, e)
continue
}
if !c.reconnect(ctx) {
return
}
}
}
func (c *Client) routeEvent(ctx context.Context, e any) {
typedNotification := rpcclient.Notification{Value: e}
switch e.(type) {
case *block.Block:
typedNotification.Type = neorpc.BlockEventID
case *state.ContainedNotificationEvent:
typedNotification.Type = neorpc.NotificationEventID
case *result.NotaryRequestEvent:
typedNotification.Type = neorpc.NotaryRequestEventID
}
select {
case c.notifications <- typedNotification:
case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
}
}
func (c *Client) reconnect(ctx context.Context) bool {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method, that happens only when a client has
// switched to the more prioritized RPC
return true
}
if !c.switchRPC(ctx) {
c.logger.Error("could not establish connection to any RPC node")
// could not connect to all endpoints =>
// switch client to inactive mode
c.inactiveMode()
return false
}
// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost
return true
} }
func (c *Client) switchToMostPrioritized(ctx context.Context) { func (c *Client) switchToMostPrioritized(ctx context.Context) {
@ -217,7 +125,6 @@ mainLoop:
continue continue
} }
if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok {
c.switchLock.Lock() c.switchLock.Lock()
// higher priority node could have been // higher priority node could have been
@ -232,7 +139,6 @@ mainLoop:
c.cache.invalidate() c.cache.invalidate()
c.client = cli c.client = cli
c.setActor(act) c.setActor(act)
c.subsInfo = subs
c.endpoints.curr = i c.endpoints.curr = i
c.switchLock.Unlock() c.switchLock.Unlock()
@ -242,18 +148,13 @@ mainLoop:
return return
} }
c.logger.Warn("could not restore side chain subscriptions using node",
zap.String("endpoint", tryE),
zap.Error(err),
)
}
} }
} }
} }
// close closes notification channel and wrapped WS client. // close closes notification channel and wrapped WS client.
func (c *Client) close() { func (c *Client) close() {
close(c.notifications) c.switchLock.RLock()
defer c.switchLock.RUnlock()
c.client.Close() c.client.Close()
} }

View file

@ -208,8 +208,8 @@ func (c *Client) SetGroupSignerScope() error {
return err return err
} }
c.signer.Scopes = transaction.CustomGroups c.cfg.signer.Scopes = transaction.CustomGroups
c.signer.AllowedGroups = []*keys.PublicKey{pub} c.cfg.signer.AllowedGroups = []*keys.PublicKey{pub}
return nil return nil
} }

View file

@ -596,18 +596,18 @@ func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, comm
s = append(s, transaction.Signer{ s = append(s, transaction.Signer{
Account: hash.Hash160(multisigScript), Account: hash.Hash160(multisigScript),
Scopes: c.signer.Scopes, Scopes: c.cfg.signer.Scopes,
AllowedContracts: c.signer.AllowedContracts, AllowedContracts: c.cfg.signer.AllowedContracts,
AllowedGroups: c.signer.AllowedGroups, AllowedGroups: c.cfg.signer.AllowedGroups,
}) })
if !invokedByAlpha { if !invokedByAlpha {
// then we have invoker signature // then we have invoker signature
s = append(s, transaction.Signer{ s = append(s, transaction.Signer{
Account: hash.Hash160(c.acc.GetVerificationScript()), Account: hash.Hash160(c.acc.GetVerificationScript()),
Scopes: c.signer.Scopes, Scopes: c.cfg.signer.Scopes,
AllowedContracts: c.signer.AllowedContracts, AllowedContracts: c.cfg.signer.AllowedContracts,
AllowedGroups: c.signer.AllowedGroups, AllowedGroups: c.cfg.signer.AllowedGroups,
}) })
} }

View file

@ -1,15 +1,11 @@
package client package client
import ( import (
"context"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
) )
// Close closes connection to the remote side making // Close closes connection to the remote side making
@ -23,71 +19,46 @@ func (c *Client) Close() {
close(c.closeChan) close(c.closeChan)
} }
// SubscribeForExecutionNotifications adds subscription for notifications // ReceiveExecutionNotifications performs subscription for notifications
// generated during contract transaction execution to this instance of client. // generated during contract execution. Events are sent to the specified channel.
// //
// Returns ErrConnectionLost if client has not been able to establish // Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints. // connection to any of passed RPC endpoints.
func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error { func (c *Client) ReceiveExecutionNotifications(contract util.Uint160, ch chan<- *state.ContainedNotificationEvent) (string, error) {
c.switchLock.Lock() c.switchLock.Lock()
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
if c.inactive { if c.inactive {
return ErrConnectionLost return "", ErrConnectionLost
} }
_, subscribed := c.subscribedEvents[contract] return c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ch)
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
if err != nil {
return err
}
c.subscribedEvents[contract] = id
return nil
} }
// SubscribeForNewBlocks adds subscription for new block events to this // ReceiveBlocks performs subscription for new block events. Events are sent
// instance of client. // to the specified channel.
// //
// Returns ErrConnectionLost if client has not been able to establish // Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints. // connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNewBlocks() error { func (c *Client) ReceiveBlocks(ch chan<- *block.Block) (string, error) {
c.switchLock.Lock() c.switchLock.Lock()
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
if c.inactive { if c.inactive {
return ErrConnectionLost return "", ErrConnectionLost
} }
if c.subscribedToBlocks { return c.client.ReceiveBlocks(nil, ch)
// no need to subscribe one more time
return nil
}
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
if err != nil {
return err
}
c.subscribedToBlocks = true
return nil
} }
// SubscribeForNotaryRequests adds subscription for notary request payloads // ReceiveNotaryRequests performsn subscription for notary request payloads
// addition or removal events to this instance of client. Passed txSigner is // addition or removal events to this instance of client. Passed txSigner is
// used as filter: subscription is only for the notary requests that must be // used as filter: subscription is only for the notary requests that must be
// signed by txSigner. // signed by txSigner. Events are sent to the specified channel.
// //
// Returns ErrConnectionLost if client has not been able to establish // Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints. // connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error { func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160, ch chan<- *result.NotaryRequestEvent) (string, error) {
if c.notary == nil { if c.notary == nil {
panic(notaryNotEnabledPanicMsg) panic(notaryNotEnabledPanicMsg)
} }
@ -96,30 +67,17 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
if c.inactive { if c.inactive {
return ErrConnectionLost return "", ErrConnectionLost
} }
_, subscribed := c.subscribedNotaryEvents[txSigner] return c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, ch)
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
if err != nil {
return err
}
c.subscribedNotaryEvents[txSigner] = id
return nil
} }
// UnsubscribeContract removes subscription for given contract event stream. // Unsubscribe performs unsubscription for the given subscription ID.
// //
// Returns ErrConnectionLost if client has not been able to establish // Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints. // connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeContract(contract util.Uint160) error { func (c *Client) Unsubscribe(subID string) error {
c.switchLock.Lock() c.switchLock.Lock()
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
@ -127,55 +85,7 @@ func (c *Client) UnsubscribeContract(contract util.Uint160) error {
return ErrConnectionLost return ErrConnectionLost
} }
_, subscribed := c.subscribedEvents[contract] return c.client.Unsubscribe(subID)
if !subscribed {
// no need to unsubscribe contract
// without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedEvents[contract])
if err != nil {
return err
}
delete(c.subscribedEvents, contract)
return nil
}
// UnsubscribeNotaryRequest removes subscription for given notary requests
// signer.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error {
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
_, subscribed := c.subscribedNotaryEvents[signer]
if !subscribed {
// no need to unsubscribe signer's
// requests without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer])
if err != nil {
return err
}
delete(c.subscribedNotaryEvents, signer)
return nil
} }
// UnsubscribeAll removes all active subscriptions of current client. // UnsubscribeAll removes all active subscriptions of current client.
@ -190,163 +100,10 @@ func (c *Client) UnsubscribeAll() error {
return ErrConnectionLost return ErrConnectionLost
} }
// no need to unsubscribe if there are
// no active subscriptions
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
!c.subscribedToBlocks {
return nil
}
err := c.client.UnsubscribeAll() err := c.client.UnsubscribeAll()
if err != nil { if err != nil {
return err return err
} }
c.subscribedEvents = make(map[util.Uint160]string)
c.subscribedNotaryEvents = make(map[util.Uint160]string)
c.subscribedToBlocks = false
return nil return nil
} }
// subsInfo includes channels for ws notifications;
// cached subscription information.
type subsInfo struct {
blockRcv chan *block.Block
notificationRcv chan *state.ContainedNotificationEvent
notaryReqRcv chan *result.NotaryRequestEvent
subscribedToBlocks bool
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
}
// restoreSubscriptions restores subscriptions according to cached
// information about them.
//
// If it is NOT a background operation switchLock MUST be held.
// Returns a pair: the second is a restoration status and the first
// one contains subscription information applied to the passed cli
// and receivers for the updated subscriptions.
// Does not change Client instance.
func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
var (
err error
id string
)
stopCh := make(chan struct{})
defer close(stopCh)
blockRcv := make(chan *block.Block)
notificationRcv := make(chan *state.ContainedNotificationEvent)
notaryReqRcv := make(chan *result.NotaryRequestEvent)
c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background)
if background {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
}
si.subscribedToBlocks = c.subscribedToBlocks
si.subscribedEvents = copySubsMap(c.subscribedEvents)
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
si.blockRcv = blockRcv
si.notificationRcv = notificationRcv
si.notaryReqRcv = notaryReqRcv
// new block events restoration
if si.subscribedToBlocks {
_, err = cli.ReceiveBlocks(nil, blockRcv)
if err != nil {
c.logger.Error("could not restore block subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
}
// notification events restoration
for contract := range si.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
if err != nil {
c.logger.Error("could not restore notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
si.subscribedEvents[contract] = id
}
// notary notification events restoration
if c.notary != nil {
for signer := range si.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
if err != nil {
c.logger.Error("could not restore notary notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
si.subscribedNotaryEvents[signer] = id
}
}
return si, true
}
func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block,
notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) {
// neo-go WS client says to _always_ read notifications
// from its channel. Subscribing to any notification
// while not reading them in another goroutine may
// lead to a dead-lock, thus that async side notification
// listening while restoring subscriptions
go func() {
var e any
var ok bool
for {
select {
case <-stopCh:
return
case e, ok = <-blockRcv:
case e, ok = <-notificationRcv:
case e, ok = <-notaryReqRcv:
}
if !ok {
return
}
if background {
// background client (test) switch, no need to send
// any notification, just preventing dead-lock
continue
}
c.routeEvent(ctx, e)
}
}()
}
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
newM := make(map[util.Uint160]string, len(m))
for k, v := range m {
newM[k] = v
}
return newM
}

View file

@ -10,8 +10,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -35,16 +35,27 @@ type (
Close() Close()
} }
subChannels struct {
NotifyChan chan *state.ContainedNotificationEvent
BlockChan chan *block.Block
NotaryChan chan *result.NotaryRequestEvent
}
subscriber struct { subscriber struct {
*sync.RWMutex *sync.RWMutex
log *logger.Logger log *logger.Logger
client *client.Client client *client.Client
notifyChan chan *state.ContainedNotificationEvent notifyChan chan *state.ContainedNotificationEvent
blockChan chan *block.Block blockChan chan *block.Block
notaryChan chan *result.NotaryRequestEvent notaryChan chan *result.NotaryRequestEvent
current subChannels
// cached subscription information
subscribedEvents map[util.Uint160]bool
subscribedNotaryEvents map[util.Uint160]bool
subscribedToNewBlocks bool
} }
// Params is a group of Subscriber constructor parameters. // Params is a group of Subscriber constructor parameters.
@ -75,22 +86,28 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
notifyIDs := make(map[util.Uint160]struct{}, len(contracts)) notifyIDs := make([]string, 0, len(contracts))
for i := range contracts { for i := range contracts {
if s.subscribedEvents[contracts[i]] {
continue
}
// subscribe to contract notifications // subscribe to contract notifications
err := s.client.SubscribeForExecutionNotifications(contracts[i]) id, err := s.client.ReceiveExecutionNotifications(contracts[i], s.current.NotifyChan)
if err != nil { if err != nil {
// if there is some error, undo all subscriptions and return error // if there is some error, undo all subscriptions and return error
for hash := range notifyIDs { for _, id := range notifyIDs {
_ = s.client.UnsubscribeContract(hash) _ = s.client.Unsubscribe(id)
} }
return err return err
} }
// save notification id // save notification id
notifyIDs[contracts[i]] = struct{}{} notifyIDs = append(notifyIDs, id)
}
for i := range contracts {
s.subscribedEvents[contracts[i]] = true
} }
return nil return nil
@ -109,82 +126,34 @@ func (s *subscriber) Close() {
} }
func (s *subscriber) BlockNotifications() error { func (s *subscriber) BlockNotifications() error {
if err := s.client.SubscribeForNewBlocks(); err != nil { s.Lock()
defer s.Unlock()
if s.subscribedToNewBlocks {
return nil
}
if _, err := s.client.ReceiveBlocks(s.current.BlockChan); err != nil {
return fmt.Errorf("could not subscribe for new block events: %w", err) return fmt.Errorf("could not subscribe for new block events: %w", err)
} }
s.subscribedToNewBlocks = true
return nil return nil
} }
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error { func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil { s.Lock()
defer s.Unlock()
if s.subscribedNotaryEvents[mainTXSigner] {
return nil
}
if _, err := s.client.ReceiveNotaryRequests(mainTXSigner, s.current.NotaryChan); err != nil {
return fmt.Errorf("could not subscribe for notary request events: %w", err) return fmt.Errorf("could not subscribe for notary request events: %w", err)
} }
s.subscribedNotaryEvents[mainTXSigner] = true
return nil return nil
} }
func (s *subscriber) routeNotifications(ctx context.Context) {
notificationChan := s.client.NotificationChannel()
for {
select {
case <-ctx.Done():
return
case notification, ok := <-notificationChan:
if !ok {
s.log.Warn("remote notification channel has been closed")
close(s.notifyChan)
close(s.blockChan)
close(s.notaryChan)
return
}
switch notification.Type {
case neorpc.NotificationEventID:
notifyEvent, ok := notification.Value.(*state.ContainedNotificationEvent)
if !ok {
s.log.Error("can't cast notify event value to the notify struct",
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.log.Debug("new notification event from sidechain",
zap.String("name", notifyEvent.Name),
)
s.notifyChan <- notifyEvent
case neorpc.BlockEventID:
b, ok := notification.Value.(*block.Block)
if !ok {
s.log.Error("can't cast block event value to block",
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.blockChan <- b
case neorpc.NotaryRequestEventID:
notaryRequest, ok := notification.Value.(*result.NotaryRequestEvent)
if !ok {
s.log.Error("can't cast notify event value to the notary request struct",
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.notaryChan <- notaryRequest
default:
s.log.Debug("unsupported notification from the chain",
zap.Uint8("type", uint8(notification.Type)),
)
}
}
}
}
// New is a constructs Neo:Morph event listener and returns Subscriber interface. // New is a constructs Neo:Morph event listener and returns Subscriber interface.
func New(ctx context.Context, p *Params) (Subscriber, error) { func New(ctx context.Context, p *Params) (Subscriber, error) {
switch { switch {
@ -208,16 +177,170 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
notifyChan: make(chan *state.ContainedNotificationEvent), notifyChan: make(chan *state.ContainedNotificationEvent),
blockChan: make(chan *block.Block), blockChan: make(chan *block.Block),
notaryChan: make(chan *result.NotaryRequestEvent), notaryChan: make(chan *result.NotaryRequestEvent),
}
// Worker listens all events from neo-go websocket and puts them current: newSubChannels(),
// into corresponding channel. It may be notifications, transactions,
// new blocks. For now only notifications. subscribedEvents: make(map[util.Uint160]bool),
subscribedNotaryEvents: make(map[util.Uint160]bool),
}
// Worker listens all events from temporary NeoGo channel and puts them
// into corresponding permanent channels.
go sub.routeNotifications(ctx) go sub.routeNotifications(ctx)
return sub, nil return sub, nil
} }
func (s *subscriber) routeNotifications(ctx context.Context) {
var (
// TODO: not needed after nspcc-dev/neo-go#2980.
cliCh = s.client.NotificationChannel()
restoreCh = make(chan bool)
restoreInProgress bool
)
routeloop:
for {
var connLost bool
s.RLock()
curr := s.current
s.RUnlock()
select {
case <-ctx.Done():
break routeloop
case ev, ok := <-curr.NotifyChan:
if ok {
s.notifyChan <- ev
} else {
connLost = true
}
case ev, ok := <-curr.BlockChan:
if ok {
s.blockChan <- ev
} else {
connLost = true
}
case ev, ok := <-curr.NotaryChan:
if ok {
s.notaryChan <- ev
} else {
connLost = true
}
case _, ok := <-cliCh:
connLost = !ok
case ok := <-restoreCh:
restoreInProgress = false
if !ok {
connLost = true
}
}
if connLost {
if !restoreInProgress {
restoreInProgress, cliCh = s.switchEndpoint(ctx, restoreCh)
if !restoreInProgress {
break routeloop
}
curr.drain()
} else { // Avoid getting additional !ok events.
s.Lock()
s.current.NotifyChan = nil
s.current.BlockChan = nil
s.current.NotaryChan = nil
s.Unlock()
}
}
}
close(s.notifyChan)
close(s.blockChan)
close(s.notaryChan)
}
func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (bool, <-chan rpcclient.Notification) {
s.log.Info("RPC connection lost, attempting reconnect")
if !s.client.SwitchRPC(ctx) {
s.log.Error("can't switch RPC node")
return false, nil
}
cliCh := s.client.NotificationChannel()
s.Lock()
chs := newSubChannels()
go func() {
finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
}()
s.current = chs
s.Unlock()
return true, cliCh
}
func newSubChannels() subChannels {
return subChannels{
NotifyChan: make(chan *state.ContainedNotificationEvent),
BlockChan: make(chan *block.Block),
NotaryChan: make(chan *result.NotaryRequestEvent),
}
}
func (s *subChannels) drain() {
drainloop:
for {
select {
case _, ok := <-s.NotifyChan:
if !ok {
s.NotifyChan = nil
}
case _, ok := <-s.BlockChan:
if !ok {
s.BlockChan = nil
}
case _, ok := <-s.NotaryChan:
if !ok {
s.NotaryChan = nil
}
default:
break drainloop
}
}
}
// restoreSubscriptions restores subscriptions according to
// cached information about them.
func (s *subscriber) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent,
blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent) bool {
var err error
// new block events restoration
if s.subscribedToNewBlocks {
_, err = s.client.ReceiveBlocks(blCh)
if err != nil {
s.log.Error("could not restore block subscription after RPC switch", zap.Error(err))
return false
}
}
// notification events restoration
for contract := range s.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
_, err = s.client.ReceiveExecutionNotifications(contract, notifCh)
if err != nil {
s.log.Error("could not restore notification subscription after RPC switch", zap.Error(err))
return false
}
}
// notary notification events restoration
for signer := range s.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
_, err = s.client.ReceiveNotaryRequests(signer, notaryCh)
if err != nil {
s.log.Error("could not restore notary notification subscription after RPC switch", zap.Error(err))
return false
}
}
return true
}
// awaitHeight checks if remote client has least expected block height and // awaitHeight checks if remote client has least expected block height and
// returns error if it is not reached that height after timeout duration. // returns error if it is not reached that height after timeout duration.
// This function is required to avoid connections to unsynced RPC nodes, because // This function is required to avoid connections to unsynced RPC nodes, because

View file

@ -40,6 +40,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
return p return p
} }
func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
if p != nil {
p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
}
return p
}
func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm { func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
if p != nil { if p != nil {
p.relay = f p.relay = f

View file

@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
object.NewFromV2(oV2), object.NewFromV2(oV2),
). ).
WithRelay(s.relayRequest). WithRelay(s.relayRequest).
WithCommonPrm(commonPrm), nil WithCommonPrm(commonPrm).
WithCopyNumber(part.GetCopiesNumber()), nil
} }
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm { func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {

View file

@ -27,7 +27,7 @@ type cacheItem struct {
} }
const ( const (
defaultClientCacheSize = 10 defaultClientCacheSize = 32
defaultClientConnectTimeout = time.Second * 2 defaultClientConnectTimeout = time.Second * 2
defaultReconnectInterval = time.Second * 15 defaultReconnectInterval = time.Second * 15
) )
@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")
func (c *clientCache) init() { func (c *clientCache) init() {
l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) { l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
_ = value.cc.Close() if conn := value.cc; conn != nil {
_ = conn.Close()
}
}) })
c.LRU = *l c.LRU = *l
} }

View file

@ -13,6 +13,7 @@ import (
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -31,6 +32,8 @@ type Service struct {
syncChan chan struct{} syncChan chan struct{}
syncPool *ants.Pool syncPool *ants.Pool
initialSyncDone atomic.Bool
// cnrMap contains existing (used) container IDs. // cnrMap contains existing (used) container IDs.
cnrMap map[cidSDK.ID]struct{} cnrMap map[cidSDK.ID]struct{}
// cnrMapMtx protects cnrMap // cnrMapMtx protects cnrMap
@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
} }
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
} }
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
} }
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
// Move applies client operation to the specified tree and pushes in queue // Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes. // for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
} }
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
} }
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
} }
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
} }
h := b.GetHeight() h := b.GetHeight()
lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
if err != nil {
return err
}
for { for {
lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h) lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
if err != nil || lm.Time == 0 { if err != nil || lm.Time == 0 || lastHeight < lm.Time {
return err return err
} }
@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
} }
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) { func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
var cid cidSDK.ID var cid cidSDK.ID
err := cid.Decode(req.GetBody().GetContainerId()) err := cid.Decode(req.GetBody().GetContainerId())
@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
} }
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) { func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
return new(HealthcheckResponse), nil return new(HealthcheckResponse), nil
} }

View file

@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
rawCID := make([]byte, sha256.Size) rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID) cid.Encode(rawCID)
errG, ctx := errgroup.WithContext(ctx)
errG.SetLimit(1024)
var heightMtx sync.Mutex
for { for {
newHeight := height newHeight := height
req := &GetOpLogRequest{ req := &GetOpLogRequest{
@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
}, },
} }
if err := SignMessage(req, s.key); err != nil { if err := SignMessage(req, s.key); err != nil {
_ = errG.Wait()
return newHeight, err return newHeight, err
} }
c, err := treeClient.GetOpLog(ctx, req) c, err := treeClient.GetOpLog(ctx, req)
if err != nil { if err != nil {
_ = errG.Wait()
return newHeight, fmt.Errorf("can't initialize client: %w", err) return newHeight, fmt.Errorf("can't initialize client: %w", err)
} }
@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
Child: lm.ChildId, Child: lm.ChildId,
} }
if err := m.Meta.FromBytes(lm.Meta); err != nil { if err := m.Meta.FromBytes(lm.Meta); err != nil {
_ = errG.Wait()
return newHeight, err return newHeight, err
} }
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil { errG.Go(func() error {
return newHeight, err err := s.forest.TreeApply(cid, treeID, m, true)
heightMtx.Lock()
defer heightMtx.Unlock()
if err != nil {
if newHeight > height {
height = newHeight
}
return err
} }
if m.Time > newHeight { if m.Time > newHeight {
newHeight = m.Time + 1 newHeight = m.Time + 1
} else { } else {
newHeight++ newHeight++
} }
return nil
})
} }
applyErr := errG.Wait()
if err == nil {
err = applyErr
}
heightMtx.Lock()
if height == newHeight || err != nil && !errors.Is(err, io.EOF) { if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
heightMtx.Unlock()
return newHeight, err return newHeight, err
} }
height = newHeight height = newHeight
heightMtx.Unlock()
} }
} }
@ -288,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
cnrs, err := s.cfg.cnrSource.List() cnrs, err := s.cfg.cnrSource.List()
if err != nil { if err != nil {
s.log.Error("could not fetch containers", zap.Error(err)) s.log.Error("could not fetch containers", zap.Error(err))
continue break
} }
newMap, cnrsToSync := s.containersToSync(cnrs) newMap, cnrsToSync := s.containersToSync(cnrs)
@ -299,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {
s.log.Debug("trees have been synchronized") s.log.Debug("trees have been synchronized")
} }
s.initialSyncDone.Store(true)
} }
} }