diff --git a/.docker/Dockerfile.adm b/.docker/Dockerfile.adm
index 09d66e642..b8f0ad1ca 100644
--- a/.docker/Dockerfile.adm
+++ b/.docker/Dockerfile.adm
@@ -1,4 +1,4 @@
-FROM golang:1.18 as builder
+FROM golang:1.19 as builder
 ARG BUILD=now
 ARG VERSION=dev
 ARG REPO=repository
diff --git a/.docker/Dockerfile.cli b/.docker/Dockerfile.cli
index c706359b3..1e1c1c2db 100644
--- a/.docker/Dockerfile.cli
+++ b/.docker/Dockerfile.cli
@@ -1,4 +1,4 @@
-FROM golang:1.18 as builder
+FROM golang:1.19 as builder
 ARG BUILD=now
 ARG VERSION=dev
 ARG REPO=repository
diff --git a/.docker/Dockerfile.ir b/.docker/Dockerfile.ir
index 9f8e72386..8ca7b1d50 100644
--- a/.docker/Dockerfile.ir
+++ b/.docker/Dockerfile.ir
@@ -1,4 +1,4 @@
-FROM golang:1.18 as builder
+FROM golang:1.19 as builder
 ARG BUILD=now
 ARG VERSION=dev
 ARG REPO=repository
diff --git a/.docker/Dockerfile.storage b/.docker/Dockerfile.storage
index 39eb19559..6364d2e52 100644
--- a/.docker/Dockerfile.storage
+++ b/.docker/Dockerfile.storage
@@ -1,4 +1,4 @@
-FROM golang:1.18 as builder
+FROM golang:1.19 as builder
 ARG BUILD=now
 ARG VERSION=dev
 ARG REPO=repository
diff --git a/.docker/Dockerfile.storage-testnet b/.docker/Dockerfile.storage-testnet
index 908ff0aad..8a266640c 100644
--- a/.docker/Dockerfile.storage-testnet
+++ b/.docker/Dockerfile.storage-testnet
@@ -1,4 +1,4 @@
-FROM golang:1.18 as builder
+FROM golang:1.19 as builder
 ARG BUILD=now
 ARG VERSION=dev
 ARG REPO=repository
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b4d6e7ca0..fa4bfd7ab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,8 +4,13 @@ Changelog for FrostFS Node
 ## [Unreleased]
 
 ### Added
+- Support copies number parameter in `frostfs-cli object put` (#351)
+
 ### Changed
 ### Fixed
+- Copy number was not used for `PUT` requests (#284)
+- Tree service panic in its internal client cache (#323)
+
 ### Removed
 ### Updated
 ### Updating from v0.36.0
@@ -64,6 +69,7 @@ Changelog for FrostFS Node
 - Iterating over just removed files by FSTree (#98)
 - Parts of a locked object could not be removed anymore (#141)
 - Non-alphabet nodes do not try to handle alphabet events (#181)
+- Deletion of complex objects with GC (#332)
 
 ### Removed
 ### Updated
@@ -75,7 +81,7 @@ Changelog for FrostFS Node
 - `github.com/spf13/viper` to `v1.15.0`
 - `github.com/nats-io/nats.go` to `v1.22.1`
 - `github.com/TrueCloudLab/hrw` to `v.1.1.1`
-- Minimum go version to v1.18
+- Minimum go version to v1.19
 
 ### Updating from v0.35.0 (old NeoFS)
 
 You need to change configuration environment variables to `FROSTFS_*` if you use
 
 New config field `object.delete.tombstone_lifetime` allows setting a tombstone lifetime more appropriate for a specific deployment.
 
-Use `__SYSTEM__` prefix for system attributes instead of `__NEOFS__`
+Use `__SYSTEM__` prefix for system attributes instead of `__NEOFS__` (existing objects with old attributes will be treated as before, but new objects will use the new attributes).
 
 ## Older versions
diff --git a/README.md b/README.md
index 81701c441..f228cd426 100644
--- a/README.md
+++ b/README.md
@@ -49,7 +49,7 @@ The latest version of frostfs-node works with frostfs-contract
 
 # Building
 
-To make all binaries you need Go 1.18+ and `make`:
+To make all binaries you need Go 1.19+ and `make`:
 ```
 make all
 ```
diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go
index cbf19eb4b..c892c9e7e 100644
--- a/cmd/frostfs-cli/internal/client/client.go
+++ b/cmd/frostfs-cli/internal/client/client.go
@@ -329,6 +329,8 @@ func CreateSession(prm CreateSessionPrm) (res CreateSessionRes, err error) {
 type PutObjectPrm struct {
     commonObjectPrm
 
+    copyNum uint32
+
     hdr *object.Object
 
     rdr io.Reader
@@ -352,6 +354,11 @@ func (x *PutObjectPrm) SetHeaderCallback(f func(*object.Object)) {
     x.headerCallback = f
 }
 
+// SetCopiesNumber sets the number of object copies that is enough to consider the put successful.
+func (x *PutObjectPrm) SetCopiesNumber(copiesNumber uint32) {
+    x.copyNum = copiesNumber
+}
+
 // PutObjectRes groups the resulting values of PutObject operation.
 type PutObjectRes struct {
     id oid.ID
@@ -381,6 +388,7 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
     }
 
     putPrm.WithXHeaders(prm.xHeaders...)
+    putPrm.SetCopiesNumber(prm.copyNum)
 
     wrt, err := prm.cli.ObjectPutInit(context.Background(), putPrm)
     if err != nil {
diff --git a/cmd/frostfs-cli/modules/object/put.go b/cmd/frostfs-cli/modules/object/put.go
index fe8e9dda9..e7f00a612 100644
--- a/cmd/frostfs-cli/modules/object/put.go
+++ b/cmd/frostfs-cli/modules/object/put.go
@@ -25,6 +25,7 @@ import (
 const (
     noProgressFlag   = "no-progress"
     notificationFlag = "notify"
+    copiesNumberFlag = "copies-number"
 )
 
 var putExpiredOn uint64
@@ -56,6 +57,8 @@ func initObjectPutCmd() {
     flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
 
     flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
+
+    flags.Uint32(copiesNumberFlag, 0, "Number of copies of the object to store within the RPC call")
 }
 
 func putObject(cmd *cobra.Command, _ []string) {
@@ -116,6 +119,12 @@ func putObject(cmd *cobra.Command, _ []string) {
         }
     }
 
+    copyNum, err := cmd.Flags().GetUint32(copiesNumberFlag)
+    commonCmd.ExitOnErr(cmd, "can't parse object copies number: %w", err)
+    if copyNum > 0 {
+        prm.SetCopiesNumber(copyNum)
+    }
+
     res, err := internalclient.PutObject(prm)
     if p != nil {
         p.Finish()
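Note on the new flag: zero acts as the "not set" sentinel, so `SetCopiesNumber` is only invoked for a positive value and the server-side placement policy stays in charge otherwise. A minimal standalone sketch of that optional-flag pattern (the command wiring here is illustrative, not the CLI's actual code):

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

const copiesNumberFlag = "copies-number"

func main() {
	cmd := &cobra.Command{
		Use: "put",
		RunE: func(cmd *cobra.Command, _ []string) error {
			n, err := cmd.Flags().GetUint32(copiesNumberFlag)
			if err != nil {
				return fmt.Errorf("can't parse object copies number: %w", err)
			}
			// Zero keeps the server-side default (the placement policy decides),
			// so the override is applied only when the user set it explicitly.
			if n > 0 {
				fmt.Println("overriding copies number:", n)
			}
			return nil
		},
	}
	cmd.Flags().Uint32(copiesNumberFlag, 0, "Number of copies of the object to store within the RPC call")
	_ = cmd.Execute()
}
```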
diff --git a/cmd/frostfs-node/cache.go b/cmd/frostfs-node/cache.go
index 3d4fc7375..dfbaf3525 100644
--- a/cmd/frostfs-node/cache.go
+++ b/cmd/frostfs-node/cache.go
@@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
     e error
 }
 
+type locker struct {
+    mtx     *sync.Mutex
+    waiters int // not protected by mtx; the outer lockersMtx must be held to update it concurrently
+}
+
+type keyLocker[K comparable] struct {
+    lockers    map[K]*locker
+    lockersMtx *sync.Mutex
+}
+
+func newKeyLocker[K comparable]() *keyLocker[K] {
+    return &keyLocker[K]{
+        lockers:    make(map[K]*locker),
+        lockersMtx: &sync.Mutex{},
+    }
+}
+
+func (l *keyLocker[K]) LockKey(key K) {
+    l.lockersMtx.Lock()
+
+    if locker, found := l.lockers[key]; found {
+        locker.waiters++
+        l.lockersMtx.Unlock()
+
+        locker.mtx.Lock()
+        return
+    }
+
+    locker := &locker{
+        mtx:     &sync.Mutex{},
+        waiters: 1,
+    }
+    locker.mtx.Lock()
+
+    l.lockers[key] = locker
+    l.lockersMtx.Unlock()
+}
+
+func (l *keyLocker[K]) UnlockKey(key K) {
+    l.lockersMtx.Lock()
+    defer l.lockersMtx.Unlock()
+
+    locker, found := l.lockers[key]
+    if !found {
+        return
+    }
+
+    if locker.waiters == 1 {
+        delete(l.lockers, key)
+    }
+    locker.waiters--
+
+    locker.mtx.Unlock()
+}
+
 // entity that provides TTL cache interface.
 type ttlNetCache[K comparable, V any] struct {
     ttl time.Duration
@@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
     cache *lru.Cache[K, *valueWithTime[V]]
 
     netRdr netValueReader[K, V]
+
+    keyLocker *keyLocker[K]
 }
 
 // complicates netValueReader with TTL caching mechanism.
@@ -41,10 +98,11 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
     fatalOnErr(err)
 
     return &ttlNetCache[K, V]{
-        ttl:    ttl,
-        sz:     sz,
-        cache:  cache,
-        netRdr: netRdr,
+        ttl:       ttl,
+        sz:        sz,
+        cache:     cache,
+        netRdr:    netRdr,
+        keyLocker: newKeyLocker[K](),
     }
 }
 
@@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 // returned value should not be modified.
 func (c *ttlNetCache[K, V]) get(key K) (V, error) {
     val, ok := c.cache.Peek(key)
-    if ok {
-        if time.Since(val.t) < c.ttl {
-            return val.v, val.e
-        }
+    if ok && time.Since(val.t) < c.ttl {
+        return val.v, val.e
+    }
 
-        c.cache.Remove(key)
+    c.keyLocker.LockKey(key)
+    defer c.keyLocker.UnlockKey(key)
+
+    val, ok = c.cache.Peek(key)
+    if ok && time.Since(val.t) < c.ttl {
+        return val.v, val.e
     }
 
     v, err := c.netRdr(key)
 
-    c.set(key, v, err)
+    c.cache.Add(key, &valueWithTime[V]{
+        v: v,
+        t: time.Now(),
+        e: err,
+    })
 
     return v, err
 }
 
 func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
+    c.keyLocker.LockKey(k)
+    defer c.keyLocker.UnlockKey(k)
+
     c.cache.Add(k, &valueWithTime[V]{
         v: v,
         t: time.Now(),
@@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
 }
 
 func (c *ttlNetCache[K, V]) remove(key K) {
+    c.keyLocker.LockKey(key)
+    defer c.keyLocker.UnlockKey(key)
+
     c.cache.Remove(key)
 }
diff --git a/cmd/frostfs-node/cache_test.go b/cmd/frostfs-node/cache_test.go
new file mode 100644
index 000000000..a3e1c4ea6
--- /dev/null
+++ b/cmd/frostfs-node/cache_test.go
@@ -0,0 +1,32 @@
+package main
+
+import (
+    "context"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/require"
+    "golang.org/x/sync/errgroup"
+)
+
+func TestKeyLocker(t *testing.T) {
+    taken := false
+    eg, _ := errgroup.WithContext(context.Background())
+    keyLocker := newKeyLocker[int]()
+    for i := 0; i < 100; i++ {
+        eg.Go(func() error {
+            keyLocker.LockKey(0)
+            defer keyLocker.UnlockKey(0)
+
+            require.False(t, taken)
+            taken = true
+            require.True(t, taken)
+            time.Sleep(10 * time.Millisecond)
+            taken = false
+            require.False(t, taken)
+
+            return nil
+        })
+    }
+    require.NoError(t, eg.Wait())
+}
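The reworked `get` is classic double-checked locking per key: peek, lock the key, peek again, and only then hit the network, so concurrent misses on the same key trigger a single `netRdr` call instead of a stampede (the panic in #323 came from this cache being updated concurrently). A simplified, self-contained sketch of the same idea with a plain map (singleflight-style; all names here are illustrative, and unlike `keyLocker` this version never shrinks its lock map):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetchOnce deduplicates concurrent fetches of the same key: the first
// goroutine performs the expensive call, the rest wait on the per-key
// mutex and then observe the freshly cached value.
type fetchOnce[K comparable, V any] struct {
	mu    sync.Mutex
	cache map[K]V
	locks map[K]*sync.Mutex
	fetch func(K) V
}

func (f *fetchOnce[K, V]) get(key K) V {
	f.mu.Lock()
	if v, ok := f.cache[key]; ok { // fast path: already cached
		f.mu.Unlock()
		return v
	}
	kl, ok := f.locks[key]
	if !ok {
		kl = &sync.Mutex{}
		f.locks[key] = kl
	}
	f.mu.Unlock()

	kl.Lock() // serialize per key, not globally
	defer kl.Unlock()

	f.mu.Lock()
	v, ok := f.cache[key] // re-check: another waiter may have filled it
	f.mu.Unlock()
	if ok {
		return v
	}

	v = f.fetch(key) // the expensive call happens once per miss
	f.mu.Lock()
	f.cache[key] = v
	f.mu.Unlock()
	return v
}

func main() {
	calls := 0
	f := &fetchOnce[int, string]{
		cache: map[int]string{},
		locks: map[int]*sync.Mutex{},
		fetch: func(k int) string {
			calls++
			time.Sleep(10 * time.Millisecond)
			return fmt.Sprint("value-", k)
		},
	}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); _ = f.get(42) }()
	}
	wg.Wait()
	fmt.Println("network calls:", calls) // 1, not 8
}
```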
diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index d81e47b17..0eece6f8a 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -864,22 +864,13 @@ func initLocalStorage(c *cfg) {
         ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
     })
 
-    // allocate memory for the service;
+    // allocate memory for the service to create tombstone source;
     // service will be created later
     c.cfgObject.getSvc = new(getsvc.Service)
 
-    var tssPrm tsourse.TombstoneSourcePrm
-    tssPrm.SetGetService(c.cfgObject.getSvc)
-    tombstoneSrc := tsourse.NewSource(tssPrm)
-
-    tombstoneSource := tombstone.NewChecker(
-        tombstone.WithLogger(c.log),
-        tombstone.WithTombstoneSource(tombstoneSrc),
-    )
-
     var shardsAttached int
     for _, optsWithMeta := range c.shardOpts() {
-        id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...)
+        id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
         if err != nil {
             c.log.Error("failed to attach shard to engine", zap.Error(err))
         } else {
@@ -1080,7 +1071,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
 
     var rcfg engine.ReConfiguration
     for _, optsWithID := range c.shardOpts() {
-        rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
+        rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
     }
 
     err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
@@ -1101,6 +1092,18 @@ func (c *cfg) reloadConfig(ctx context.Context) {
     c.log.Info("configuration has been reloaded successfully")
 }
 
+func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
+    var tssPrm tsourse.TombstoneSourcePrm
+    tssPrm.SetGetService(c.cfgObject.getSvc)
+    tombstoneSrc := tsourse.NewSource(tssPrm)
+
+    tombstoneSource := tombstone.NewChecker(
+        tombstone.WithLogger(c.log),
+        tombstone.WithTombstoneSource(tombstoneSrc),
+    )
+    return tombstoneSource
+}
+
 func (c *cfg) shutdown() {
     c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
 
diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go
index 6c864431d..316fc2721 100644
--- a/cmd/frostfs-node/container.go
+++ b/cmd/frostfs-node/container.go
@@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
             // TODO: use owner directly from the event after neofs-contract#256 is resolved,
             // but don't forget about the profit of reading the new container and caching it:
             // creation success is most commonly tracked by polling GET op.
-            cnr, err := cachedContainerStorage.Get(ev.ID)
+            cnr, err := cnrSrc.Get(ev.ID)
             if err == nil {
                 cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
+                cachedContainerStorage.set(ev.ID, cnr, nil)
             } else {
                 // unlike removal, we expect successful receive of the container
                 // after successful creation, so logging can be useful
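The container.go change matters because the TTL cache also caches errors: polling GET before creation can park a "not found" entry that the old read-through `Get` would keep serving after the creation event. Fetching from the source and then seeding the cache sidesteps that. A toy model of the failure mode and the fix (types and names invented for the sketch):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotFound = errors.New("container not found")

type entry struct {
	val string
	err error
	t   time.Time
}

type ttlCache struct {
	ttl   time.Duration
	items map[string]entry
	fetch func(string) (string, error)
}

func (c *ttlCache) get(id string) (string, error) {
	if e, ok := c.items[id]; ok && time.Since(e.t) < c.ttl {
		return e.val, e.err // may be a cached "not found"!
	}
	v, err := c.fetch(id)
	c.items[id] = entry{v, err, time.Now()}
	return v, err
}

// set seeds the cache with a value obtained out of band,
// e.g. right after a "container created" notification.
func (c *ttlCache) set(id, v string, err error) {
	c.items[id] = entry{v, err, time.Now()}
}

func main() {
	store := map[string]string{}
	c := &ttlCache{ttl: 30 * time.Second, items: map[string]entry{}, fetch: func(id string) (string, error) {
		if v, ok := store[id]; ok {
			return v, nil
		}
		return "", errNotFound
	}}

	_, err := c.get("cnr1") // user polls before creation: negative entry cached
	fmt.Println(err)

	store["cnr1"] = "container payload" // creation lands on chain

	// event handler: read from the source, then seed the cache,
	// so later gets don't serve the stale negative entry
	c.set("cnr1", store["cnr1"], nil)

	got, err := c.get("cnr1")
	fmt.Println(got, err) // container payload <nil>
}
```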
diff --git a/cmd/frostfs-node/netmap.go b/cmd/frostfs-node/netmap.go
index d9b1c9208..62d7486f6 100644
--- a/cmd/frostfs-node/netmap.go
+++ b/cmd/frostfs-node/netmap.go
@@ -253,6 +253,10 @@ func readSubnetCfg(c *cfg) {
 
 // Must be called after initNetmapService.
 func bootstrapNode(c *cfg) {
     if c.needBootstrap() {
+        if c.IsMaintenance() {
+            c.log.Info("the node is under maintenance, skip initial bootstrap")
+            return
+        }
         err := c.bootstrap()
         fatalOnErrDetails("bootstrap error", err)
     }
@@ -284,30 +288,74 @@ func initNetmapState(c *cfg) {
     epoch, err := c.cfgNetmap.wrapper.Epoch()
     fatalOnErrDetails("could not initialize current epoch number", err)
 
-    ni, err := c.netmapLocalNodeState(epoch)
+    ni, err := c.netmapInitLocalNodeState(epoch)
     fatalOnErrDetails("could not init network state", err)
 
-    stateWord := "undefined"
-
-    if ni != nil {
-        switch {
-        case ni.IsOnline():
-            stateWord = "online"
-        case ni.IsOffline():
-            stateWord = "offline"
-        }
-    }
+    stateWord := nodeState(ni)
 
     c.log.Info("initial network state",
         zap.Uint64("epoch", epoch),
         zap.String("state", stateWord),
     )
 
+    if ni != nil && ni.IsMaintenance() {
+        c.isMaintenance.Store(true)
+    }
+
     c.cfgNetmap.state.setCurrentEpoch(epoch)
     c.cfgNetmap.startEpoch = epoch
     c.handleLocalNodeInfo(ni)
 }
 
+func nodeState(ni *netmapSDK.NodeInfo) string {
+    if ni != nil {
+        switch {
+        case ni.IsOnline():
+            return "online"
+        case ni.IsOffline():
+            return "offline"
+        case ni.IsMaintenance():
+            return "maintenance"
+        }
+    }
+    return "undefined"
+}
+
+func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
+    nmNodes, err := c.cfgNetmap.wrapper.GetCandidates()
+    if err != nil {
+        return nil, err
+    }
+
+    var candidate *netmapSDK.NodeInfo
+    for i := range nmNodes {
+        if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
+            candidate = &nmNodes[i]
+            break
+        }
+    }
+
+    node, err := c.netmapLocalNodeState(epoch)
+    if err != nil {
+        return nil, err
+    }
+
+    if candidate == nil {
+        return node, nil
+    }
+
+    nmState := nodeState(node)
+    candidateState := nodeState(candidate)
+    if nmState != candidateState {
+        // This happens when the node was switched to maintenance without an epoch tick.
+        // We expect it to continue staying in maintenance.
+        c.log.Info("candidate status is different from the netmap status, the former takes priority",
+            zap.String("netmap", nmState),
+            zap.String("candidate", candidateState))
+    }
+    return candidate, nil
+}
+
 func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
     // calculate current network state
     nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(epoch)
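`nodeState` keeps the state precedence explicit: online wins over offline, which wins over maintenance, and a nil or unrecognized entry reports "undefined". A quick table-driven check of the same ordering, written against a minimal stand-in for the SDK type rather than `netmapSDK.NodeInfo` itself:

```go
package main

import "fmt"

// stand-in for the few netmapSDK.NodeInfo predicates nodeState relies on
type nodeInfo struct{ online, offline, maintenance bool }

func (n *nodeInfo) IsOnline() bool      { return n.online }
func (n *nodeInfo) IsOffline() bool     { return n.offline }
func (n *nodeInfo) IsMaintenance() bool { return n.maintenance }

func nodeState(ni *nodeInfo) string {
	if ni != nil {
		switch {
		case ni.IsOnline():
			return "online"
		case ni.IsOffline():
			return "offline"
		case ni.IsMaintenance():
			return "maintenance"
		}
	}
	return "undefined"
}

func main() {
	cases := []struct {
		ni   *nodeInfo
		want string
	}{
		{nil, "undefined"},
		{&nodeInfo{}, "undefined"}, // registered, but no explicit state
		{&nodeInfo{online: true}, "online"},
		{&nodeInfo{offline: true}, "offline"},
		{&nodeInfo{maintenance: true}, "maintenance"},
	}
	for _, c := range cases {
		fmt.Printf("%+v -> %s (want %s)\n", c.ni, nodeState(c.ni), c.want)
	}
}
```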
+ c.log.Info("candidate status is different from the netmap status, the former takes priority", + zap.String("netmap", nmState), + zap.String("candidate", candidateState)) + } + return candidate, nil +} + func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) { // calculate current network state nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(epoch) diff --git a/config/example/node.env b/config/example/node.env index 9a1a8b052..a4088f75a 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -187,4 +187,4 @@ FROSTFS_STORAGE_SHARD_1_GC_REMOVER_SLEEP_INTERVAL=5m FROSTFS_TRACING_ENABLED=true FROSTFS_TRACING_ENDPOINT="localhost" -FROSTFS_TRACING_EXPORTER="otlp_grpc" \ No newline at end of file +FROSTFS_TRACING_EXPORTER="otlp_grpc" diff --git a/config/example/node.yaml b/config/example/node.yaml index e3b41d413..0d71f0fd2 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -219,4 +219,3 @@ tracing: enabled: true exporter: "otlp_grpc" endpoint: "localhost" - \ No newline at end of file diff --git a/go.mod b/go.mod index 301be6934..d825391d3 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module git.frostfs.info/TrueCloudLab/frostfs-node -go 1.18 +go 1.19 require ( - git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0 + git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230602142024-4cb0068ddef0 git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85 git.frostfs.info/TrueCloudLab/hrw v1.2.0 diff --git a/go.sum b/go.sum index f65a7aeab..ed5657fed 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0 h1:oZ0/KiaFeveXRLi5VVEpuLSHczeFyWx4HDl9wTJUtsE= -git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0/go.mod h1:sPyITTmQT662ZI38ud2aoE1SUCAr1mO5xV8P4nzLkKI= +git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230602142024-4cb0068ddef0 h1:2rgsz9KdOf2obbL/FRCZNUZja83kQZxPuF8IKBDl/bY= +git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230602142024-4cb0068ddef0/go.mod h1:sPyITTmQT662ZI38ud2aoE1SUCAr1mO5xV8P4nzLkKI= git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb h1:S/TrbOOu9qEXZRZ9/Ddw7crnxbBUQLo68PSzQWYrc9M= git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb/go.mod h1:nkR5gaGeez3Zv2SE7aceP0YwxG2FzIB5cGKpQO2vV2o= git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk= diff --git a/pkg/local_object_storage/engine/dump.go b/pkg/local_object_storage/engine/dump.go index f5cf8c32e..6e33abd68 100644 --- a/pkg/local_object_storage/engine/dump.go +++ b/pkg/local_object_storage/engine/dump.go @@ -4,7 +4,7 @@ import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shar // DumpShard dumps objects from the shard with provided identifier. // -// Returns an error if shard is not read-only. +// Returns an error if shard is not read-only or disabled. 
 func (e *StorageEngine) DumpShard(id *shard.ID, prm shard.DumpPrm) error {
     e.mtx.RLock()
     defer e.mtx.RUnlock()
diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go
index c50c0844c..25872333d 100644
--- a/pkg/local_object_storage/engine/remove_copies.go
+++ b/pkg/local_object_storage/engine/remove_copies.go
@@ -128,7 +128,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
             var deletePrm shard.DeletePrm
             deletePrm.SetAddresses(addr)
 
-            _, err = shards[i].Delete(deletePrm)
+            _, err = shards[i].Delete(ctx, deletePrm)
             if err != nil {
                 return err
             }
diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go
index 2b1146ff2..0ca89117f 100644
--- a/pkg/local_object_storage/engine/shards.go
+++ b/pkg/local_object_storage/engine/shards.go
@@ -174,7 +174,14 @@ func (e *StorageEngine) removeShards(ids ...string) {
     e.mtx.Unlock()
 
     for _, sh := range ss {
-        err := sh.Close()
+        err := sh.SetMode(mode.Disabled)
+        if err != nil {
+            e.log.Error("could not change shard mode to disabled",
+                zap.Stringer("id", sh.ID()),
+                zap.Error(err),
+            )
+        }
+        err = sh.Close()
         if err != nil {
             e.log.Error("could not close removed shard",
                 zap.Stringer("id", sh.ID()),
diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go
index b69ab4890..13cfc4552 100644
--- a/pkg/local_object_storage/engine/tree.go
+++ b/pkg/local_object_storage/engine/tree.go
@@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
     return err == nil, err
 }
 
+func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+    index, lst, err := e.getTreeShard(cid, treeID)
+    if err != nil {
+        return 0, nil
+    }
+    return lst[index].TreeHeight(cid, treeID)
+}
+
 // TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
 func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
     index, lst, err := e.getTreeShard(cid, treeID)
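`TreeHeight` exposes the timestamp of the newest operation in a tree's log, which lets a synchronizer decide where to resume instead of replaying the whole op log. A hedged sketch of that use (the interface stub mirrors the new method; the sync policy itself is illustrative, not the tree service's actual code):

```go
package main

import (
	"errors"
	"fmt"
)

var errTreeNotFound = errors.New("tree not found")

// minimal slice of the pilorama.Forest surface this sketch needs
type forest interface {
	TreeHeight(cid, treeID string) (uint64, error)
}

// syncFrom decides which log height to request from remote replicas:
// start right after the last operation we already hold.
func syncFrom(f forest, cid, treeID string) (uint64, error) {
	h, err := f.TreeHeight(cid, treeID)
	switch {
	case errors.Is(err, errTreeNotFound):
		return 0, nil // nothing local yet: replay from the beginning
	case err != nil:
		return 0, err
	default:
		return h + 1, nil // resume after the newest local op
	}
}

type fakeForest map[string]uint64

func (f fakeForest) TreeHeight(cid, treeID string) (uint64, error) {
	h, ok := f[cid+"/"+treeID]
	if !ok {
		return 0, errTreeNotFound
	}
	return h, nil
}

func main() {
	f := fakeForest{"cnr1/version": 41}
	from, _ := syncFrom(f, "cnr1", "version")
	fmt.Println(from) // 42
	from, _ = syncFrom(f, "cnr2", "version")
	fmt.Println(from) // 0
}
```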
diff --git a/pkg/local_object_storage/metabase/children.go b/pkg/local_object_storage/metabase/children.go
new file mode 100644
index 000000000..f0591b43c
--- /dev/null
+++ b/pkg/local_object_storage/metabase/children.go
@@ -0,0 +1,57 @@
+package meta
+
+import (
+    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    "go.etcd.io/bbolt"
+)
+
+// GetChildren returns a parent -> children map.
+// If an object has no children, the map will contain an addr -> empty slice entry.
+func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
+    db.modeMtx.RLock()
+    defer db.modeMtx.RUnlock()
+
+    if db.mode.NoMetabase() {
+        return nil, ErrDegradedMode
+    }
+
+    result := make(map[oid.Address][]oid.Address, len(addresses))
+
+    buffer := make([]byte, bucketKeySize)
+    err := db.boltDB.View(func(tx *bbolt.Tx) error {
+        for _, addr := range addresses {
+            if _, found := result[addr]; found {
+                continue
+            }
+
+            result[addr] = []oid.Address{}
+            bkt := tx.Bucket(parentBucketName(addr.Container(), buffer))
+            if bkt == nil {
+                continue
+            }
+
+            binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer)))
+            if err != nil {
+                return err
+            }
+
+            for _, binObjID := range binObjIDs {
+                var id oid.ID
+                if err = id.Decode(binObjID); err != nil {
+                    return err
+                }
+                var resultAddress oid.Address
+                resultAddress.SetContainer(addr.Container())
+                resultAddress.SetObject(id)
+                result[addr] = append(result[addr], resultAddress)
+            }
+        }
+        return nil
+    })
+
+    if err != nil {
+        return nil, err
+    }
+
+    return result, nil
+}
diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go
index 3065c8370..5722c68aa 100644
--- a/pkg/local_object_storage/pilorama/batch.go
+++ b/pkg/local_object_storage/pilorama/batch.go
@@ -50,10 +50,23 @@ func (b *batch) run() {
             return b.operations[i].Time < b.operations[j].Time
         })
 
+        b.operations = removeDuplicatesInPlace(b.operations)
+
         var lm Move
         return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
     })
 
-    for i := range b.operations {
+    for i := range b.results {
         b.results[i] <- err
     }
 }
+
+func removeDuplicatesInPlace(a []*Move) []*Move {
+    equalCount := 0
+    for i := 1; i < len(a); i++ {
+        if a[i].Time == a[i-1].Time {
+            equalCount++
+        } else {
+            a[i-equalCount] = a[i]
+        }
+    }
+    return a[:len(a)-equalCount]
+}
diff --git a/pkg/local_object_storage/pilorama/batch_test.go b/pkg/local_object_storage/pilorama/batch_test.go
new file mode 100644
index 000000000..931fce18c
--- /dev/null
+++ b/pkg/local_object_storage/pilorama/batch_test.go
@@ -0,0 +1,70 @@
+package pilorama
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/require"
+)
+
+func Test_removeDuplicatesInPlace(t *testing.T) {
+    testCases := []struct {
+        before []int
+        after  []int
+    }{
+        {
+            before: []int{},
+            after:  []int{},
+        },
+        {
+            before: []int{1},
+            after:  []int{1},
+        },
+        {
+            before: []int{1, 2},
+            after:  []int{1, 2},
+        },
+        {
+            before: []int{1, 2, 3},
+            after:  []int{1, 2, 3},
+        },
+        {
+            before: []int{1, 1, 2},
+            after:  []int{1, 2},
+        },
+        {
+            before: []int{1, 2, 2},
+            after:  []int{1, 2},
+        },
+        {
+            before: []int{1, 2, 2, 3},
+            after:  []int{1, 2, 3},
+        },
+        {
+            before: []int{1, 1, 1},
+            after:  []int{1},
+        },
+        {
+            before: []int{1, 1, 2, 2},
+            after:  []int{1, 2},
+        },
+        {
+            before: []int{1, 1, 1, 2, 3, 3, 3},
+            after:  []int{1, 2, 3},
+        },
+    }
+
+    for _, tc := range testCases {
+        ops := make([]*Move, len(tc.before))
+        for i := range ops {
+            ops[i] = &Move{Meta: Meta{Time: Timestamp(tc.before[i])}}
+        }
+
+        expected := make([]*Move, len(tc.after))
+        for i := range expected {
+            expected[i] = &Move{Meta: Meta{Time: Timestamp(tc.after[i])}}
+        }
+
+        actual := removeDuplicatesInPlace(ops)
+        require.Equal(t, expected, actual, "%d", tc.before)
+    }
+}
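Two details in the batch change are easy to miss. After sorting by `Time`, duplicates are adjacent, so one left-to-right pass compacts the slice in place with no allocation: keep the first element of each run and shift survivors left by the number of duplicates seen so far. And the result loop now ranges over `b.results` rather than `b.operations`, because dedup shrinks `operations` while every submitter still waits on its own result channel. A standalone rendition of the compaction over plain ints:

```go
package main

import (
	"fmt"
	"sort"
)

// dedupSorted removes adjacent duplicates in place, preserving order.
// Same shape as removeDuplicatesInPlace, specialized to ints for clarity.
func dedupSorted(a []int) []int {
	equal := 0
	for i := 1; i < len(a); i++ {
		if a[i] == a[i-1] {
			equal++ // drop this duplicate
		} else {
			a[i-equal] = a[i] // shift survivor left over the dropped slots
		}
	}
	return a[:len(a)-equal]
}

func main() {
	ops := []int{3, 1, 1, 2, 3, 3}
	sort.Ints(ops)                // duplicates become adjacent: [1 1 2 3 3 3]
	fmt.Println(dedupSorted(ops)) // [1 2 3]
}
```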
diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go
index 994c3d416..8f1574b5f 100644
--- a/pkg/local_object_storage/pilorama/boltdb.go
+++ b/pkg/local_object_storage/pilorama/boltdb.go
@@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
     })
 }
 
+func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+    t.modeMtx.RLock()
+    defer t.modeMtx.RUnlock()
+
+    if t.mode.NoMetabase() {
+        return 0, ErrDegradedMode
+    }
+
+    var height uint64
+    var retErr error
+    err := t.db.View(func(tx *bbolt.Tx) error {
+        treeRoot := tx.Bucket(bucketName(cid, treeID))
+        if treeRoot != nil {
+            k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
+            height = binary.BigEndian.Uint64(k)
+        } else {
+            retErr = ErrTreeNotFound
+        }
+        return nil
+    })
+    if err == nil {
+        err = retErr
+    }
+    return height, err
+}
+
 // TreeExists implements the Forest interface.
 func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
     t.modeMtx.RLock()
diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go
index fa2f1dcd2..ac9b9d55c 100644
--- a/pkg/local_object_storage/pilorama/forest.go
+++ b/pkg/local_object_storage/pilorama/forest.go
@@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
     return res, nil
 }
 
+func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
+    fullID := cid.EncodeToString() + "/" + treeID
+    tree, ok := f.treeMap[fullID]
+    if !ok {
+        return 0, ErrTreeNotFound
+    }
+    return tree.operations[len(tree.operations)-1].Time, nil
+}
+
 // TreeExists implements the pilorama.Forest interface.
 func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
     fullID := cid.EncodeToString() + "/" + treeID
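In the bbolt implementation the height is simply the last key of the log bucket: operations are keyed by big-endian timestamps, and bbolt's B+tree keeps keys byte-ordered, so `Cursor().Last()` lands on the newest entry. A self-contained sketch of that keying scheme (the temp-file path and bucket name are made up for the demo):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"os"
	"path/filepath"

	"go.etcd.io/bbolt"
)

func main() {
	path := filepath.Join(os.TempDir(), "height-demo.db")
	defer os.Remove(path)

	db, err := bbolt.Open(path, 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Big-endian keys sort numerically under byte comparison,
	// so the largest timestamp is always the last key in the bucket.
	err = db.Update(func(tx *bbolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("log"))
		if err != nil {
			return err
		}
		for _, ts := range []uint64{5, 42, 17} {
			key := make([]byte, 8)
			binary.BigEndian.PutUint64(key, ts)
			if err := b.Put(key, []byte("op")); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	_ = db.View(func(tx *bbolt.Tx) error {
		k, _ := tx.Bucket([]byte("log")).Cursor().Last()
		fmt.Println("height:", binary.BigEndian.Uint64(k)) // height: 42
		return nil
	})
}
```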
diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go
index bbd35246c..d56a3f543 100644
--- a/pkg/local_object_storage/pilorama/forest_test.go
+++ b/pkg/local_object_storage/pilorama/forest_test.go
@@ -1,6 +1,7 @@
 package pilorama
 
 import (
+    "context"
     "fmt"
     "math/rand"
     "os"
@@ -13,6 +14,7 @@ import (
     cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     "github.com/stretchr/testify/require"
+    "golang.org/x/sync/errgroup"
 )
 
 var providers = []struct {
@@ -445,6 +447,82 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Optio
     })
 }
 
+func TestForest_ApplySameOperation(t *testing.T) {
+    for i := range providers {
+        t.Run(providers[i].name, func(t *testing.T) {
+            parallel := providers[i].name != "inmemory"
+            testForestApplySameOperation(t, providers[i].construct, parallel)
+        })
+    }
+}
+
+func testForestApplySameOperation(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, parallel bool) {
+    cid := cidtest.ID()
+    treeID := "version"
+
+    batchSize := 3
+    errG, _ := errgroup.WithContext(context.Background())
+    if !parallel {
+        batchSize = 1
+        errG.SetLimit(1)
+    }
+
+    meta := []Meta{
+        {Time: 1, Items: []KeyValue{{AttributeFilename, []byte("1")}, {"attr", []byte{1}}}},
+        {Time: 2, Items: []KeyValue{{AttributeFilename, []byte("2")}, {"attr", []byte{1}}}},
+        {Time: 3, Items: []KeyValue{{AttributeFilename, []byte("3")}, {"attr", []byte{1}}}},
+    }
+    logs := []Move{
+        {
+            Child:  1,
+            Parent: RootID,
+            Meta:   meta[0],
+        },
+        {
+            Child:  2,
+            Parent: 1,
+            Meta:   meta[1],
+        },
+        {
+            Child:  1,
+            Parent: 2,
+            Meta:   meta[2],
+        },
+    }
+
+    check := func(t *testing.T, s Forest) {
+        testMeta(t, s, cid, treeID, 1, RootID, meta[0])
+        testMeta(t, s, cid, treeID, 2, 1, meta[1])
+
+        nodes, err := s.TreeGetChildren(cid, treeID, RootID)
+        require.NoError(t, err)
+        require.Equal(t, []Node{1}, nodes)
+
+        nodes, err = s.TreeGetChildren(cid, treeID, 1)
+        require.NoError(t, err)
+        require.Equal(t, []Node{2}, nodes)
+    }
+
+    t.Run("expected", func(t *testing.T) {
+        s := constructor(t)
+        for i := range logs {
+            require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false))
+        }
+        check(t, s)
+    })
+
+    s := constructor(t, WithMaxBatchSize(batchSize))
+    require.NoError(t, s.TreeApply(cid, treeID, &logs[0], false))
+    for i := 0; i < batchSize; i++ {
+        errG.Go(func() error {
+            return s.TreeApply(cid, treeID, &logs[2], false)
+        })
+    }
+    require.NoError(t, errG.Wait())
+    require.NoError(t, s.TreeApply(cid, treeID, &logs[1], false))
+    check(t, s)
+}
+
 func TestForest_GetOpLog(t *testing.T) {
     for i := range providers {
         t.Run(providers[i].name, func(t *testing.T) {
@@ -527,10 +605,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
         checkExists(t, false, cid, treeID)
     })
 
-    require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
+    require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
     checkExists(t, true, cid, treeID)
+
+    height, err := s.TreeHeight(cid, treeID)
+    require.NoError(t, err)
+    require.EqualValues(t, 11, height)
+
     checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
-    checkExists(t, false, cid, "another tree") // same CID, different tree
+
+    _, err = s.TreeHeight(cidtest.ID(), treeID)
+    require.ErrorIs(t, err, ErrTreeNotFound)
+
+    checkExists(t, false, cid, "another tree") // same CID, different tree
 
     t.Run("can be removed", func(t *testing.T) {
         require.NoError(t, s.TreeDrop(cid, treeID))
diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go
index 290f633a5..c69682806 100644
--- a/pkg/local_object_storage/pilorama/interface.go
+++ b/pkg/local_object_storage/pilorama/interface.go
@@ -48,6 +48,8 @@ type Forest interface {
     TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
     // TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
     TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
+    // TreeHeight returns current tree height.
+    TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
 }
 
 type ForestStorage interface {
diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go
index d727d27a5..34617e1ee 100644
--- a/pkg/local_object_storage/shard/control.go
+++ b/pkg/local_object_storage/shard/control.go
@@ -296,8 +296,8 @@ func (s *Shard) Reload(opts ...Option) error {
         opts[i](&c)
     }
 
-    s.m.Lock()
-    defer s.m.Unlock()
+    unlock := s.lockExclusive()
+    defer unlock()
 
     ok, err := s.metaBase.Reload(c.metaOpts...)
     if err != nil {
@@ -327,3 +327,15 @@ func (s *Shard) Reload(opts ...Option) error {
     s.log.Info("trying to restore read-write mode")
     return s.setMode(mode.ReadWrite)
 }
+
+func (s *Shard) lockExclusive() func() {
+    s.setModeRequested.Store(true)
+    val := s.gcCancel.Load()
+    if val != nil {
+        cancelGC := val.(context.CancelFunc)
+        cancelGC()
+    }
+    s.m.Lock()
+    s.setModeRequested.Store(false)
+    return s.m.Unlock
+}
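`lockExclusive` addresses a lock-priority problem: GC holds `s.m.RLock` for a whole removal batch, so a plain `s.m.Lock()` in `SetMode`/`Reload` could stall for the entire batch. Raising `setModeRequested` and firing the stored cancel func makes the collector bail out early and release the read lock. A condensed, runnable sketch of the handshake (field names follow the diff; the GC body is illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type shard struct {
	m                sync.RWMutex
	gcCancel         atomic.Value // stores context.CancelFunc
	setModeRequested atomic.Bool
}

// lockExclusive asks a running GC cycle to yield before taking the write lock.
func (s *shard) lockExclusive() func() {
	s.setModeRequested.Store(true)
	if v := s.gcCancel.Load(); v != nil {
		v.(context.CancelFunc)() // wake the GC loop out of its batch
	}
	s.m.Lock()
	s.setModeRequested.Store(false)
	return s.m.Unlock
}

// removeGarbage mimics one GC cycle: register a cancel func first, refuse
// to start if a mode change is pending, and poll ctx inside the loop.
func (s *shard) removeGarbage() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s.gcCancel.Store(cancel)
	if s.setModeRequested.Load() {
		return // don't race a pending SetMode/Reload
	}

	s.m.RLock()
	defer s.m.RUnlock()
	for i := 0; i < 1000; i++ {
		select {
		case <-ctx.Done():
			fmt.Println("GC yielded after", i, "items")
			return
		default:
		}
		time.Sleep(time.Millisecond) // stand-in for deleting one batch
	}
}

func main() {
	var s shard
	go s.removeGarbage()
	time.Sleep(20 * time.Millisecond)
	unlock := s.lockExclusive() // returns promptly instead of waiting ~1s
	fmt.Println("exclusive lock acquired")
	unlock()
}
```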
diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go
index 6ae3bf7dd..3fc7b627e 100644
--- a/pkg/local_object_storage/shard/delete.go
+++ b/pkg/local_object_storage/shard/delete.go
@@ -1,6 +1,7 @@
 package shard
 
 import (
+    "context"
     "errors"
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -27,84 +28,88 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
 
 // Delete removes data from the shard's writeCache, metaBase and
 // blobStor.
-func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
+func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
     s.m.RLock()
     defer s.m.RUnlock()
 
-    return s.delete(prm)
+    return s.delete(ctx, prm)
 }
 
-func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) {
+func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
     if s.info.Mode.ReadOnly() {
         return DeleteRes{}, ErrReadOnlyMode
     } else if s.info.Mode.NoMetabase() {
         return DeleteRes{}, ErrDegradedMode
     }
 
-    ln := len(prm.addr)
-
-    smalls := make(map[oid.Address][]byte, ln)
-
-    for i := range prm.addr {
-        if s.hasWriteCache() {
-            err := s.writeCache.Delete(prm.addr[i])
-            if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
-                s.log.Warn("can't delete object from write cache", zap.String("error", err.Error()))
-            }
+    for _, addr := range prm.addr {
+        select {
+        case <-ctx.Done():
+            return DeleteRes{}, ctx.Err()
+        default:
         }
 
-        var sPrm meta.StorageIDPrm
-        sPrm.SetAddress(prm.addr[i])
+        s.deleteObjectFromWriteCacheSafe(addr)
 
-        res, err := s.metaBase.StorageID(sPrm)
-        if err != nil {
-            s.log.Debug("can't get storage ID from metabase",
-                zap.Stringer("object", prm.addr[i]),
-                zap.String("error", err.Error()))
+        s.deleteFromBlobstorSafe(addr)
 
-            continue
-        }
-
-        if res.StorageID() != nil {
-            smalls[prm.addr[i]] = res.StorageID()
+        if err := s.deleteFromMetabase(addr); err != nil {
+            return DeleteRes{}, err // stop on metabase error ?
         }
     }
 
+    return DeleteRes{}, nil
+}
+
+func (s *Shard) deleteObjectFromWriteCacheSafe(addr oid.Address) {
+    if s.hasWriteCache() {
+        err := s.writeCache.Delete(addr)
+        if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
+            s.log.Warn("can't delete object from write cache", zap.String("error", err.Error()))
+        }
+    }
+}
+
+func (s *Shard) deleteFromMetabase(addr oid.Address) error {
     var delPrm meta.DeletePrm
-    delPrm.SetAddresses(prm.addr...)
+    delPrm.SetAddresses(addr)
 
     res, err := s.metaBase.Delete(delPrm)
     if err != nil {
-        return DeleteRes{}, err // stop on metabase error ?
+        return err
     }
 
-    var totalRemovedPayload uint64
-
     s.decObjectCounterBy(physical, res.RawObjectsRemoved())
     s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
-    for i := range prm.addr {
-        removedPayload := res.RemovedPhysicalObjectSizes()[i]
-        totalRemovedPayload += removedPayload
-        logicalRemovedPayload := res.RemovedLogicalObjectSizes()[i]
-        if logicalRemovedPayload > 0 {
-            s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
-        }
+    removedPayload := res.RemovedPhysicalObjectSizes()[0]
+    logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
+    if logicalRemovedPayload > 0 {
+        s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload))
     }
-    s.addToPayloadSize(-int64(totalRemovedPayload))
+    s.addToPayloadSize(-int64(removedPayload))
 
-    for i := range prm.addr {
-        var delPrm common.DeletePrm
-        delPrm.Address = prm.addr[i]
-        id := smalls[prm.addr[i]]
-        delPrm.StorageID = id
-
-        _, err = s.blobStor.Delete(delPrm)
-        if err != nil {
-            s.log.Debug("can't remove object from blobStor",
-                zap.Stringer("object_address", prm.addr[i]),
-                zap.String("error", err.Error()))
-        }
-    }
-
-    return DeleteRes{}, nil
+    return nil
+}
+
+func (s *Shard) deleteFromBlobstorSafe(addr oid.Address) {
+    var sPrm meta.StorageIDPrm
+    sPrm.SetAddress(addr)
+
+    res, err := s.metaBase.StorageID(sPrm)
+    if err != nil {
+        s.log.Debug("can't get storage ID from metabase",
+            zap.Stringer("object", addr),
+            zap.String("error", err.Error()))
+    }
+    storageID := res.StorageID()
+
+    var delPrm common.DeletePrm
+    delPrm.Address = addr
+    delPrm.StorageID = storageID
+
+    _, err = s.blobStor.Delete(delPrm)
+    if err != nil {
+        s.log.Debug("can't remove object from blobStor",
+            zap.Stringer("object_address", addr),
+            zap.String("error", err.Error()))
+    }
+}
diff --git a/pkg/local_object_storage/shard/delete_test.go b/pkg/local_object_storage/shard/delete_test.go
index c37dfa285..7ecfb59aa 100644
--- a/pkg/local_object_storage/shard/delete_test.go
+++ b/pkg/local_object_storage/shard/delete_test.go
@@ -49,7 +49,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
         _, err = testGet(t, sh, getPrm, hasWriteCache)
         require.NoError(t, err)
 
-        _, err = sh.Delete(delPrm)
+        _, err = sh.Delete(context.Background(), delPrm)
         require.NoError(t, err)
 
         _, err = sh.Get(context.Background(), getPrm)
@@ -73,7 +73,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
         _, err = sh.Get(context.Background(), getPrm)
         require.NoError(t, err)
 
-        _, err = sh.Delete(delPrm)
+        _, err = sh.Delete(context.Background(), delPrm)
         require.NoError(t, err)
 
         _, err = sh.Get(context.Background(), getPrm)
diff --git a/pkg/local_object_storage/shard/dump.go b/pkg/local_object_storage/shard/dump.go
index 8d9fe0f71..392fbc5f4 100644
--- a/pkg/local_object_storage/shard/dump.go
+++ b/pkg/local_object_storage/shard/dump.go
@@ -55,7 +55,9 @@ func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) {
     s.m.RLock()
     defer s.m.RUnlock()
 
-    if !s.info.Mode.ReadOnly() {
+    if s.info.Mode.Disabled() {
+        return DumpRes{}, ErrShardDisabled
+    } else if !s.info.Mode.ReadOnly() {
         return DumpRes{}, ErrMustBeReadOnly
     }
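The reworked delete is now strictly per address with a deliberate error-policy split: write-cache and blobstor failures are logged and tolerated (the object merely remains garbage and GC retries later), while a metabase failure aborts the batch, because the metabase is the source of truth for existence, and cancellation is checked between items. A sketch of that policy under invented storage stubs:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type store interface {
	delete(addr string) error
}

type mapStore struct{ m map[string]bool }

func (s *mapStore) delete(addr string) error {
	if !s.m[addr] {
		return errors.New("not found")
	}
	delete(s.m, addr)
	return nil
}

// deleteBatch mirrors the shard policy: best-effort side stores first,
// hard stop on metabase errors, cancellation between items.
func deleteBatch(ctx context.Context, meta, blob store, addrs []string) error {
	for _, addr := range addrs {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if err := blob.delete(addr); err != nil {
			fmt.Println("blobstor delete failed, continuing:", err) // stays garbage, GC retries
		}
		if err := meta.delete(addr); err != nil {
			return err // metabase is authoritative: abort the batch
		}
	}
	return nil
}

func main() {
	meta := &mapStore{m: map[string]bool{"a": true, "b": true}}
	blob := &mapStore{m: map[string]bool{"a": true}} // "b" already missing
	err := deleteBatch(context.Background(), meta, blob, []string{"a", "b"})
	fmt.Println("err:", err) // err: <nil>
}
```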
writecache.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), }, + nil, nil) } defer releaseShard(sh, t) @@ -188,7 +189,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) { require.Error(t, err) t.Run("skip errors", func(t *testing.T) { - sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil) + sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil, nil) t.Cleanup(func() { require.NoError(t, sh.Close()) }) var restorePrm shard.RestorePrm @@ -219,10 +220,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) { } func TestStream(t *testing.T) { - sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil) + sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil, nil) defer releaseShard(sh1, t) - sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil) + sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil, nil) defer releaseShard(sh2, t) const objCount = 5 @@ -323,7 +324,7 @@ func TestDumpIgnoreErrors(t *testing.T) { writecache.WithSmallObjectSize(wcSmallObjectSize), writecache.WithMaxObjectSize(wcBigObjectSize), } - sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2)) + sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2), nil) objects := make([]*objectSDK.Object, objCount) for i := 0; i < objCount; i++ { @@ -371,7 +372,7 @@ func TestDumpIgnoreErrors(t *testing.T) { require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0)) } - sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3)) + sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3), nil) require.NoError(t, sh.SetMode(mode.ReadOnly)) { diff --git a/pkg/local_object_storage/shard/errors.go b/pkg/local_object_storage/shard/errors.go index 3e5224eb9..2958a492c 100644 --- a/pkg/local_object_storage/shard/errors.go +++ b/pkg/local_object_storage/shard/errors.go @@ -4,9 +4,12 @@ import ( "errors" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" ) +var ErrShardDisabled = logicerr.New("shard disabled") + // IsErrNotFound checks if error returned by Shard Get/Head/GetRange method // corresponds to missing object. func IsErrNotFound(err error) bool { diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 76e4347d4..527aff565 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -35,6 +35,7 @@ func (p ExistsRes) Exists() bool { // // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. +// Returns the ErrShardDisabled if the shard is disabled. 
 func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
     var exists bool
     var err error
@@ -42,7 +43,9 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
     s.m.RLock()
     defer s.m.RUnlock()
 
-    if s.info.Mode.NoMetabase() {
+    if s.info.Mode.Disabled() {
+        return ExistsRes{}, ErrShardDisabled
+    } else if s.info.Mode.NoMetabase() {
         var p common.ExistsPrm
         p.Address = prm.addr
 
diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go
index 6f18e6c3a..8370d8fb8 100644
--- a/pkg/local_object_storage/shard/gc.go
+++ b/pkg/local_object_storage/shard/gc.go
@@ -145,8 +145,8 @@ func (gc *gc) listenEvents(ctx context.Context) {
             h := v.handlers[i]
 
             err := gc.workerPool.Submit(func() {
+                defer v.prevGroup.Done()
                 h(runCtx, event)
-                v.prevGroup.Done()
             })
             if err != nil {
                 gc.log.Warn("could not submit GC job to worker pool",
@@ -196,6 +196,14 @@ func (gc *gc) stop() {
 // with GC-marked graves.
 // Does nothing if shard is in "read-only" mode.
 func (s *Shard) removeGarbage() {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    s.gcCancel.Store(cancel)
+    if s.setModeRequested.Load() {
+        return
+    }
+
     s.m.RLock()
     defer s.m.RUnlock()
 
@@ -207,6 +215,12 @@ func (s *Shard) removeGarbage() {
     var iterPrm meta.GarbageIterationPrm
     iterPrm.SetHandler(func(g meta.GarbageObject) error {
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        default:
+        }
+
         buf = append(buf, g.Address())
 
         if len(buf) == s.rmBatchSize {
@@ -233,7 +247,7 @@ func (s *Shard) removeGarbage() {
     deletePrm.SetAddresses(buf...)
 
     // delete accumulated objects
-    _, err = s.delete(deletePrm)
+    _, err = s.delete(ctx, deletePrm)
     if err != nil {
         s.log.Warn("could not delete the objects",
             zap.String("error", err.Error()),
@@ -313,6 +327,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
         return
     }
 
+    expired, err := s.getExpiredWithLinked(expired)
+    if err != nil {
+        s.log.Warn("failed to get expired objects with linked", zap.Error(err))
+        return
+    }
+
     var inhumePrm meta.InhumePrm
 
     inhumePrm.SetAddresses(expired...)
@@ -338,6 +358,20 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
     }
 }
 
+func (s *Shard) getExpiredWithLinked(source []oid.Address) ([]oid.Address, error) {
+    result := make([]oid.Address, 0, len(source))
+    parentToChildren, err := s.metaBase.GetChildren(source)
+    if err != nil {
+        return nil, err
+    }
+    for parent, children := range parentToChildren {
+        result = append(result, parent)
+        result = append(result, children...)
+    }
+
+    return result, nil
+}
+
 func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
     epoch := e.(newEpoch).epoch
     log := s.log.With(zap.Uint64("epoch", epoch))
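This is the heart of the #332 fix: a big object is stored as child parts plus a link object, so inhuming only the parent address left the parts behind. `getExpiredWithLinked` therefore widens the expired set with every child known to the metabase before inhuming. (The `defer v.prevGroup.Done()` change in `listenEvents` is related hygiene: `Done` must run even if a handler panics, or the next event group waits forever.) A compact model of the expansion step:

```go
package main

import "fmt"

// expandWithChildren mirrors getExpiredWithLinked: for every expired
// address, pull its children (parts of a complex object) into the set,
// so GC inhumes the whole split hierarchy, not just the parent.
func expandWithChildren(expired []string, children map[string][]string) []string {
	result := make([]string, 0, len(expired))
	for _, parent := range expired {
		result = append(result, parent)
		result = append(result, children[parent]...)
	}
	return result
}

func main() {
	children := map[string][]string{
		"parent-obj": {"part-1", "part-2", "link-obj"},
	}
	fmt.Println(expandWithChildren([]string{"parent-obj", "small-obj"}, children))
	// [parent-obj part-1 part-2 link-obj small-obj]
}
```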
diff --git a/pkg/local_object_storage/shard/gc_internal_test.go b/pkg/local_object_storage/shard/gc_internal_test.go
new file mode 100644
index 000000000..064572109
--- /dev/null
+++ b/pkg/local_object_storage/shard/gc_internal_test.go
@@ -0,0 +1,144 @@
+package shard
+
+import (
+    "context"
+    "path/filepath"
+    "testing"
+    "time"
+
+    objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
+    meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
+    cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    "github.com/panjf2000/ants/v2"
+    "github.com/stretchr/testify/require"
+    "go.uber.org/zap/zaptest"
+)
+
+func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
+    t.Parallel()
+
+    rootPath := t.TempDir()
+
+    var sh *Shard
+
+    l := &logger.Logger{Logger: zaptest.NewLogger(t)}
+    blobOpts := []blobstor.Option{
+        blobstor.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
+        blobstor.WithStorages([]blobstor.SubStorage{
+            {
+                Storage: blobovniczatree.NewBlobovniczaTree(
+                    blobovniczatree.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
+                    blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
+                    blobovniczatree.WithBlobovniczaShallowDepth(1),
+                    blobovniczatree.WithBlobovniczaShallowWidth(1)),
+                Policy: func(_ *object.Object, data []byte) bool {
+                    return len(data) <= 1<<20
+                },
+            },
+            {
+                Storage: fstree.New(
+                    fstree.WithPath(filepath.Join(rootPath, "blob"))),
+            },
+        }),
+    }
+
+    opts := []Option{
+        WithID(NewIDFromBytes([]byte{})),
+        WithLogger(l),
+        WithBlobStorOptions(blobOpts...),
+        WithMetaBaseOptions(
+            meta.WithPath(filepath.Join(rootPath, "meta")),
+            meta.WithEpochState(epochState{}),
+        ),
+        WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
+        WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
+            sh.HandleDeletedLocks(addresses)
+        }),
+        WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
+            sh.HandleExpiredLocks(ctx, epoch, a)
+        }),
+        WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
+            pool, err := ants.NewPool(sz)
+            require.NoError(t, err)
+            return pool
+        }),
+        WithGCRemoverSleepInterval(1 * time.Second),
+    }
+
+    sh = New(opts...)
+
+    require.NoError(t, sh.Open())
+    require.NoError(t, sh.Init(context.Background()))
+
+    t.Cleanup(func() {
+        require.NoError(t, sh.Close())
+    })
+
+    cnr := cidtest.ID()
+    obj := testutil.GenerateObjectWithCID(cnr)
+    objID, _ := obj.ID()
+    var addr oid.Address
+    addr.SetContainer(cnr)
+    addr.SetObject(objID)
+
+    var putPrm PutPrm
+    putPrm.SetObject(obj)
+
+    _, err := sh.Put(putPrm)
+    require.NoError(t, err)
+
+    var getPrm GetPrm
+    getPrm.SetAddress(objectCore.AddressOf(obj))
+    _, err = sh.Get(context.Background(), getPrm)
+    require.NoError(t, err, "failed to get")
+
+    // inhume
+    var inhumePrm InhumePrm
+    inhumePrm.MarkAsGarbage(addr)
+    _, err = sh.Inhume(context.Background(), inhumePrm)
+    require.NoError(t, err, "failed to inhume")
+    _, err = sh.Get(context.Background(), getPrm)
+    require.Error(t, err, "get returned no error")
+    require.True(t, IsErrNotFound(err), "invalid error type")
+
+    // storage ID
+    var metaStIDPrm meta.StorageIDPrm
+    metaStIDPrm.SetAddress(addr)
+    storageID, err := sh.metaBase.StorageID(metaStIDPrm)
+    require.NoError(t, err, "failed to get storage ID")
+
+    // check existence in blobstore
+    var bsExisted common.ExistsPrm
+    bsExisted.Address = addr
+    bsExisted.StorageID = storageID.StorageID()
+    exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
+    require.NoError(t, err, "failed to check blobstore existence")
+    require.True(t, exRes.Exists, "invalid blobstore existence result")
+
+    // drop from blobstore
+    var bsDeletePrm common.DeletePrm
+    bsDeletePrm.Address = addr
+    bsDeletePrm.StorageID = storageID.StorageID()
+    _, err = sh.blobStor.Delete(bsDeletePrm)
+    require.NoError(t, err, "failed to delete from blobstore")
+
+    // check existence in blobstore
+    exRes, err = sh.blobStor.Exists(context.Background(), bsExisted)
+    require.NoError(t, err, "failed to check blobstore existence")
+    require.False(t, exRes.Exists, "invalid blobstore existence result")
+
+    // get should return object not found
+    _, err = sh.Get(context.Background(), getPrm)
+    require.Error(t, err, "get returned no error")
+    require.True(t, IsErrNotFound(err), "invalid error type")
+}
diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go
index 8012e60f8..23bd0cd19 100644
--- a/pkg/local_object_storage/shard/gc_test.go
+++ b/pkg/local_object_storage/shard/gc_test.go
@@ -2,77 +2,30 @@ package shard_test
 
 import (
     "context"
-    "path/filepath"
+    "errors"
     "testing"
     "time"
 
     objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
     objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
     meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
     cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-    "github.com/panjf2000/ants/v2"
+    oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" - "go.uber.org/zap" ) -func Test_GCDropsLockedExpiredObject(t *testing.T) { - var sh *shard.Shard +func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) { + t.Parallel() epoch := &epochState{ Value: 100, } - rootPath := t.TempDir() - opts := []shard.Option{ - shard.WithID(shard.NewIDFromBytes([]byte{})), - shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}), - shard.WithBlobStorOptions( - blobstor.WithStorages([]blobstor.SubStorage{ - { - Storage: blobovniczatree.NewBlobovniczaTree( - blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")), - blobovniczatree.WithBlobovniczaShallowDepth(2), - blobovniczatree.WithBlobovniczaShallowWidth(2)), - Policy: func(_ *objectSDK.Object, data []byte) bool { - return len(data) <= 1<<20 - }, - }, - { - Storage: fstree.New( - fstree.WithPath(filepath.Join(rootPath, "blob"))), - }, - }), - ), - shard.WithMetaBaseOptions( - meta.WithPath(filepath.Join(rootPath, "meta")), - meta.WithEpochState(epoch), - ), - shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) { - sh.HandleDeletedLocks(addresses) - }), - shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) { - sh.HandleExpiredLocks(ctx, epoch, a) - }), - shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { - pool, err := ants.NewPool(sz) - require.NoError(t, err) - - return pool - }), - } - - sh = shard.New(opts...) - require.NoError(t, sh.Open()) - require.NoError(t, sh.Init(context.Background())) + sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)}) t.Cleanup(func() { releaseShard(sh, t) @@ -120,3 +73,97 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) { return shard.IsErrNotFound(err) }, 3*time.Second, 1*time.Second, "expired object must be deleted") } + +func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { + t.Parallel() + + epoch := &epochState{ + Value: 100, + } + + cnr := cidtest.ID() + parentID := oidtest.ID() + splitID := objectSDK.NewSplitID() + + var objExpirationAttr objectSDK.Attribute + objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch) + objExpirationAttr.SetValue("101") + + var lockExpirationAttr objectSDK.Attribute + lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch) + lockExpirationAttr.SetValue("103") + + parent := testutil.GenerateObjectWithCID(cnr) + parent.SetID(parentID) + parent.SetPayload(nil) + parent.SetAttributes(objExpirationAttr) + + const childCount = 10 + children := make([]*objectSDK.Object, childCount) + childIDs := make([]oid.ID, childCount) + for i := range children { + children[i] = testutil.GenerateObjectWithCID(cnr) + if i != 0 { + children[i].SetPreviousID(childIDs[i-1]) + } + if i == len(children)-1 { + children[i].SetParent(parent) + } + children[i].SetSplitID(splitID) + children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)}) + childIDs[i], _ = children[i].ID() + } + + link := testutil.GenerateObjectWithCID(cnr) + link.SetParent(parent) + link.SetParentID(parentID) + link.SetSplitID(splitID) + link.SetChildren(childIDs...) 
+
+    linkID, _ := link.ID()
+
+    sh := newCustomShard(t, t.TempDir(), false, nil, nil, []meta.Option{meta.WithEpochState(epoch)})
+
+    t.Cleanup(func() {
+        releaseShard(sh, t)
+    })
+
+    lock := testutil.GenerateObjectWithCID(cnr)
+    lock.SetType(objectSDK.TypeLock)
+    lock.SetAttributes(lockExpirationAttr)
+    lockID, _ := lock.ID()
+
+    var putPrm shard.PutPrm
+
+    for _, child := range children {
+        putPrm.SetObject(child)
+        _, err := sh.Put(putPrm)
+        require.NoError(t, err)
+    }
+
+    putPrm.SetObject(link)
+    _, err := sh.Put(putPrm)
+    require.NoError(t, err)
+
+    err = sh.Lock(cnr, lockID, append(childIDs, parentID, linkID))
+    require.NoError(t, err)
+
+    putPrm.SetObject(lock)
+    _, err = sh.Put(putPrm)
+    require.NoError(t, err)
+
+    var getPrm shard.GetPrm
+    getPrm.SetAddress(objectCore.AddressOf(parent))
+
+    _, err = sh.Get(context.Background(), getPrm)
+    var splitInfoError *objectSDK.SplitInfoError
+    require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
+
+    epoch.Value = 105
+    sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)
+
+    require.Eventually(t, func() bool {
+        _, err = sh.Get(context.Background(), getPrm)
+        return shard.IsErrNotFound(err)
+    }, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires")
+}
diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go
index 3406b9338..cbe5fcafc 100644
--- a/pkg/local_object_storage/shard/get.go
+++ b/pkg/local_object_storage/shard/get.go
@@ -65,6 +65,7 @@ func (r GetRes) HasMeta() bool {
 // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
 // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
 // Returns the object.ErrObjectIsExpired if the object is presented but already expired.
+// Returns the ErrShardDisabled if the shard is disabled.
 func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
     ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Get",
         trace.WithAttributes(
@@ -77,6 +78,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
     s.m.RLock()
     defer s.m.RUnlock()
 
+    if s.info.Mode.Disabled() {
+        return GetRes{}, ErrShardDisabled
+    }
+
     cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
         var getPrm common.GetPrm
         getPrm.Address = prm.addr
diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go
index 18e97e259..d39a98224 100644
--- a/pkg/local_object_storage/shard/metrics_test.go
+++ b/pkg/local_object_storage/shard/metrics_test.go
@@ -168,7 +168,7 @@ func TestCounters(t *testing.T) {
         deletedNumber := int(phy / 4)
         prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
 
-        _, err := sh.Delete(prm)
+        _, err := sh.Delete(context.Background(), prm)
         require.NoError(t, err)
 
         require.Equal(t, phy-uint64(deletedNumber), mm.objCounters[physical])
diff --git a/pkg/local_object_storage/shard/mode.go b/pkg/local_object_storage/shard/mode.go
index 17ed3f3c8..c648133f8 100644
--- a/pkg/local_object_storage/shard/mode.go
+++ b/pkg/local_object_storage/shard/mode.go
@@ -18,8 +18,8 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode")
 // Returns any error encountered that did not allow
 // setting shard mode.
 func (s *Shard) SetMode(m mode.Mode) error {
-    s.m.Lock()
-    defer s.m.Unlock()
+    unlock := s.lockExclusive()
+    defer unlock()
 
     return s.setMode(m)
 }
@@ -55,9 +55,11 @@ func (s *Shard) setMode(m mode.Mode) error {
         }
     }
 
-    for i := range components {
-        if err := components[i].SetMode(m); err != nil {
-            return err
+    if !m.Disabled() {
+        for i := range components {
+            if err := components[i].SetMode(m); err != nil {
+                return err
+            }
         }
     }
 
diff --git a/pkg/local_object_storage/shard/mode/mode.go b/pkg/local_object_storage/shard/mode/mode.go
index 65b2b5c89..49c888d63 100644
--- a/pkg/local_object_storage/shard/mode/mode.go
+++ b/pkg/local_object_storage/shard/mode/mode.go
@@ -57,3 +57,7 @@ func (m Mode) NoMetabase() bool {
 func (m Mode) ReadOnly() bool {
     return m&ReadOnly != 0
 }
+
+func (m Mode) Disabled() bool {
+    return m == Disabled
+}
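Note the asymmetry: `ReadOnly` and `NoMetabase` test individual bits, while `Disabled` compares the whole value, so a disabled shard is disabled outright rather than "read-only plus something" — which is also why `setMode` skips propagating a disabled mode to components. A compact illustration of flag-style modes with an exact-match sentinel (the constant values here are invented for the sketch; the real `shard/mode` package defines its own):

```go
package main

import "fmt"

type Mode uint32

// Illustrative values only.
const (
	ReadWrite Mode = 0
	ReadOnly  Mode = 1 << iota // bit flags combine with other states...
	Degraded
	Disabled Mode = 1<<32 - 1 // ...but Disabled is matched exactly
)

func (m Mode) IsReadOnly() bool { return m&ReadOnly != 0 }
func (m Mode) IsDisabled() bool { return m == Disabled }

func main() {
	m := Degraded | ReadOnly
	fmt.Println(m.IsReadOnly(), m.IsDisabled()) // true false
	// With a bit test, Disabled (all bits set) would look read-only too;
	// exact comparison keeps the two notions separate.
	fmt.Println(Disabled.IsReadOnly(), Disabled.IsDisabled()) // true true
}
```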
-func (s Shard) needRefillMetabase() bool {
+func (s *Shard) needRefillMetabase() bool {
 	return s.cfg.refillMetabase
 }
diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go
index fea342766..7b2fdb5d1 100644
--- a/pkg/local_object_storage/shard/shard_test.go
+++ b/pkg/local_object_storage/shard/shard_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"path/filepath"
 	"testing"
+	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
@@ -12,8 +13,11 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"github.com/panjf2000/ants/v2"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
@@ -29,11 +33,13 @@ func (s epochState) CurrentEpoch() uint64 {
 
 func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
 	return newCustomShard(t, t.TempDir(), enableWriteCache,
+		nil,
 		nil,
 		nil)
 }
 
-func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard {
+func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option, metaOptions []meta.Option) *shard.Shard {
+	var sh *shard.Shard
 	if enableWriteCache {
 		rootPath = filepath.Join(rootPath, "wc")
 	} else {
@@ -67,8 +73,9 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
 		shard.WithLogger(&logger.Logger{Logger: zap.L()}),
 		shard.WithBlobStorOptions(bsOpts...),
 		shard.WithMetaBaseOptions(
-			meta.WithPath(filepath.Join(rootPath, "meta")),
-			meta.WithEpochState(epochState{}),
+			append([]meta.Option{
+				meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epochState{})},
+				metaOptions...)...,
 		),
 		shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
 		shard.WithWriteCache(enableWriteCache),
@@ -77,9 +84,21 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
 			[]writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))},
 			wcOpts...)...,
 		),
+		shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
+			sh.HandleDeletedLocks(addresses)
+		}),
+		shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
+			sh.HandleExpiredLocks(ctx, epoch, a)
+		}),
+		shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
+			pool, err := ants.NewPool(sz)
+			require.NoError(t, err)
+			return pool
+		}),
+		shard.WithGCRemoverSleepInterval(1 * time.Millisecond),
 	}
 
-	sh := shard.New(opts...)
+	sh = shard.New(opts...)
 	require.NoError(t, sh.Open())
 	require.NoError(t, sh.Init(context.Background()))
diff --git a/pkg/local_object_storage/shard/shutdown_test.go b/pkg/local_object_storage/shard/shutdown_test.go
index 5fd13221a..b19c02113 100644
--- a/pkg/local_object_storage/shard/shutdown_test.go
+++ b/pkg/local_object_storage/shard/shutdown_test.go
@@ -37,7 +37,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
 		writecache.WithSmallObjectSize(smallSize),
 		writecache.WithMaxObjectSize(smallSize * 2)}
 
-	sh := newCustomShard(t, dir, true, wcOpts, nil)
+	sh := newCustomShard(t, dir, true, wcOpts, nil, nil)
 
 	var putPrm shard.PutPrm
 
@@ -48,7 +48,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
 	}
 	require.NoError(t, sh.Close())
 
-	sh = newCustomShard(t, dir, true, wcOpts, nil)
+	sh = newCustomShard(t, dir, true, wcOpts, nil, nil)
 	defer releaseShard(sh, t)
 
 	var getPrm shard.GetPrm
diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go
index db07c001e..bbe16ba64 100644
--- a/pkg/local_object_storage/shard/tree.go
+++ b/pkg/local_object_storage/shard/tree.go
@@ -155,6 +155,21 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
 	return s.pilorama.TreeList(cid)
 }
 
+func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
+	if s.pilorama == nil {
+		return 0, ErrPiloramaDisabled
+	}
+
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.NoMetabase() {
+		return 0, ErrDegradedMode
+	}
+
+	return s.pilorama.TreeHeight(cid, treeID)
+}
+
 // TreeExists implements the pilorama.Forest interface.
 func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
 	if s.pilorama == nil {
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index 0437367e7..589edf9df 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -32,11 +32,11 @@ const (
 func (c *cache) runFlushLoop() {
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.flushWorker(i)
+		go c.workerFlushSmall()
 	}
 
 	c.wg.Add(1)
-	go c.flushBigObjects()
+	go c.workerFlushBig()
 
 	c.wg.Add(1)
 	go func() {
@@ -48,7 +48,7 @@ func (c *cache) runFlushLoop() {
 		for {
 			select {
 			case <-tt.C:
-				c.flushDB()
+				c.flushSmallObjects()
 				tt.Reset(defaultFlushInterval)
 			case <-c.closeCh:
 				return
@@ -57,7 +57,7 @@
 	}()
 }
 
-func (c *cache) flushDB() {
+func (c *cache) flushSmallObjects() {
 	var lastKey []byte
 	var m []objectInfo
 	for {
@@ -70,7 +70,7 @@
 		m = m[:0]
 
 		c.modeMtx.RLock()
-		if c.readOnly() || !c.initialized.Load() {
+		if c.readOnly() {
 			c.modeMtx.RUnlock()
 			time.Sleep(time.Second)
 			continue
@@ -109,10 +109,6 @@
 		var count int
 		for i := range m {
-			if c.flushed.Contains(m[i].addr) {
-				continue
-			}
-
 			obj := object.New()
 			if err := obj.Unmarshal(m[i].data); err != nil {
 				continue
@@ -140,7 +136,7 @@
 	}
 }
 
-func (c *cache) flushBigObjects() {
+func (c *cache) workerFlushBig() {
 	defer c.wg.Done()
 
 	tick := time.NewTicker(defaultFlushInterval * 10)
@@ -151,9 +147,6 @@
 			if c.readOnly() {
 				c.modeMtx.RUnlock()
 				break
-			} else if !c.initialized.Load() {
-				c.modeMtx.RUnlock()
-				continue
 			}
 
 			_ = c.flushFSTree(true)
@@ -181,10 +174,6 @@
 	prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
 		sAddr := addr.EncodeToString()
 
-		if _, ok := c.store.flushed.Peek(sAddr); ok {
-			return nil
-		}
-
 		data, err := f()
 		if err != nil {
 			c.reportFlushError("can't read a file", sAddr, err)
@@ -212,9 +201,7 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
 			return err
 		}
 
-		// mark object as flushed
-		c.flushed.Add(sAddr, false)
-
+		c.deleteFromDisk([]string{sAddr})
 		return nil
 	}
 
@@ -222,8 +209,8 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
 	return err
 }
 
-// flushWorker writes objects to the main storage.
-func (c *cache) flushWorker(_ int) {
+// workerFlushSmall writes small objects to the main storage.
+func (c *cache) workerFlushSmall() {
 	defer c.wg.Done()
 
 	var obj *object.Object
@@ -236,9 +223,12 @@
 		}
 
 		err := c.flushObject(obj, nil)
-		if err == nil {
-			c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
+		if err != nil {
+			// Error is handled in flushObject.
+			continue
 		}
+
+		c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
 	}
 }
 
@@ -294,10 +284,6 @@ func (c *cache) flush(ignoreErrors bool) error {
 		cs := b.Cursor()
 		for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
 			sa := string(k)
-			if _, ok := c.flushed.Peek(sa); ok {
-				continue
-			}
-
 			if err := addr.DecodeString(sa); err != nil {
 				c.reportFlushError("can't decode object address from the DB", sa, err)
 				if ignoreErrors {
diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go
index 9dc216fb3..530a51d9e 100644
--- a/pkg/local_object_storage/writecache/flush_test.go
+++ b/pkg/local_object_storage/writecache/flush_test.go
@@ -5,7 +5,6 @@ import (
 	"os"
 	"path/filepath"
 	"testing"
-	"time"
 
 	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -15,7 +14,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
-	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
 		require.NoError(t, bs.SetMode(mode.ReadWrite))
 		require.NoError(t, mb.SetMode(mode.ReadWrite))
 
-		wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
-		wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
-
 		require.NoError(t, wc.Flush(false))
 
-		for i := 0; i < 2; i++ {
-			var mPrm meta.GetPrm
-			mPrm.SetAddress(objects[i].addr)
-			_, err := mb.Get(mPrm)
-			require.Error(t, err)
-
-			_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
-			require.Error(t, err)
-		}
-
-		check(t, mb, bs, objects[2:])
+		check(t, mb, bs, objects)
 	})
 
 	t.Run("flush on moving to degraded mode", func(t *testing.T) {
@@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
 		require.NoError(t, wc.SetMode(mode.ReadOnly))
 		require.NoError(t, bs.SetMode(mode.ReadWrite))
 		require.NoError(t, mb.SetMode(mode.ReadWrite))
-
-		wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
-		wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
-
 		require.NoError(t, wc.SetMode(mode.Degraded))
 
-		for i := 0; i < 2; i++ {
-			var mPrm meta.GetPrm
-			mPrm.SetAddress(objects[i].addr)
-			_, err := mb.Get(mPrm)
-			require.Error(t, err)
-
-			_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
-			require.Error(t, err)
-		}
-
-		check(t, mb, bs, objects[2:])
+		check(t, mb, bs, objects)
 	})
 
 	t.Run("ignore errors", func(t *testing.T) {
@@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
 			})
 		})
 	})
-
-	t.Run("on init", func(t *testing.T) {
-		wc, bs, mb := newCache(t)
-		objects := []objectPair{
-			// removed
-			putObject(t, wc, 1),
-			putObject(t, wc, smallSize+1),
-			// not found
-			putObject(t, wc, 1),
-			putObject(t, wc, smallSize+1),
-			// ok
-			putObject(t, wc, 1),
-			putObject(t, wc, smallSize+1),
-		}
-
-		require.NoError(t, wc.Close())
-		require.NoError(t, bs.SetMode(mode.ReadWrite))
-		require.NoError(t, mb.SetMode(mode.ReadWrite))
-
-		for i := range objects {
-			var prm meta.PutPrm
-			prm.SetObject(objects[i].obj)
-			_, err := mb.Put(prm)
-			require.NoError(t, err)
-		}
-
-		var inhumePrm meta.InhumePrm
-		inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
-		inhumePrm.SetTombstoneAddress(oidtest.Address())
-		_, err := mb.Inhume(inhumePrm)
-		require.NoError(t, err)
-
-		var deletePrm meta.DeletePrm
-		deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
-		_, err = mb.Delete(deletePrm)
-		require.NoError(t, err)
-
-		require.NoError(t, bs.SetMode(mode.ReadOnly))
-		require.NoError(t, mb.SetMode(mode.ReadOnly))
-
-		// Open in read-only: no error, nothing is removed.
-		require.NoError(t, wc.Open(true))
-		initWC(t, wc)
-		for i := range objects {
-			_, err := wc.Get(context.Background(), objects[i].addr)
-			require.NoError(t, err, i)
-		}
-		require.NoError(t, wc.Close())
-
-		// Open in read-write: no error, something is removed.
-		require.NoError(t, wc.Open(false))
-		initWC(t, wc)
-		for i := range objects {
-			_, err := wc.Get(context.Background(), objects[i].addr)
-			if i < 2 {
-				require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
-			} else {
-				require.NoError(t, err, i)
-			}
-		}
-	})
 }
 
 func putObject(t *testing.T, c Cache, size int) objectPair {
@@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
 
 func initWC(t *testing.T, wc Cache) {
 	require.NoError(t, wc.Init())
-
-	require.Eventually(t, func() bool {
-		rawWc := wc.(*cache)
-		return rawWc.initialized.Load()
-	}, 100*time.Second, 1*time.Millisecond)
 }
 
 type dummyEpoch struct{}
diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go
index 6af1bd181..030f9b413 100644
--- a/pkg/local_object_storage/writecache/get.go
+++ b/pkg/local_object_storage/writecache/get.go
@@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
 	value, err := Get(c.db, []byte(saddr))
 	if err == nil {
 		obj := objectSDK.New()
-		c.flushed.Get(saddr)
 		return obj, obj.Unmarshal(value)
 	}
 
@@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
 		return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
 	}
 
-	c.flushed.Get(saddr)
 	return res.Object, nil
 }
diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go
deleted file mode 100644
index ffe7a0129..000000000
--- a/pkg/local_object_storage/writecache/init.go
+++ /dev/null
@@ -1,183 +0,0 @@
-package writecache
-
-import (
-	"context"
-	"errors"
-	"sync"
-
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
-	storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
-	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
-	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
-	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-	"go.etcd.io/bbolt"
-	"go.uber.org/zap"
-)
-
-func (c *cache) initFlushMarks() {
-	var localWG sync.WaitGroup
-
-	localWG.Add(1)
-	go func() {
-		defer localWG.Done()
-
-		c.fsTreeFlushMarkUpdate()
-	}()
-
-	localWG.Add(1)
-	go func() {
-		defer localWG.Done()
-
-		c.dbFlushMarkUpdate()
-	}()
-
-	c.initWG.Add(1)
-	c.wg.Add(1)
-	go func() {
-		defer c.wg.Done()
-		defer c.initWG.Done()
-
-		localWG.Wait()
-
-		select {
-		case <-c.stopInitCh:
-			return
-		case <-c.closeCh:
-			return
-		default:
-		}
-
-		c.initialized.Store(true)
-	}()
-}
-
-var errStopIter = errors.New("stop iteration")
-
-func (c *cache) fsTreeFlushMarkUpdate() {
-	c.log.Info("filling flush marks for objects in FSTree")
-
-	var prm common.IteratePrm
-	prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
-		select {
-		case <-c.closeCh:
-			return errStopIter
-		case <-c.stopInitCh:
-			return errStopIter
-		default:
-		}
-
-		flushed, needRemove := c.flushStatus(addr)
-		if flushed {
-			c.store.flushed.Add(addr.EncodeToString(), true)
-			if needRemove {
-				var prm common.DeletePrm
-				prm.Address = addr
-
-				_, err := c.fsTree.Delete(prm)
-				if err == nil {
-					storagelog.Write(c.log,
-						storagelog.AddressField(addr),
-						storagelog.StorageTypeField(wcStorageType),
-						storagelog.OpField("fstree DELETE"),
-					)
-				}
-			}
-		}
-		return nil
-	}
-	_, _ = c.fsTree.Iterate(prm)
-	c.log.Info("finished updating FSTree flush marks")
-}
-
-func (c *cache) dbFlushMarkUpdate() {
-	c.log.Info("filling flush marks for objects in database")
-
-	var m []string
-	var indices []int
-	var lastKey []byte
-	var batchSize = flushBatchSize
-	for {
-		select {
-		case <-c.closeCh:
-			return
-		case <-c.stopInitCh:
-			return
-		default:
-		}
-
-		m = m[:0]
-		indices = indices[:0]
-
-		// We put objects in batches of fixed size to not interfere with main put cycle a lot.
-		_ = c.db.View(func(tx *bbolt.Tx) error {
-			b := tx.Bucket(defaultBucket)
-			cs := b.Cursor()
-			for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
-				m = append(m, string(k))
-			}
-			return nil
-		})
-
-		var addr oid.Address
-		for i := range m {
-			if err := addr.DecodeString(m[i]); err != nil {
-				continue
-			}
-
-			flushed, needRemove := c.flushStatus(addr)
-			if flushed {
-				c.store.flushed.Add(addr.EncodeToString(), true)
-				if needRemove {
-					indices = append(indices, i)
-				}
-			}
-		}
-
-		if len(m) == 0 {
-			break
-		}
-
-		err := c.db.Batch(func(tx *bbolt.Tx) error {
-			b := tx.Bucket(defaultBucket)
-			for _, j := range indices {
-				if err := b.Delete([]byte(m[j])); err != nil {
-					return err
-				}
-			}
-			return nil
-		})
-		if err == nil {
-			for _, j := range indices {
-				storagelog.Write(c.log,
-					zap.String("address", m[j]),
-					storagelog.StorageTypeField(wcStorageType),
-					storagelog.OpField("db DELETE"),
-				)
-			}
-		}
-		lastKey = append([]byte(m[len(m)-1]), 0)
-	}
-
-	c.log.Info("finished updating flush marks")
-}
-
-// flushStatus returns info about the object state in the main storage.
-// First return value is true iff object exists.
-// Second return value is true iff object can be safely removed.
-func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
-	var existsPrm meta.ExistsPrm
-	existsPrm.SetAddress(addr)
-
-	_, err := c.metabase.Exists(existsPrm)
-	if err != nil {
-		needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
-		return needRemove, needRemove
-	}
-
-	var prm meta.StorageIDPrm
-	prm.SetAddress(addr)
-
-	mRes, _ := c.metabase.StorageID(prm)
-	res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
-	return err == nil && res.Exists, false
-}
diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go
index 228dd2597..ebe979520 100644
--- a/pkg/local_object_storage/writecache/iterate.go
+++ b/pkg/local_object_storage/writecache/iterate.go
@@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
 	err := c.db.View(func(tx *bbolt.Tx) error {
 		b := tx.Bucket(defaultBucket)
 		return b.ForEach(func(k, data []byte) error {
-			if _, ok := c.flushed.Peek(string(k)); ok {
-				return nil
-			}
 			return prm.handler(data)
 		})
 	})
@@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
 	var fsPrm common.IteratePrm
 	fsPrm.IgnoreErrors = prm.ignoreErrors
 	fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
-		if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
-			return nil
-		}
 		data, err := f()
 		if err != nil {
 			if prm.ignoreErrors {
diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go
index 997310d9e..4b4d121cd 100644
--- a/pkg/local_object_storage/writecache/mode.go
+++ b/pkg/local_object_storage/writecache/mode.go
@@ -36,19 +36,6 @@ func (c *cache) setMode(m mode.Mode) error {
 		}
 	}
 
-	if !c.initialized.Load() {
-		close(c.stopInitCh)
-
-		c.initWG.Wait()
-		c.stopInitCh = make(chan struct{})
-
-		defer func() {
-			if err == nil && !turnOffMeta {
-				c.initFlushMarks()
-			}
-		}()
-	}
-
 	if c.db != nil {
 		if err = c.db.Close(); err != nil {
 			return fmt.Errorf("can't close write-cache database: %w", err)
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
index 7791e93dc..64e1d1226 100644
--- a/pkg/local_object_storage/writecache/put.go
+++ b/pkg/local_object_storage/writecache/put.go
@@ -26,8 +26,6 @@ func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
 	defer c.modeMtx.RUnlock()
 	if c.readOnly() {
 		return common.PutRes{}, ErrReadOnly
-	} else if !c.initialized.Load() {
-		return common.PutRes{}, ErrNotInitialized
 	}
 
 	sz := uint64(len(prm.RawData))
@@ -67,7 +65,7 @@ func (c *cache) putSmall(obj objectInfo) error {
 		)
 		c.objCounters.IncDB()
 	}
-	return nil
+	return err
 }
 
 // putBig writes object to FSTree and pushes it to the flush workers queue.
diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go
index 667d34cb9..a445a9671 100644
--- a/pkg/local_object_storage/writecache/storage.go
+++ b/pkg/local_object_storage/writecache/storage.go
@@ -11,8 +11,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-	lru "github.com/hashicorp/golang-lru/v2"
-	"github.com/hashicorp/golang-lru/v2/simplelru"
 	"go.etcd.io/bbolt"
 	"go.uber.org/zap"
 )
@@ -20,19 +18,7 @@ import (
 // store represents persistent storage with in-memory LRU cache
 // for flushed items on top of it.
 type store struct {
-	maxFlushedMarksCount int
-	maxRemoveBatchSize   int
-
-	// flushed contains addresses of objects that were already flushed to the main storage.
-	// We use LRU cache instead of map here to facilitate removing of unused object in favour of
-	// frequently read ones.
-	// MUST NOT be used inside bolt db transaction because it's eviction handler
-	// removes untracked items from the database.
-	flushed simplelru.LRUCache[string, bool]
-	db      *bbolt.DB
-
-	dbKeysToRemove []string
-	fsKeysToRemove []string
+	db *bbolt.DB
 }
 
 const dbName = "small.bolt"
@@ -71,35 +57,9 @@ func (c *cache) openStore(readOnly bool) error {
 		return fmt.Errorf("could not open FSTree: %w", err)
 	}
 
-	// Write-cache can be opened multiple times during `SetMode`.
-	// flushed map must not be re-created in this case.
-	if c.flushed == nil {
-		c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
-	}
-
-	c.initialized.Store(false)
-
 	return nil
 }
 
-// removeFlushed removes an object from the writecache.
-// To minimize interference with the client operations, the actual removal
-// is done in batches.
-// It is not thread-safe and is used only as an evict callback to LRU cache.
-func (c *cache) removeFlushed(key string, value bool) {
-	fromDatabase := value
-	if fromDatabase {
-		c.dbKeysToRemove = append(c.dbKeysToRemove, key)
-	} else {
-		c.fsKeysToRemove = append(c.fsKeysToRemove, key)
-	}
-
-	if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
-		c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
-		c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
-	}
-}
-
 func (c *cache) deleteFromDB(keys []string) []string {
 	if len(keys) == 0 {
 		return keys
diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go
index 24070dbda..988229bfa 100644
--- a/pkg/local_object_storage/writecache/writecache.go
+++ b/pkg/local_object_storage/writecache/writecache.go
@@ -12,7 +12,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.etcd.io/bbolt"
-	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
@@ -51,11 +50,8 @@ type cache struct {
 	// mtx protects statistics, counters and compressFlags.
 	mtx sync.RWMutex
 
-	mode        mode.Mode
-	initialized atomic.Bool
-	stopInitCh  chan struct{}  // used to sync initWG initialisation routines and _only_ them
-	initWG      sync.WaitGroup // for initialisation routines only
-	modeMtx     sync.RWMutex
+	mode    mode.Mode
+	modeMtx sync.RWMutex
 
 	// compressFlags maps address of a big object to boolean value indicating
 	// whether object should be compressed.
@@ -95,9 +91,8 @@ var (
 // New creates new writecache instance.
 func New(opts ...Option) Cache {
 	c := &cache{
-		flushCh:    make(chan *object.Object),
-		mode:       mode.ReadWrite,
-		stopInitCh: make(chan struct{}),
+		flushCh: make(chan *object.Object),
+		mode:    mode.ReadWrite,
 
 		compressFlags: make(map[string]struct{}),
 		options: options{
@@ -116,12 +111,6 @@ func New(opts ...Option) Cache {
 		opts[i](&c.options)
 	}
 
-	// Make the LRU cache contain which take approximately 3/4 of the maximum space.
-	// Assume small and big objects are stored in 50-50 proportion.
-	c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
-	// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
-	c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
-
 	return c
 }
 
@@ -152,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {
 
 // Init runs necessary services.
 func (c *cache) Init() error {
-	c.initFlushMarks()
 	c.runFlushLoop()
 	return nil
 }
 
 // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
 func (c *cache) Close() error {
+	// We cannot lock mutex for the whole operation duration
+	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
 	c.modeMtx.Lock()
-	defer c.modeMtx.Unlock()
-
-	// Finish all in-progress operations.
-	if err := c.setMode(mode.ReadOnly); err != nil {
-		return err
-	}
-
 	if c.closeCh != nil {
 		close(c.closeCh)
 	}
+	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
+	c.modeMtx.Unlock()
+
 	c.wg.Wait()
-	if c.closeCh != nil {
-		c.closeCh = nil
-	}
 
-	c.initialized.Store(false)
+	c.modeMtx.Lock()
+	defer c.modeMtx.Unlock()
+	c.closeCh = nil
 
 	var err error
 	if c.db != nil {
 		err = c.db.Close()
diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go
index b93c5f75f..e560d2a61 100644
--- a/pkg/morph/client/client.go
+++ b/pkg/morph/client/client.go
@@ -57,8 +57,6 @@ type Client struct {
 	acc     *wallet.Account // neo account
 	accAddr util.Uint160    // account's address
 
-	signer *transaction.Signer
-
 	notary *notaryInfo
 
 	cfg cfg
@@ -70,9 +68,6 @@ type Client struct {
 	// on every normal call.
 	switchLock *sync.RWMutex
 
-	notifications chan rpcclient.Notification
-	subsInfo      // protected with switchLock
-
 	// channel for internal stop
 	closeChan chan struct{}
 
@@ -567,26 +562,11 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error) {
 
 // NotificationChannel returns channel than receives subscribed
 // notification from the connected RPC node.
-// Channel is closed when connection to the RPC node has been
-// lost without the possibility of recovery.
+// Channel is closed when connection to the RPC node is lost.
 func (c *Client) NotificationChannel() <-chan rpcclient.Notification {
-	return c.notifications
-}
-
-// inactiveMode switches Client to an inactive mode:
-// - notification channel is closed;
-// - all the new RPC request would return ErrConnectionLost;
-// - inactiveModeCb is called if not nil.
-func (c *Client) inactiveMode() {
-	c.switchLock.Lock()
-	defer c.switchLock.Unlock()
-
-	close(c.notifications)
-	c.inactive = true
-
-	if c.cfg.inactiveModeCb != nil {
-		c.cfg.inactiveModeCb()
-	}
+	c.switchLock.RLock()
+	defer c.switchLock.RUnlock()
+	return c.client.Notifications //lint:ignore SA1019 waits for neo-go v0.102.0 https://github.com/nspcc-dev/neo-go/pull/2980
 }
 
 func (c *Client) setActor(act *actor.Actor) {
diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go
index 9ed275029..9bd5b910b 100644
--- a/pkg/morph/client/constructor.go
+++ b/pkg/morph/client/constructor.go
@@ -9,11 +9,8 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	lru "github.com/hashicorp/golang-lru/v2"
-	"github.com/nspcc-dev/neo-go/pkg/core/block"
-	"github.com/nspcc-dev/neo-go/pkg/core/state"
 	"github.com/nspcc-dev/neo-go/pkg/core/transaction"
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
-	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
 	"github.com/nspcc-dev/neo-go/pkg/util"
@@ -101,22 +98,13 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, error) {
 	}
 
 	cli := &Client{
-		cache:         newClientCache(),
-		logger:        cfg.logger,
-		acc:           acc,
-		accAddr:       accAddr,
-		signer:        cfg.signer,
-		cfg:           *cfg,
-		switchLock:    &sync.RWMutex{},
-		notifications: make(chan rpcclient.Notification),
-		subsInfo: subsInfo{
-			blockRcv:               make(chan *block.Block),
-			notificationRcv:        make(chan *state.ContainedNotificationEvent),
-			notaryReqRcv:           make(chan *result.NotaryRequestEvent),
-			subscribedEvents:       make(map[util.Uint160]string),
-			subscribedNotaryEvents: make(map[util.Uint160]string),
-		},
-		closeChan: make(chan struct{}),
+		cache:      newClientCache(),
+		logger:     cfg.logger,
+		acc:        acc,
+		accAddr:    accAddr,
+		cfg:        *cfg,
+		switchLock: &sync.RWMutex{},
+		closeChan:  make(chan struct{}),
 	}
 
 	cli.endpoints.init(cfg.endpoints)
@@ -145,7 +133,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, error) {
 	}
 	cli.setActor(act)
 
-	go cli.notificationLoop(ctx)
+	go cli.closeWaiter(ctx)
 
 	return cli, nil
 }
diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go
index 5d736839a..c9e37e8e8 100644
--- a/pkg/morph/client/multi.go
+++ b/pkg/morph/client/multi.go
@@ -5,11 +5,6 @@ import (
 	"sort"
 	"time"
 
-	"github.com/nspcc-dev/neo-go/pkg/core/block"
-	"github.com/nspcc-dev/neo-go/pkg/core/state"
-	"github.com/nspcc-dev/neo-go/pkg/neorpc"
-	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"go.uber.org/zap"
 )
 
@@ -33,7 +28,8 @@ func (e *endpoints) init(ee []Endpoint) {
 	e.list = ee
 }
 
-func (c *Client) switchRPC(ctx context.Context) bool {
+// SwitchRPC performs reconnection and returns true if it was successful.
+func (c *Client) SwitchRPC(ctx context.Context) bool {
 	c.switchLock.Lock()
 	defer c.switchLock.Unlock()
 
@@ -57,20 +53,8 @@ func (c *Client) switchRPC(ctx context.Context) bool {
 		c.logger.Info("connection to the new RPC node has been established",
 			zap.String("endpoint", newEndpoint))
 
-		subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false)
-		if !ok {
-			// new WS client does not allow
-			// restoring subscription, client
-			// could not work correctly =>
-			// closing connection to RPC node
-			// to switch to another one
-			cli.Close()
-			continue
-		}
-
 		c.client = cli
 		c.setActor(act)
-		c.subsInfo = subs
 
 		if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
 			c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@@ -81,97 +65,21 @@ func (c *Client) switchRPC(ctx context.Context) bool {
 		return true
 	}
 
+	c.inactive = true
+
+	if c.cfg.inactiveModeCb != nil {
+		c.cfg.inactiveModeCb()
+	}
 	return false
 }
 
-func (c *Client) notificationLoop(ctx context.Context) {
-	var e any
-	var ok bool
-
-	for {
-		c.switchLock.RLock()
-		bChan := c.blockRcv
-		nChan := c.notificationRcv
-		nrChan := c.notaryReqRcv
-		c.switchLock.RUnlock()
-
-		select {
-		case <-ctx.Done():
-			_ = c.UnsubscribeAll()
-			c.close()
-
-			return
-		case <-c.closeChan:
-			_ = c.UnsubscribeAll()
-			c.close()
-
-			return
-		case e, ok = <-bChan:
-		case e, ok = <-nChan:
-		case e, ok = <-nrChan:
-		}
-
-		if ok {
-			c.routeEvent(ctx, e)
-			continue
-		}
-
-		if !c.reconnect(ctx) {
-			return
-		}
-	}
-}
-
-func (c *Client) routeEvent(ctx context.Context, e any) {
-	typedNotification := rpcclient.Notification{Value: e}
-
-	switch e.(type) {
-	case *block.Block:
-		typedNotification.Type = neorpc.BlockEventID
-	case *state.ContainedNotificationEvent:
-		typedNotification.Type = neorpc.NotificationEventID
-	case *result.NotaryRequestEvent:
-		typedNotification.Type = neorpc.NotaryRequestEventID
-	}
-
+func (c *Client) closeWaiter(ctx context.Context) {
 	select {
-	case c.notifications <- typedNotification:
 	case <-ctx.Done():
-		_ = c.UnsubscribeAll()
-		c.close()
 	case <-c.closeChan:
-		_ = c.UnsubscribeAll()
-		c.close()
 	}
-}
-
-func (c *Client) reconnect(ctx context.Context) bool {
-	if closeErr := c.client.GetError(); closeErr != nil {
-		c.logger.Warn("switching to the next RPC node",
-			zap.String("reason", closeErr.Error()),
-		)
-	} else {
-		// neo-go client was closed by calling `Close`
-		// method, that happens only when a client has
-		// switched to the more prioritized RPC
-		return true
-	}
-
-	if !c.switchRPC(ctx) {
-		c.logger.Error("could not establish connection to any RPC node")
-
-		// could not connect to all endpoints =>
-		// switch client to inactive mode
-		c.inactiveMode()
-
-		return false
-	}
-
-	// TODO(@carpawell): call here some callback retrieved in constructor
-	// of the client to allow checking chain state since during switch
-	// process some notification could be lost
-
-	return true
+	_ = c.UnsubscribeAll()
+	c.close()
 }
 
 func (c *Client) switchToMostPrioritized(ctx context.Context) {
@@ -217,36 +125,28 @@ mainLoop:
 			continue
 		}
 
-		if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok {
-			c.switchLock.Lock()
-
-			// higher priority node could have been
-			// connected in the other goroutine
-			if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
-				cli.Close()
-				c.switchLock.Unlock()
-				return
-			}
-
-			c.client.Close()
-			c.cache.invalidate()
-			c.client = cli
-			c.setActor(act)
-			c.subsInfo = subs
-			c.endpoints.curr = i
+		c.switchLock.Lock()
+		// higher priority node could have been
+		// connected in the other goroutine
+		if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
+			cli.Close()
 			c.switchLock.Unlock()
-
-			c.logger.Info("switched to the higher priority RPC",
-				zap.String("endpoint", tryE))
-
 			return
 		}
 
-		c.logger.Warn("could not restore side chain subscriptions using node",
-			zap.String("endpoint", tryE),
-			zap.Error(err),
-		)
+		c.client.Close()
+		c.cache.invalidate()
+		c.client = cli
+		c.setActor(act)
+		c.endpoints.curr = i
+
+		c.switchLock.Unlock()
+
+		c.logger.Info("switched to the higher priority RPC",
+			zap.String("endpoint", tryE))
+
+		return
 	}
 }
@@ -254,6 +154,7 @@ mainLoop:
 
 // close closes notification channel and wrapped WS client.
 func (c *Client) close() {
-	close(c.notifications)
+	c.switchLock.RLock()
+	defer c.switchLock.RUnlock()
 	c.client.Close()
 }
diff --git a/pkg/morph/client/nns.go b/pkg/morph/client/nns.go
index 0a23aa47a..de708575d 100644
--- a/pkg/morph/client/nns.go
+++ b/pkg/morph/client/nns.go
@@ -208,8 +208,8 @@ func (c *Client) SetGroupSignerScope() error {
 		return err
 	}
 
-	c.signer.Scopes = transaction.CustomGroups
-	c.signer.AllowedGroups = []*keys.PublicKey{pub}
+	c.cfg.signer.Scopes = transaction.CustomGroups
+	c.cfg.signer.AllowedGroups = []*keys.PublicKey{pub}
 	return nil
 }
diff --git a/pkg/morph/client/notary.go b/pkg/morph/client/notary.go
index 96dca0319..069c35782 100644
--- a/pkg/morph/client/notary.go
+++ b/pkg/morph/client/notary.go
@@ -596,18 +596,18 @@ func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, committee bool) ([]transaction.Signer, error) {
 	s = append(s, transaction.Signer{
 		Account:          hash.Hash160(multisigScript),
-		Scopes:           c.signer.Scopes,
-		AllowedContracts: c.signer.AllowedContracts,
-		AllowedGroups:    c.signer.AllowedGroups,
+		Scopes:           c.cfg.signer.Scopes,
+		AllowedContracts: c.cfg.signer.AllowedContracts,
+		AllowedGroups:    c.cfg.signer.AllowedGroups,
 	})
 
 	if !invokedByAlpha {
 		// then we have invoker signature
 		s = append(s, transaction.Signer{
 			Account:          hash.Hash160(c.acc.GetVerificationScript()),
-			Scopes:           c.signer.Scopes,
-			AllowedContracts: c.signer.AllowedContracts,
-			AllowedGroups:    c.signer.AllowedGroups,
+			Scopes:           c.cfg.signer.Scopes,
+			AllowedContracts: c.cfg.signer.AllowedContracts,
+			AllowedGroups:    c.cfg.signer.AllowedGroups,
 		})
 	}
diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go
index 300bab825..dbca00d7c 100644
--- a/pkg/morph/client/notifications.go
+++ b/pkg/morph/client/notifications.go
@@ -1,15 +1,11 @@
 package client
 
 import (
-	"context"
-
 	"github.com/nspcc-dev/neo-go/pkg/core/block"
 	"github.com/nspcc-dev/neo-go/pkg/core/state"
 	"github.com/nspcc-dev/neo-go/pkg/neorpc"
 	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/util"
-	"go.uber.org/zap"
 )
 
 // Close closes connection to the remote side making
@@ -23,71 +19,46 @@ func (c *Client) Close() {
 	close(c.closeChan)
 }
 
-// SubscribeForExecutionNotifications adds subscription for notifications
-// generated during contract transaction execution to this instance of client.
+// ReceiveExecutionNotifications performs subscription for notifications
+// generated during contract execution. Events are sent to the specified channel.
 //
 // Returns ErrConnectionLost if client has not been able to establish
 // connection to any of passed RPC endpoints.
-func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error {
+func (c *Client) ReceiveExecutionNotifications(contract util.Uint160, ch chan<- *state.ContainedNotificationEvent) (string, error) {
 	c.switchLock.Lock()
 	defer c.switchLock.Unlock()
 
 	if c.inactive {
-		return ErrConnectionLost
+		return "", ErrConnectionLost
 	}
 
-	_, subscribed := c.subscribedEvents[contract]
-	if subscribed {
-		// no need to subscribe one more time
-		return nil
-	}
-
-	id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
-	if err != nil {
-		return err
-	}
-
-	c.subscribedEvents[contract] = id
-
-	return nil
+	return c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ch)
 }
 
-// SubscribeForNewBlocks adds subscription for new block events to this
-// instance of client.
+// ReceiveBlocks performs subscription for new block events. Events are sent
+// to the specified channel.
 //
 // Returns ErrConnectionLost if client has not been able to establish
 // connection to any of passed RPC endpoints.
-func (c *Client) SubscribeForNewBlocks() error {
+func (c *Client) ReceiveBlocks(ch chan<- *block.Block) (string, error) {
 	c.switchLock.Lock()
 	defer c.switchLock.Unlock()
 
 	if c.inactive {
-		return ErrConnectionLost
+		return "", ErrConnectionLost
 	}
 
-	if c.subscribedToBlocks {
-		// no need to subscribe one more time
-		return nil
-	}
-
-	_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
-	if err != nil {
-		return err
-	}
-
-	c.subscribedToBlocks = true
-
-	return nil
+	return c.client.ReceiveBlocks(nil, ch)
 }
 
-// SubscribeForNotaryRequests adds subscription for notary request payloads
+// ReceiveNotaryRequests performs subscription for notary request payloads
 // addition or removal events to this instance of client. Passed txSigner is
 // used as filter: subscription is only for the notary requests that must be
-// signed by txSigner.
+// signed by txSigner. Events are sent to the specified channel.
 //
 // Returns ErrConnectionLost if client has not been able to establish
 // connection to any of passed RPC endpoints.
-func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
+func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160, ch chan<- *result.NotaryRequestEvent) (string, error) {
 	if c.notary == nil {
 		panic(notaryNotEnabledPanicMsg)
 	}
@@ -96,30 +67,17 @@
 	defer c.switchLock.Unlock()
 
 	if c.inactive {
-		return ErrConnectionLost
+		return "", ErrConnectionLost
 	}
 
-	_, subscribed := c.subscribedNotaryEvents[txSigner]
-	if subscribed {
-		// no need to subscribe one more time
-		return nil
-	}
-
-	id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
-	if err != nil {
-		return err
-	}
-
-	c.subscribedNotaryEvents[txSigner] = id
-
-	return nil
+	return c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, ch)
 }
 
-// UnsubscribeContract removes subscription for given contract event stream.
+// Unsubscribe performs unsubscription for the given subscription ID.
 //
 // Returns ErrConnectionLost if client has not been able to establish
 // connection to any of passed RPC endpoints.
-func (c *Client) UnsubscribeContract(contract util.Uint160) error {
+func (c *Client) Unsubscribe(subID string) error {
 	c.switchLock.Lock()
 	defer c.switchLock.Unlock()
 
@@ -127,55 +85,7 @@
 		return ErrConnectionLost
 	}
 
-	_, subscribed := c.subscribedEvents[contract]
-	if !subscribed {
-		// no need to unsubscribe contract
-		// without subscription
-		return nil
-	}
-
-	err := c.client.Unsubscribe(c.subscribedEvents[contract])
-	if err != nil {
-		return err
-	}
-
-	delete(c.subscribedEvents, contract)
-
-	return nil
-}
-
-// UnsubscribeNotaryRequest removes subscription for given notary requests
-// signer.
-//
-// Returns ErrConnectionLost if client has not been able to establish
-// connection to any of passed RPC endpoints.
-func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error {
-	if c.notary == nil {
-		panic(notaryNotEnabledPanicMsg)
-	}
-
-	c.switchLock.Lock()
-	defer c.switchLock.Unlock()
-
-	if c.inactive {
-		return ErrConnectionLost
-	}
-
-	_, subscribed := c.subscribedNotaryEvents[signer]
-	if !subscribed {
-		// no need to unsubscribe signer's
-		// requests without subscription
-		return nil
-	}
-
-	err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer])
-	if err != nil {
-		return err
-	}
-
-	delete(c.subscribedNotaryEvents, signer)
-
-	return nil
+	return c.client.Unsubscribe(subID)
 }
 
 // UnsubscribeAll removes all active subscriptions of current client.
@@ -190,163 +100,10 @@
 		return ErrConnectionLost
 	}
 
-	// no need to unsubscribe if there are
-	// no active subscriptions
-	if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
-		!c.subscribedToBlocks {
-		return nil
-	}
-
 	err := c.client.UnsubscribeAll()
 	if err != nil {
 		return err
 	}
 
-	c.subscribedEvents = make(map[util.Uint160]string)
-	c.subscribedNotaryEvents = make(map[util.Uint160]string)
-	c.subscribedToBlocks = false
-
 	return nil
 }
-
-// subsInfo includes channels for ws notifications;
-// cached subscription information.
-type subsInfo struct {
-	blockRcv        chan *block.Block
-	notificationRcv chan *state.ContainedNotificationEvent
-	notaryReqRcv    chan *result.NotaryRequestEvent
-
-	subscribedToBlocks     bool
-	subscribedEvents       map[util.Uint160]string
-	subscribedNotaryEvents map[util.Uint160]string
-}
-
-// restoreSubscriptions restores subscriptions according to cached
-// information about them.
-//
-// If it is NOT a background operation switchLock MUST be held.
-// Returns a pair: the second is a restoration status and the first
-// one contains subscription information applied to the passed cli
-// and receivers for the updated subscriptions.
-// Does not change Client instance.
-func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
-	var (
-		err error
-		id  string
-	)
-
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-
-	blockRcv := make(chan *block.Block)
-	notificationRcv := make(chan *state.ContainedNotificationEvent)
-	notaryReqRcv := make(chan *result.NotaryRequestEvent)
-
-	c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background)
-
-	if background {
-		c.switchLock.RLock()
-		defer c.switchLock.RUnlock()
-	}
-
-	si.subscribedToBlocks = c.subscribedToBlocks
-	si.subscribedEvents = copySubsMap(c.subscribedEvents)
-	si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
-	si.blockRcv = blockRcv
-	si.notificationRcv = notificationRcv
-	si.notaryReqRcv = notaryReqRcv
-
-	// new block events restoration
-	if si.subscribedToBlocks {
-		_, err = cli.ReceiveBlocks(nil, blockRcv)
-		if err != nil {
-			c.logger.Error("could not restore block subscription after RPC switch",
-				zap.String("endpoint", endpoint),
-				zap.Error(err),
-			)
-
-			return
-		}
-	}
-
-	// notification events restoration
-	for contract := range si.subscribedEvents {
-		contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
-		id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
-		if err != nil {
-			c.logger.Error("could not restore notification subscription after RPC switch",
-				zap.String("endpoint", endpoint),
-				zap.Error(err),
-			)
-
-			return
-		}
-
-		si.subscribedEvents[contract] = id
-	}
-
-	// notary notification events restoration
-	if c.notary != nil {
-		for signer := range si.subscribedNotaryEvents {
-			signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
-			id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
-			if err != nil {
-				c.logger.Error("could not restore notary notification subscription after RPC switch",
-					zap.String("endpoint", endpoint),
-					zap.Error(err),
-				)
-
-				return
-			}
-
-			si.subscribedNotaryEvents[signer] = id
-		}
-	}
-
-	return si, true
-}
-
-func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block,
-	notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) {
-	// neo-go WS client says to _always_ read notifications
-	// from its channel. Subscribing to any notification
-	// while not reading them in another goroutine may
-	// lead to a dead-lock, thus that async side notification
-	// listening while restoring subscriptions
-
-	go func() {
-		var e any
-		var ok bool
-
-		for {
-			select {
-			case <-stopCh:
-				return
-			case e, ok = <-blockRcv:
-			case e, ok = <-notificationRcv:
-			case e, ok = <-notaryReqRcv:
-			}
-
-			if !ok {
-				return
-			}
-
-			if background {
-				// background client (test) switch, no need to send
-				// any notification, just preventing dead-lock
-				continue
-			}
-
-			c.routeEvent(ctx, e)
-		}
-	}()
-}
-
-func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
-	newM := make(map[util.Uint160]string, len(m))
-	for k, v := range m {
-		newM[k] = v
-	}
-
-	return newM
-}
diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go
index 17bed5b2d..273114c50 100644
--- a/pkg/morph/subscriber/subscriber.go
+++ b/pkg/morph/subscriber/subscriber.go
@@ -10,8 +10,8 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	"github.com/nspcc-dev/neo-go/pkg/core/block"
 	"github.com/nspcc-dev/neo-go/pkg/core/state"
-	"github.com/nspcc-dev/neo-go/pkg/neorpc"
 	"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
+	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/util"
 	"go.uber.org/zap"
 )
@@ -35,16 +35,27 @@ type (
 		Close()
 	}
 
+	subChannels struct {
+		NotifyChan chan *state.ContainedNotificationEvent
+		BlockChan  chan *block.Block
+		NotaryChan chan *result.NotaryRequestEvent
+	}
+
 	subscriber struct {
 		*sync.RWMutex
 		log    *logger.Logger
 		client *client.Client
 
 		notifyChan chan *state.ContainedNotificationEvent
-
-		blockChan chan *block.Block
-
+		blockChan  chan *block.Block
 		notaryChan chan *result.NotaryRequestEvent
+
+		current subChannels
+
+		// cached subscription information
+		subscribedEvents       map[util.Uint160]bool
+		subscribedNotaryEvents map[util.Uint160]bool
+		subscribedToNewBlocks  bool
 	}
 
 	// Params is a group of Subscriber constructor parameters.
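
// The struct above moves the subscription bookkeeping out of the morph client
// and into the subscriber: every successful subscription is recorded in a map,
// so repeat calls can be skipped and the whole set can be replayed against a
// fresh connection after SwitchRPC. The following is a minimal, self-contained
// sketch of that pattern only; tracker and subscribeFn are illustrative names,
// not identifiers from this module.

package main

import (
	"fmt"
	"sync"
)

// subscribeFn stands in for a client call such as ReceiveExecutionNotifications:
// it registers one subscription and returns its server-side ID.
type subscribeFn func(key string) (id string, err error)

// tracker remembers every key that was ever subscribed, mirroring the
// subscribedEvents / subscribedNotaryEvents maps in the subscriber above.
type tracker struct {
	mu  sync.Mutex
	ids map[string]string // subscription key -> server-side subscription ID
}

func newTracker() *tracker { return &tracker{ids: make(map[string]string)} }

// Subscribe is idempotent: a key that is already tracked is not re-sent.
func (t *tracker) Subscribe(key string, sub subscribeFn) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.ids[key]; ok {
		return nil // already subscribed, nothing to do
	}
	id, err := sub(key)
	if err != nil {
		return err
	}
	t.ids[key] = id
	return nil
}

// Restore replays the cached set against a new connection, refreshing the IDs.
func (t *tracker) Restore(sub subscribeFn) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	for key := range t.ids {
		id, err := sub(key)
		if err != nil {
			return err // caller treats the reconnect as failed
		}
		t.ids[key] = id
	}
	return nil
}

func main() {
	tr := newTracker()
	sub := func(key string) (string, error) {
		fmt.Println("subscribe:", key)
		return "id-" + key, nil
	}
	_ = tr.Subscribe("contract-A", sub)
	_ = tr.Subscribe("contract-A", sub) // deduplicated, no second call
	_ = tr.Restore(sub)                 // replayed after a SwitchRPC-style reconnect
}
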
@@ -75,22 +86,28 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
 	s.Lock()
 	defer s.Unlock()
 
-	notifyIDs := make(map[util.Uint160]struct{}, len(contracts))
+	notifyIDs := make([]string, 0, len(contracts))
 
 	for i := range contracts {
+		if s.subscribedEvents[contracts[i]] {
+			continue
+		}
 		// subscribe to contract notifications
-		err := s.client.SubscribeForExecutionNotifications(contracts[i])
+		id, err := s.client.ReceiveExecutionNotifications(contracts[i], s.current.NotifyChan)
 		if err != nil {
 			// if there is some error, undo all subscriptions and return error
-			for hash := range notifyIDs {
-				_ = s.client.UnsubscribeContract(hash)
+			for _, id := range notifyIDs {
+				_ = s.client.Unsubscribe(id)
 			}
 
 			return err
 		}
 
 		// save notification id
-		notifyIDs[contracts[i]] = struct{}{}
+		notifyIDs = append(notifyIDs, id)
+	}
+	for i := range contracts {
+		s.subscribedEvents[contracts[i]] = true
 	}
 
 	return nil
@@ -109,82 +126,34 @@ func (s *subscriber) Close() {
 }
 
 func (s *subscriber) BlockNotifications() error {
-	if err := s.client.SubscribeForNewBlocks(); err != nil {
+	s.Lock()
+	defer s.Unlock()
+	if s.subscribedToNewBlocks {
+		return nil
+	}
+	if _, err := s.client.ReceiveBlocks(s.current.BlockChan); err != nil {
 		return fmt.Errorf("could not subscribe for new block events: %w", err)
 	}
 
+	s.subscribedToNewBlocks = true
+
 	return nil
 }
 
 func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
-	if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil {
+	s.Lock()
+	defer s.Unlock()
+	if s.subscribedNotaryEvents[mainTXSigner] {
+		return nil
+	}
+	if _, err := s.client.ReceiveNotaryRequests(mainTXSigner, s.current.NotaryChan); err != nil {
 		return fmt.Errorf("could not subscribe for notary request events: %w", err)
 	}
+
+	s.subscribedNotaryEvents[mainTXSigner] = true
 	return nil
 }
 
-func (s *subscriber) routeNotifications(ctx context.Context) {
-	notificationChan := s.client.NotificationChannel()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case notification, ok := <-notificationChan:
-			if !ok {
-				s.log.Warn("remote notification channel has been closed")
-				close(s.notifyChan)
-				close(s.blockChan)
-				close(s.notaryChan)
-
-				return
-			}
-
-			switch notification.Type {
-			case neorpc.NotificationEventID:
-				notifyEvent, ok := notification.Value.(*state.ContainedNotificationEvent)
-				if !ok {
-					s.log.Error("can't cast notify event value to the notify struct",
-						zap.String("received type", fmt.Sprintf("%T", notification.Value)),
-					)
-					continue
-				}
-
-				s.log.Debug("new notification event from sidechain",
-					zap.String("name", notifyEvent.Name),
-				)
-
-				s.notifyChan <- notifyEvent
-			case neorpc.BlockEventID:
-				b, ok := notification.Value.(*block.Block)
-				if !ok {
-					s.log.Error("can't cast block event value to block",
-						zap.String("received type", fmt.Sprintf("%T", notification.Value)),
-					)
-					continue
-				}
-
-				s.blockChan <- b
-			case neorpc.NotaryRequestEventID:
-				notaryRequest, ok := notification.Value.(*result.NotaryRequestEvent)
-				if !ok {
-					s.log.Error("can't cast notify event value to the notary request struct",
-						zap.String("received type", fmt.Sprintf("%T", notification.Value)),
-					)
-					continue
-				}
-
-				s.notaryChan <- notaryRequest
-			default:
-				s.log.Debug("unsupported notification from the chain",
-					zap.Uint8("type", uint8(notification.Type)),
-				)
-			}
-		}
-	}
-}
-
 // New is a constructs Neo:Morph event listener and returns Subscriber interface.
 func New(ctx context.Context, p *Params) (Subscriber, error) {
 	switch {
@@ -208,16 +177,170 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
 		notifyChan: make(chan *state.ContainedNotificationEvent),
 		blockChan:  make(chan *block.Block),
 		notaryChan: make(chan *result.NotaryRequestEvent),
-	}
 
-	// Worker listens all events from neo-go websocket and puts them
-	// into corresponding channel. It may be notifications, transactions,
-	// new blocks. For now only notifications.
+		current: newSubChannels(),
+
+		subscribedEvents:       make(map[util.Uint160]bool),
+		subscribedNotaryEvents: make(map[util.Uint160]bool),
+	}
+	// Worker listens to all events from the temporary NeoGo channel and puts them
+	// into corresponding permanent channels.
 	go sub.routeNotifications(ctx)
 
 	return sub, nil
 }
 
+func (s *subscriber) routeNotifications(ctx context.Context) {
+	var (
+		// TODO: not needed after nspcc-dev/neo-go#2980.
+		cliCh             = s.client.NotificationChannel()
+		restoreCh         = make(chan bool)
+		restoreInProgress bool
+	)
+
+routeloop:
+	for {
+		var connLost bool
+		s.RLock()
+		curr := s.current
+		s.RUnlock()
+		select {
+		case <-ctx.Done():
+			break routeloop
+		case ev, ok := <-curr.NotifyChan:
+			if ok {
+				s.notifyChan <- ev
+			} else {
+				connLost = true
+			}
+		case ev, ok := <-curr.BlockChan:
+			if ok {
+				s.blockChan <- ev
+			} else {
+				connLost = true
+			}
+		case ev, ok := <-curr.NotaryChan:
+			if ok {
+				s.notaryChan <- ev
+			} else {
+				connLost = true
+			}
+		case _, ok := <-cliCh:
+			connLost = !ok
+		case ok := <-restoreCh:
+			restoreInProgress = false
+			if !ok {
+				connLost = true
+			}
+		}
+		if connLost {
+			if !restoreInProgress {
+				restoreInProgress, cliCh = s.switchEndpoint(ctx, restoreCh)
+				if !restoreInProgress {
+					break routeloop
+				}
+				curr.drain()
+			} else { // Avoid getting additional !ok events.
+				s.Lock()
+				s.current.NotifyChan = nil
+				s.current.BlockChan = nil
+				s.current.NotaryChan = nil
+				s.Unlock()
+			}
+		}
+	}
+	close(s.notifyChan)
+	close(s.blockChan)
+	close(s.notaryChan)
+}
+
+func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (bool, <-chan rpcclient.Notification) {
+	s.log.Info("RPC connection lost, attempting reconnect")
+	if !s.client.SwitchRPC(ctx) {
+		s.log.Error("can't switch RPC node")
+		return false, nil
+	}
+
+	cliCh := s.client.NotificationChannel()
+
+	s.Lock()
+	chs := newSubChannels()
+	go func() {
+		finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
+	}()
+	s.current = chs
+	s.Unlock()
+
+	return true, cliCh
+}
+
+func newSubChannels() subChannels {
+	return subChannels{
+		NotifyChan: make(chan *state.ContainedNotificationEvent),
+		BlockChan:  make(chan *block.Block),
+		NotaryChan: make(chan *result.NotaryRequestEvent),
+	}
+}
+
+func (s *subChannels) drain() {
+drainloop:
+	for {
+		select {
+		case _, ok := <-s.NotifyChan:
+			if !ok {
+				s.NotifyChan = nil
+			}
+		case _, ok := <-s.BlockChan:
+			if !ok {
+				s.BlockChan = nil
+			}
+		case _, ok := <-s.NotaryChan:
+			if !ok {
+				s.NotaryChan = nil
+			}
+		default:
+			break drainloop
+		}
+	}
+}
+
+// restoreSubscriptions restores subscriptions according to
+// cached information about them.
+func (s *subscriber) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent,
+	blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent) bool {
+	var err error
+
+	// new block events restoration
+	if s.subscribedToNewBlocks {
+		_, err = s.client.ReceiveBlocks(blCh)
+		if err != nil {
+			s.log.Error("could not restore block subscription after RPC switch", zap.Error(err))
+			return false
+		}
+	}
+
+	// notification events restoration
+	for contract := range s.subscribedEvents {
+		contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
+		_, err = s.client.ReceiveExecutionNotifications(contract, notifCh)
+		if err != nil {
+			s.log.Error("could not restore notification subscription after RPC switch", zap.Error(err))
+			return false
+		}
+	}
+
+	// notary notification events restoration
+	for signer := range s.subscribedNotaryEvents {
+		signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
+		_, err = s.client.ReceiveNotaryRequests(signer, notaryCh)
+		if err != nil {
+			s.log.Error("could not restore notary notification subscription after RPC switch", zap.Error(err))
+			return false
+		}
+	}
+	return true
+}
+
 // awaitHeight checks if remote client has least expected block height and
 // returns error if it is not reached that height after timeout duration.
 // This function is required to avoid connections to unsynced RPC nodes, because
diff --git a/pkg/services/object/put/pool.go b/pkg/services/object/put/pool.go
index 5726856e5..ebe214caf 100644
--- a/pkg/services/object/put/pool.go
+++ b/pkg/services/object/put/pool.go
@@ -4,7 +4,10 @@ import (
 	"sync"
 )
 
-const defaultAllocSize = 1024
+const (
+	defaultAllocSize = 1024
+	poolSliceMaxSize = 128 * 1024
+)
 
 type payload struct {
 	Data []byte
@@ -19,6 +22,9 @@ func getPayload() *payload {
 }
 
 func putPayload(p *payload) {
+	if cap(p.Data) > poolSliceMaxSize {
+		return
+	}
 	p.Data = p.Data[:0]
 	putBytesPool.Put(p)
 }
diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go
index aea5926f4..5c305df3b 100644
--- a/pkg/services/object/put/prm.go
+++ b/pkg/services/object/put/prm.go
@@ -40,6 +40,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
 	return p
 }
 
+func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
+	if p != nil {
+		p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
+	}
+
+	return p
+}
+
 func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
 	if p != nil {
 		p.relay = f
diff --git a/pkg/services/object/put/v2/util.go b/pkg/services/object/put/v2/util.go
index 790f061f1..d0fa6f88d 100644
--- a/pkg/services/object/put/v2/util.go
+++ b/pkg/services/object/put/v2/util.go
@@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) {
 			object.NewFromV2(oV2),
 		).
 		WithRelay(s.relayRequest).
-		WithCommonPrm(commonPrm), nil
+		WithCommonPrm(commonPrm).
+		WithCopyNumber(part.GetCopiesNumber()), nil
 }
 
 func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {
diff --git a/pkg/services/tree/cache.go b/pkg/services/tree/cache.go
index ab9f509ac..3288083cc 100644
--- a/pkg/services/tree/cache.go
+++ b/pkg/services/tree/cache.go
@@ -27,7 +27,7 @@ type cacheItem struct {
 }
 
 const (
-	defaultClientCacheSize      = 10
+	defaultClientCacheSize      = 32
 	defaultClientConnectTimeout = time.Second * 2
 	defaultReconnectInterval    = time.Second * 15
 )
@@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")
 
 func (c *clientCache) init() {
 	l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
-		_ = value.cc.Close()
+		if conn := value.cc; conn != nil {
+			_ = conn.Close()
+		}
 	})
 	c.LRU = *l
 }
diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go
index edea450f1..acce3f1e7 100644
--- a/pkg/services/tree/service.go
+++ b/pkg/services/tree/service.go
@@ -13,6 +13,7 @@ import (
 	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"github.com/panjf2000/ants/v2"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
@@ -31,6 +32,8 @@ type Service struct {
 	syncChan chan struct{}
 	syncPool *ants.Pool
 
+	initialSyncDone atomic.Bool
+
 	// cnrMap contains existing (used) container IDs.
 	cnrMap map[cidSDK.ID]struct{}
 	// cnrMapMtx protects cnrMap
@@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
 }
 
 func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -137,6 +144,10 @@
 }
 
 func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -197,6 +208,10 @@
 }
 
 func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -246,6 +261,10 @@
 
 // Move applies client operation to the specified tree and pushes in queue
 // for replication on other nodes.
diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go
index edea450f1..acce3f1e7 100644
--- a/pkg/services/tree/service.go
+++ b/pkg/services/tree/service.go
@@ -13,6 +13,7 @@ import (
 	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"github.com/panjf2000/ants/v2"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
@@ -31,6 +32,8 @@ type Service struct {
 	syncChan chan struct{}
 	syncPool *ants.Pool
 
+	initialSyncDone atomic.Bool
+
 	// cnrMap contains existing (used) container IDs.
 	cnrMap map[cidSDK.ID]struct{}
 	// cnrMapMtx protects cnrMap
@@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
 }
 
 func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
 }
 
 func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
 }
 
 func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -246,6 +261,10 @@
 // Move applies the client operation to the specified tree and pushes it into the queue
 // for replication on other nodes.
 func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
 }
 
 func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
 }
 
 func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
+	if !s.initialSyncDone.Load() {
+		return ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
 }
 
 func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
+	if !s.initialSyncDone.Load() {
+		return ErrAlreadySyncing
+	}
+
 	b := req.GetBody()
 
 	var cid cidSDK.ID
@@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 	}
 
 	h := b.GetHeight()
+	lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
+	if err != nil {
+		return err
+	}
 	for {
 		lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
-		if err != nil || lm.Time == 0 {
+		if err != nil || lm.Time == 0 || lastHeight < lm.Time {
 			return err
 		}
 
@@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 }
 
 func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	var cid cidSDK.ID
 
 	err := cid.Decode(req.GetBody().GetContainerId())
@@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
 }
 
 func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
+	if !s.initialSyncDone.Load() {
+		return nil, ErrAlreadySyncing
+	}
+
 	return new(HealthcheckResponse), nil
 }
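All tree RPC handlers above, `Healthcheck` included, now return `ErrAlreadySyncing` until the first synchronization pass finishes, so callers never observe partially restored trees. A reduced sketch of the gating pattern follows; it uses the standard library's `sync/atomic.Bool` (Go 1.19+) where the patch imports `go.uber.org/atomic`, which exposes the same `Load`/`Store` methods:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var ErrAlreadySyncing = errors.New("tree service is syncing")

type Service struct {
	initialSyncDone atomic.Bool
}

// Healthcheck refuses to report healthy until the initial sync has finished,
// so load balancers keep traffic away from a node with incomplete trees.
func (s *Service) Healthcheck() error {
	if !s.initialSyncDone.Load() {
		return ErrAlreadySyncing
	}
	return nil
}

func (s *Service) syncLoop() {
	time.Sleep(10 * time.Millisecond) // stand-in for the real tree synchronization
	s.initialSyncDone.Store(true)     // flipped once, after the first full pass
}

func main() {
	s := &Service{}
	fmt.Println(s.Healthcheck()) // tree service is syncing
	s.syncLoop()
	fmt.Println(s.Healthcheck()) // <nil>
}
```

Gating `Healthcheck` as well means balancers stop routing to the node until its trees are usable, rather than discovering the problem through failed data requests.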
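`GetOpLog` also changed: it snapshots the tree height before streaming and stops once an operation's time passes that snapshot, bounding the stream to the log as it stood when the request began. The toy model below only approximates this; `firstOpFrom` is a hypothetical stand-in for the assumed `TreeGetOpLog` semantics (first logged operation at or after a given height), not the real storage API:

```go
package main

import "fmt"

type move struct {
	Time uint64
	Desc string
}

// firstOpFrom mimics the assumed lookup: return the first logged operation
// with Time >= h, or a zero value once the log is exhausted.
func firstOpFrom(log []move, h uint64) move {
	for _, m := range log {
		if m.Time >= h {
			return m
		}
	}
	return move{}
}

func main() {
	log := []move{{1, "add"}, {2, "move"}, {5, "remove"}}

	// Snapshot the height first, as GetOpLog now does, so entries appended
	// after the request started are not streamed to this caller.
	lastHeight := uint64(2)

	h := uint64(1)
	for {
		lm := firstOpFrom(log, h)
		if lm.Time == 0 || lastHeight < lm.Time {
			break
		}
		fmt.Println(lm.Time, lm.Desc) // prints ops 1 and 2; op 5 lies past the snapshot
		h = lm.Time + 1
	}
}
```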
diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go
index 47299d1c9..81e4a8e44 100644
--- a/pkg/services/tree/sync.go
+++ b/pkg/services/tree/sync.go
@@ -203,24 +203,29 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
 	rawCID := make([]byte, sha256.Size)
 	cid.Encode(rawCID)
 
+	errG, ctx := errgroup.WithContext(ctx)
+	errG.SetLimit(1024)
+
 	for {
-		newHeight := height
 		req := &GetOpLogRequest{
 			Body: &GetOpLogRequest_Body{
 				ContainerId: rawCID,
 				TreeId:      treeID,
-				Height:      newHeight,
+				Height:      height,
 			},
 		}
 		if err := SignMessage(req, s.key); err != nil {
-			return newHeight, err
+			_ = errG.Wait()
+			return height, err
 		}
 
 		c, err := treeClient.GetOpLog(ctx, req)
 		if err != nil {
-			return newHeight, fmt.Errorf("can't initialize client: %w", err)
+			_ = errG.Wait()
+			return height, fmt.Errorf("can't initialize client: %w", err)
 		}
 
+		lastApplied := height
 		res, err := c.Recv()
 		for ; err == nil; res, err = c.Recv() {
 			lm := res.GetBody().GetOperation()
@@ -229,21 +234,28 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
 				Child:  lm.ChildId,
 			}
 			if err := m.Meta.FromBytes(lm.Meta); err != nil {
-				return newHeight, err
+				_ = errG.Wait()
+				return height, err
 			}
-			if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
-				return newHeight, err
-			}
-			if m.Time > newHeight {
-				newHeight = m.Time + 1
-			} else {
-				newHeight++
+			if lastApplied < m.Meta.Time {
+				lastApplied = m.Meta.Time
 			}
+			errG.Go(func() error {
+				return s.forest.TreeApply(cid, treeID, m, true)
+			})
 		}
-		if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
-			return newHeight, err
+
+		// First check local errors: if everything is ok, we can update the starting
+		// height, because everything was applied.
+		applyErr := errG.Wait()
+		if applyErr != nil {
+			return height, applyErr
+		}
+
+		height = lastApplied
+		if err != nil && !errors.Is(err, io.EOF) {
+			return height, err
 		}
-		height = newHeight
 	}
 }
@@ -288,7 +300,7 @@ func (s *Service) syncLoop(ctx context.Context) {
 			cnrs, err := s.cfg.cnrSource.List()
 			if err != nil {
 				s.log.Error("could not fetch containers", zap.Error(err))
-				continue
+				break
 			}
 
 			newMap, cnrsToSync := s.containersToSync(cnrs)
@@ -299,6 +311,7 @@ func (s *Service) syncLoop(ctx context.Context) {
 
 			s.log.Debug("trees have been synchronized")
 		}
+		s.initialSyncDone.Store(true)
 	}
 }
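`synchronizeSingle` now queues `TreeApply` calls on an errgroup capped at 1024 concurrent goroutines instead of applying each operation inline, and it advances the starting height only after `Wait` confirms that every queued apply succeeded; a failure therefore re-syncs from the old height rather than silently skipping operations. A self-contained sketch of that pattern with `golang.org/x/sync/errgroup`; `apply` is a stand-in for the real `forest.TreeApply`:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type op struct{ Time uint64 }

func apply(o op) error { return nil } // stand-in for forest.TreeApply

func syncFrom(ctx context.Context, stream []op, height uint64) (uint64, error) {
	errG, _ := errgroup.WithContext(ctx)
	errG.SetLimit(1024) // bound the number of concurrent applies, as in the patch

	lastApplied := height
	for _, o := range stream {
		o := o // capture per iteration (pre-Go 1.22)
		if lastApplied < o.Time {
			lastApplied = o.Time
		}
		errG.Go(func() error { return apply(o) })
	}

	// Advance the starting height only once every queued apply has succeeded;
	// on error the caller retries from the old height instead of skipping ops.
	if err := errG.Wait(); err != nil {
		return height, err
	}
	return lastApplied, nil
}

func main() {
	h, err := syncFrom(context.Background(), []op{{1}, {2}, {5}}, 0)
	fmt.Println(h, err) // 5 <nil>
}
```

Returning the old height on error is the conservative choice: it assumes re-applying an operation is safe, which CRDT-style tree operations normally guarantee, while skipping operations would lose data.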