engine: Evacuate object from shards concurrently #1356
4 participants
Reference: TrueCloudLab/frostfs-node#1356
Delete branch "acid-ant/frostfs-node:feat/evac-parallel"
Introduced two API and config parameters for evacuation: `container-worker-count` and `object-worker-count`. All shards for evacuation will be processed concurrently.
Operation duration before changes for object size 81920 b:
Operation duration after changes:
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
df2c5a42c4 to 15fba7a29c
@ -0,0 +2,4 @@
import "context"
type routineLimiter struct {
Just for information, this will be conflicting with this PR change anyway :)
Oh, thanks. I'll rebase on master once it is merged.
Rebased.
@ -288,2 +287,4 @@
}
limiter := newRoutineLimiter(e.cfg.evacuationWorkers)
eg, egCtx := errgroup.WithContext(ctx)
Use `eg.SetLimit()` instead of limiter: it does the same thing as limiter, but looks shorter.
Thanks, updated to use `eg.SetLimit()`.
15fba7a29c to aa21a71b61
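For readers unfamiliar with the idea behind `eg.SetLimit()`, the bound it places on concurrently running goroutines can be sketched with the standard library alone, using a buffered channel as a semaphore. This is only an illustration of the technique, not the code from this PR; `runLimited` and its parameters are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited runs every task in its own goroutine, but lets at most
// limit of them execute at the same time. It returns the highest number
// of tasks observed running concurrently.
// Hypothetical helper for illustration; not part of frostfs-node.
func runLimited(limit int, tasks []func()) int64 {
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	var wg sync.WaitGroup
	var running, peak atomic.Int64

	for _, task := range tasks {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends

			cur := running.Add(1)
			// Record the peak with a compare-and-swap loop.
			for {
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			task()
			running.Add(-1)
		}(task)
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	tasks := make([]func(), 20)
	var done atomic.Int64
	for i := range tasks {
		tasks[i] = func() { done.Add(1) }
	}
	peak := runLimited(5, tasks)
	fmt.Println("tasks done:", done.Load(), "peak within limit:", peak <= 5)
}
```

An error group with a limit adds error propagation on top of this bound, which is why it replaces a hand-written limiter with less code.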
@ -91,0 +100,4 @@
// EngineEvacuationWorkersCount returns value of "evacuation_workers_count" config parameter from "storage" section.
func EngineEvacuationWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_workers_count"); v > 0 {
`worker_count` (we already have 4 such parameters); only rebuild uses `workers_count`, will fix eventually.
@ -220,2 +224,4 @@
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
evacuationWorkers: 5,
Please adjust `docs/storage-node-configuration.md`.
Also, it is a bit difficult to comprehend what the difference is between these 2 types of `worker_count`.
I don't see myself ever being able to make an informed decision about this number -- it is either parallel or not.
If there are other opinions, I would like to hear :)
@acid-ant Please, take a look at the data-race.
aa21a71b61 to 5daff28e69
New commits pushed, approval review dismissed automatically according to repository settings
Add new method `ListConcurrently` to the `metabase` API which allows iterating over it concurrently: it creates routines to iterate over containers and objects concurrently.
Title changed from "engine: Evacuate object from shards concurrently" to "WIP: engine: Evacuate object from shards concurrently"
5daff28e69 to 0613c54bee
Fix evacuation tests.
Title changed from "WIP: engine: Evacuate object from shards concurrently" to "engine: Evacuate object from shards concurrently"
acid-ant referenced this pull request 2024-09-18 12:05:11 +00:00
`REP 1` only #1350
0613c54bee to af6139b981
Minor changes.
@ -294,2 +291,2 @@
return err
}
eg.Go(func() error {
if err = e.evacuateShard(egCtx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
Let's consider `shard1` and `shard2`. Should `shard1`'s evacuation stop if `shard2`'s evacuation has failed? If it shouldn't, then `egCtx` can't be used, as it cancels other jobs too.
Good question. I think the process should stop faster, to indicate that we have a problematic situation with disks. That will allow us to rerun evacuation and exclude the defective disk.
OK with me
Title changed from "engine: Evacuate object from shards concurrently" to "WIP: engine: Evacuate object from shards concurrently"
af6139b981 to eada9bd28b
New commits pushed, approval review dismissed automatically according to repository settings
eada9bd28b to 0a13e7d946
0a13e7d946 to fafee24650
fafee24650 to 51e64c3101
Title changed from "WIP: engine: Evacuate object from shards concurrently" to "engine: Evacuate object from shards concurrently"
4b52a38812 to 620793008d
@ -21,6 +21,9 @@ const (
noProgressFlag = "no-progress"
scopeFlag = "scope"
containerWorkerCountFlag = "container-worker-count"
I have doubts that this parameter is necessary. Can you test with a value of 1 and a value of 10, for example, for 100 containers?
It will be helpful for the case when we need to evacuate containers with policy `REP 1` only.
In a bad case, we need ~100ms to get info about a container from `neo-go`.
For 1_000 containers, we need around 1m to iterate over containers with `container-worker-count` = 1.
For 1_000 containers, we need around 10s to iterate over containers with `container-worker-count` = 10.
For 10_000 containers, we need around 16m to iterate over containers with `container-worker-count` = 1.
For 10_000 containers, we need around 1m40s to iterate over containers with `container-worker-count` = 10.
Measured with unit test here.
@ -16,2 +16,4 @@
// process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20
// EvacuationShardWorkerCountDefault is a default value of the count of shards
// evacuees concurrently.
Fix typo
@ -176,0 +171,4 @@
|-------------------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `evacuation_container_worker_count` | `int` | `10` | The count of the concurrent container evacuation workers. |
Adding these parameters to the configuration looks redundant. Let's add the default value to the `frostfs-cli`, and on the server check that the value passed in the RPC call is greater than zero?
I disagree. With this approach, you need to configure once how you want to utilize resources for evacuation, instead of remembering parameters for evacuation.
Then remove the same parameters from the RPC call? It looks strange if the service administrator has configured these parameters in the configuration file, and then he also makes a disk evacuation call, but with different parameters.
I think it is more flexible to have two ways to configure evacuation parameters instead of being tied to only one.
But this leads to a complication of the code and understanding. The evacuation is called by the service engineer or the server administrator. The configuration is also changed by the service engineer or the server administrator. The only place where similar behavior occurs is when changing the shard mode: this can be done through a `frostfs-cli` call or through a configuration change. But this was necessary for the correct disk replacement procedure, and not dictated by concern for convenience. By the way, as far as I remember, we have been trying to avoid such duplication for a very long time.
Anyway, if you're sure it's necessary, I'll accept it.
Exactly, I also remember that effort to reduce the number of settings available to the service engineer and server administrator. This beautiful morning, I'm ready to remove redundant settings from the changes.
@ -287,12 +296,44 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return err
}
egShard, _ := errgroup.WithContext(ctx)
Previously, if the evacuation of one shard failed, the evacuation of the next shard was not started. Now this behavior has changed.
It is necessary either to clearly justify why this behavior has changed, or to do as it was before (use ctx).
I don't see the reason why we need to keep order for shards - the amount of objects and containers per shard is unpredictable. If we start to process all shards at the same time, that will help us to better utilize node resources. The corner case here is a shard with one container with policy `REP 1`.
As for operation cancellation: the processing of other shards will fail too, because once we fail on object evacuation, then it will be impossible, to run new routine for object evacuation for another container; it will also be impossible to process the next container, and that will stop the shard evacuation.
Could you explain:
it will be impossible, to run new routine for object evacuation for another container
Where is such check?
Shame on me, I've taken a closer look at `errorgroup.Go()` and now I see my fault. Updated a bit to use one context for several `error groups`. Also added a unit test to check cancellation by error.
620793008d to b3e9139629
b3e9139629 to cf588ad386
cf588ad386 to 900b3b7ca1
Removed redundant parameters from `node` config.
@ -262,0 +317,4 @@
continue
}
bkt := tx.Bucket(name)
To check that the cursor's current item is a bucket, just compare the value with nil. Also, the root cursor always iterates over buckets. Also, `bkt := tx.Bucket(name)` makes another cursor seek inside, so this looks redundant.
Thanks, fixed. That was copy-paste from `listWithCursor`.
@ -167,0 +187,4 @@
))
defer span.End()
if s.GetMode().NoMetabase() {
It is necessary to use a read lock on the mutex for the whole operation, not just to get the mode. Or am I missing something?
You are right, `RLock` here and in `IterateOverObjectsInContainer` is required. Fixed.
900b3b7ca1 to 53b810bec4
@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() {
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
Maybe set default values here, but not on the server side?
No, that will lead to a resource leak on the server side, when another client may set zero values.
On the server side, just check that those values are greater than zero.
@ -312,0 +356,4 @@
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
) {
operationCtx, cancel := context.WithCancelCause(ctx)
It is ok, but looks redundant.
This code will do almost the same:
`eg.Wait()` cancels the context. Routines which process containers and shards will end earlier than routines for objects. That is why they can't depend on the context of each other.
53b810bec4 to cc967c62c7
@ -64,0 +69,4 @@
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name
The sentence must end with a period
@ -262,0 +313,4 @@
if cidRaw == nil {
continue
}
if prefix != primaryPrefix && prefix != lockersPrefix && prefix != tombstonePrefix {
Replace with
to reduce seeks
I don't see the benefit of this; could you point out where I'm wrong? Even if we start from a particular prefix, we need to iterate over everything that follows it, and with your approach we need to do that for all other prefixes. In the current implementation, we need to do that iteration only once.
Currently, iteration is performed over all buckets with all prefixes: small, root, owner, user attributes and others.
After the fix, iteration will be performed only over particular prefixes:
name != nil && bytes.HasPrefix(name, prefix);
Gotcha, fixed.
@ -262,0 +367,4 @@
k, v := c.First()
var containerID cid.ID
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
Could be done before transaction start
Done.
@ -262,0 +360,4 @@
}
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error {
bkt := tx.Bucket(prm.BucketName)
Add check against nil
Added.
cc967c62c7 to c8eae4ace6
New commits pushed, approval review dismissed automatically according to repository settings
c8eae4ace6 to 34e6a309c6