Pilorama migration #960

Merged
fyrchik merged 8 commits from dstepanov-yadro/frostfs-node:feat/pilorama_migrate into master 2024-09-04 19:51:06 +00:00
20 changed files with 1973 additions and 877 deletions

View file

@ -19,6 +19,11 @@ import (
const (
awaitFlag = "await"
noProgressFlag = "no-progress"
scopeFlag = "scope"
scopeAll = "all"
scopeObjects = "objects"
scopeTrees = "trees"
)
var evacuationShardCmd = &cobra.Command{
@ -57,6 +62,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd),
},
}
@ -82,6 +88,22 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
}
}
func getEvacuationScope(cmd *cobra.Command) uint32 {
rawScope, err := cmd.Flags().GetString(scopeFlag)
commonCmd.ExitOnErr(cmd, "Invalid scope value: %w", err)
switch rawScope {
case scopeAll:
return uint32(control.StartShardEvacuationRequest_Body_OBJECTS) | uint32(control.StartShardEvacuationRequest_Body_TREES)
case scopeObjects:
return uint32(control.StartShardEvacuationRequest_Body_OBJECTS)
case scopeTrees:
return uint32(control.StartShardEvacuationRequest_Body_TREES)
default:
commonCmd.ExitOnErr(cmd, "Invalid scope value: %w", fmt.Errorf("unknown scope %s", rawScope))
}
return uint32(control.StartShardEvacuationRequest_Body_NONE)
}
func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.GetShardEvacuationStatusRequest{
@ -219,15 +241,17 @@ func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusRespo
func appendEstimation(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING ||
resp.GetBody().GetDuration() == nil ||
resp.GetBody().GetTotal() == 0 ||
resp.GetBody().GetEvacuated()+resp.GetBody().GetFailed()+resp.GetBody().GetSkipped() == 0 {
(resp.GetBody().GetTotalObjects() == 0 && resp.GetBody().GetTotalTrees() == 0) ||
(resp.GetBody().GetEvacuatedObjects()+resp.GetBody().GetFailedObjects()+resp.GetBody().GetSkippedObjects() == 0 &&
resp.GetBody().GetEvacuatedTrees()+resp.GetBody().GetFailedTrees() == 0) {
return
}
durationSeconds := float64(resp.GetBody().GetDuration().GetSeconds())
evacuated := float64(resp.GetBody().GetEvacuated() + resp.GetBody().GetFailed() + resp.GetBody().GetSkipped())
evacuated := float64(resp.GetBody().GetEvacuatedObjects() + resp.GetBody().GetFailedObjects() + resp.GetBody().GetSkippedObjects() +
resp.GetBody().GetEvacuatedTrees() + resp.GetBody().GetFailedTrees())
avgObjEvacuationTimeSeconds := durationSeconds / evacuated
objectsLeft := float64(resp.GetBody().GetTotal()) - evacuated
objectsLeft := float64(resp.GetBody().GetTotalObjects()+resp.GetBody().GetTotalTrees()) - evacuated
leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft
leftMinutes := int(leftSeconds / 60)
@ -285,11 +309,14 @@ func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusR
}
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d.",
resp.GetBody().GetEvacuated(),
resp.GetBody().GetTotal(),
resp.GetBody().GetFailed(),
resp.GetBody().GetSkipped()))
sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d; evacuated %d trees out of %d, failed to evacuate: %d.",
fyrchik marked this conversation as resolved Outdated

What's the benefit in outputting both objects and trees if only one evacuation scope is being used (the numbers for the other will be 0, I guess)?

To have unified output: it could be parsed by tests, for example.
resp.GetBody().GetEvacuatedObjects(),
resp.GetBody().GetTotalObjects(),
resp.GetBody().GetFailedObjects(),
resp.GetBody().GetSkippedObjects(),
resp.GetBody().GetEvacuatedTrees(),
resp.GetBody().GetTotalTrees(),
resp.GetBody().GetFailedTrees()))
}
func initControlEvacuationShardCmd() {
@ -309,6 +336,7 @@ func initControlStartEvacuationShardCmd() {
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
flags.Bool(shardAllFlag, false, "Process all shards")
flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))

View file

@ -9,23 +9,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
"google.golang.org/grpc"
)
const serviceNameControl = "control"
type treeSynchronizer struct {
treeSvc *tree.Service
}
func (t treeSynchronizer) Synchronize(ctx context.Context, cnr cid.ID, treeID string) error {
return t.treeSvc.SynchronizeTree(ctx, cnr, treeID)
}
func initControlService(c *cfg) {
endpoint := controlconfig.GRPC(c.appCfg).Endpoint()
if endpoint == controlconfig.GRPCEndpointDefault {
@ -50,9 +40,7 @@ func initControlService(c *cfg) {
controlSvc.WithReplicator(c.replicator),
controlSvc.WithNodeState(c),
controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
controlSvc.WithTreeService(treeSynchronizer{
c.treeService,
}),
controlSvc.WithTreeService(c.treeService),
controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine),
)

View file

@ -14,7 +14,7 @@ Only one running evacuation process is allowed on the node at a time.
## Commands
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use the `--scope` flag (possible values are `all`, `trees`, `objects`).
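For example, an objects-only evacuation could be started like this (the shard ID, endpoint, and wallet path reuse the values from the examples below):

```bash
frostfs-cli control shards evacuation start --id FxR6QujButNCHn7jjdhxGP --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --scope objects
```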
`frostfs-cli control shards evacuation stop` stops running evacuation process.
@ -39,15 +39,15 @@ Shard evacuation has been successfully started.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 14 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:03. Estimated time left: 2 minutes.
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 131 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:03. Estimated time left: 2 minutes.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 23 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:01:05. Estimated time left: 1 minutes.
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 260 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:01:05. Estimated time left: 1 minutes.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: completed. Evacuated 61 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:02:13.
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: completed. Evacuated 618 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 19 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:02:13.
```
### Stop running evacuation process
@ -58,7 +58,7 @@ Shard evacuation has been successfully started.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 15 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:03. Estimated time left: 0 minutes.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 131 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:03. Estimated time left: 0 minutes.
frostfs-cli control shards evacuation stop --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
@ -66,7 +66,7 @@ Evacuation stopped.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: completed. Evacuated 31 object out of 73, failed to evacuate 0 objects. Error: context canceled. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:07.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: completed. Evacuated 131 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Error: context canceled. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:07.
```
### Start evacuation and await it completes
@ -75,11 +75,11 @@ frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 -
Enter password >
Shard evacuation has been successfully started.
Progress will be reported every 5 seconds.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 18 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:04. Estimated time left: 0 minutes.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 43 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:09. Estimated time left: 0 minutes.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 68 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14. Estimated time left: 0 minutes.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 131 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:04. Estimated time left: 0 minutes.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 343 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:09. Estimated time left: 0 minutes.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 545 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14. Estimated time left: 0 minutes.
Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 618 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 19 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
```
### Start evacuation and await it completes without progress notifications
@ -88,5 +88,15 @@ frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 -
Enter password >
Shard evacuation has been successfully started.
Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:20:00Z UTC. Duration: 00:00:14.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 618 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 19 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:20:00Z UTC. Duration: 00:00:14.
```
### Start trees evacuation and await it completes
```bash
frostfs-cli control shards evacuation start --id FxR6QujButNCHn7jjdhxGP --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --await --scope trees
Enter password >
Shard evacuation has been successfully started.
Progress will be reported every 5 seconds.
Shard evacuation has been completed.
Shard IDs: FxR6QujButNCHn7jjdhxGP. Evacuated 0 objects out of 0, failed to evacuate: 0, skipped: 0; evacuated 2 trees out of 2, failed to evacuate: 0. Started at: 2024-02-08T08:44:17Z UTC. Duration: 00:00:00.
```

View file

@ -575,4 +575,7 @@ const (
GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node"
GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes"
FailedToUpdateShardID = "failed to update shard id"
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
)

View file

@ -4,16 +4,20 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
@ -28,83 +32,135 @@ var (
evacuationOperationLogField = zap.String("operation", "evacuation")
)
// EvacuateScope is an evacuation scope. Keep in sync with pkg/services/control/service.proto.
type EvacuateScope uint32
var (
EvacuateScopeObjects EvacuateScope = 1
EvacuateScopeTrees EvacuateScope = 2
)
func (s EvacuateScope) String() string {
var sb strings.Builder
first := true
if s&EvacuateScopeObjects == EvacuateScopeObjects {
if !first {
sb.WriteString(";")
}
sb.WriteString("objects")
first = false
}
if s&EvacuateScopeTrees == EvacuateScopeTrees {
if !first {
sb.WriteString(";")
}
sb.WriteString("trees")
}
return sb.String()
}
func (s EvacuateScope) WithObjects() bool {
return s&EvacuateScopeObjects == EvacuateScopeObjects
}
func (s EvacuateScope) WithTrees() bool {
return s&EvacuateScopeTrees == EvacuateScopeTrees
}
func (s EvacuateScope) TreesOnly() bool {
return s == EvacuateScopeTrees
}
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
shardID []*shard.ID
handler func(context.Context, oid.Address, *objectSDK.Object) error
ignoreErrors bool
async bool
ShardID []*shard.ID
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (string, error)
IgnoreErrors bool
Async bool
Scope EvacuateScope
}
// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
evacuated *atomic.Uint64
total *atomic.Uint64
failed *atomic.Uint64
skipped *atomic.Uint64
objEvacuated *atomic.Uint64
objTotal *atomic.Uint64
objFailed *atomic.Uint64
objSkipped *atomic.Uint64
trEvacuated *atomic.Uint64
trTotal *atomic.Uint64
trFailed *atomic.Uint64
}
// NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes {
return &EvacuateShardRes{
evacuated: new(atomic.Uint64),
total: new(atomic.Uint64),
failed: new(atomic.Uint64),
skipped: new(atomic.Uint64),
objEvacuated: new(atomic.Uint64),
objTotal: new(atomic.Uint64),
objFailed: new(atomic.Uint64),
objSkipped: new(atomic.Uint64),
trEvacuated: new(atomic.Uint64),
trTotal: new(atomic.Uint64),
trFailed: new(atomic.Uint64),
}
}
// WithShardIDList sets shard ID.
func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) {
p.shardID = id
}
// WithIgnoreErrors sets flag to ignore errors.
func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}
// WithFaultHandler sets handler to call for objects which cannot be saved on other shards.
func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address, *objectSDK.Object) error) {
p.handler = f
}
// WithAsync sets flag to run evacuate async.
func (p *EvacuateShardPrm) WithAsync(async bool) {
p.async = async
}
// Evacuated returns amount of evacuated objects.
// ObjectsEvacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
func (p *EvacuateShardRes) Evacuated() uint64 {
func (p *EvacuateShardRes) ObjectsEvacuated() uint64 {
if p == nil {
return 0
}
return p.evacuated.Load()
return p.objEvacuated.Load()
}
// Total returns total count objects to evacuate.
func (p *EvacuateShardRes) Total() uint64 {
// ObjectsTotal returns total count of objects to evacuate.
func (p *EvacuateShardRes) ObjectsTotal() uint64 {
if p == nil {
return 0
}
return p.total.Load()
return p.objTotal.Load()
}
// Failed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) Failed() uint64 {
// ObjectsFailed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) ObjectsFailed() uint64 {
if p == nil {
return 0
}
return p.failed.Load()
return p.objFailed.Load()
}
// Skipped returns count of skipped objects.
func (p *EvacuateShardRes) Skipped() uint64 {
// ObjectsSkipped returns count of skipped objects.
func (p *EvacuateShardRes) ObjectsSkipped() uint64 {
if p == nil {
return 0
}
return p.skipped.Load()
return p.objSkipped.Load()
}
// TreesEvacuated returns amount of evacuated trees.
func (p *EvacuateShardRes) TreesEvacuated() uint64 {
if p == nil {
return 0
}
return p.trEvacuated.Load()
}
// TreesTotal returns total count of trees to evacuate.
func (p *EvacuateShardRes) TreesTotal() uint64 {
if p == nil {
return 0
}
return p.trTotal.Load()
}
// TreesFailed returns count of failed trees to evacuate.
func (p *EvacuateShardRes) TreesFailed() uint64 {
if p == nil {
return 0
}
return p.trFailed.Load()
}
// DeepCopy returns deep copy of result instance.
@ -114,16 +170,22 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
}
res := &EvacuateShardRes{
evacuated: new(atomic.Uint64),
total: new(atomic.Uint64),
failed: new(atomic.Uint64),
skipped: new(atomic.Uint64),
objEvacuated: new(atomic.Uint64),
objTotal: new(atomic.Uint64),
objFailed: new(atomic.Uint64),
objSkipped: new(atomic.Uint64),
trEvacuated: new(atomic.Uint64),
trTotal: new(atomic.Uint64),
trFailed: new(atomic.Uint64),
}
res.evacuated.Store(p.evacuated.Load())
res.total.Store(p.total.Load())
res.failed.Store(p.failed.Load())
res.skipped.Store(p.skipped.Load())
res.objEvacuated.Store(p.objEvacuated.Load())
res.objTotal.Store(p.objTotal.Load())
res.objFailed.Store(p.objFailed.Load())
res.objSkipped.Store(p.objSkipped.Load())
res.trTotal.Store(p.trTotal.Load())
res.trEvacuated.Store(p.trEvacuated.Load())
res.trFailed.Store(p.trFailed.Load())
return res
}
@ -145,20 +207,21 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
default:
}
shardIDs := make([]string, len(prm.shardID))
for i := range prm.shardID {
shardIDs[i] = prm.shardID[i].String()
shardIDs := make([]string, len(prm.ShardID))
for i := range prm.ShardID {
shardIDs[i] = prm.ShardID[i].String()
}
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async),
attribute.Bool("ignoreErrors", prm.ignoreErrors),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
))
defer span.End()
shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
shards, weights, err := e.getActualShards(shardIDs, prm)
if err != nil {
return nil, err
}
@ -173,7 +236,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
}
res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.async)
ctx = ctxOrBackground(ctx, prm.Async)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil {
return nil, err
@ -183,7 +246,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
})
if prm.async {
if prm.Async {
return nil, nil
}
@ -204,8 +267,9 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async),
attribute.Bool("ignoreErrors", prm.ignoreErrors),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
))
defer func() {
@ -214,18 +278,19 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
}()
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
err = e.getTotals(ctx, prm, shardsToEvacuate, res)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField)
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
}
@ -233,19 +298,23 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
zap.Strings("shard_ids", shardIDs),
evacuationOperationLogField,
zap.Uint64("total", res.Total()),
zap.Uint64("evacuated", res.Evacuated()),
zap.Uint64("failed", res.Failed()),
zap.Uint64("skipped", res.Skipped()),
zap.Uint64("total_objects", res.ObjectsTotal()),
zap.Uint64("evacuated_objects", res.ObjectsEvacuated()),
zap.Uint64("failed_objects", res.ObjectsFailed()),
zap.Uint64("skipped_objects", res.ObjectsSkipped()),
zap.Uint64("total_trees", res.TreesTotal()),
zap.Uint64("evacuated_trees", res.TreesEvacuated()),
zap.Uint64("failed_trees", res.TreesFailed()),
)
return nil
}
func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount")
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
defer span.End()
for _, sh := range shardsToEvacuate {
if prm.Scope.WithObjects() {
cnt, err := sh.LogicalObjectsCount(ctx)
if err != nil {
if errors.Is(err, shard.ErrDegradedMode) {
@ -253,7 +322,15 @@ func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacua
}
return err
}
res.total.Add(cnt)
res.objTotal.Add(cnt)
}
if prm.Scope.WithTrees() && sh.PiloramaEnabled() {
cnt, err := pilorama.TreeCountAll(ctx, sh)
if err != nil {
return err
}
res.trTotal.Add(cnt)
}
}
return nil
}
@ -267,6 +344,24 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
))
defer span.End()
if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
return nil
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
@ -297,7 +392,198 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
return nil
}
func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) {
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) error {
sh := shardsToEvacuate[shardID]
var listPrm pilorama.TreeListTreesPrm
first := true
for len(listPrm.NextPageToken) > 0 || first {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
first = false
listRes, err := sh.TreeListTrees(ctx, listPrm)
if err != nil {
return err
}
listPrm.NextPageToken = listRes.NextPageToken
if err := e.evacuateTrees(ctx, sh, listRes.Items, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
return nil
}
func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64,
shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
trace.WithAttributes(
attribute.Int("trees_count", len(trees)),
))
defer span.End()
for _, contTree := range trees {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
success, shardID, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate)
if err != nil {
return err
}
if success {
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedLocal,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
fyrchik marked this conversation as resolved Outdated

We use `shard_id` in labels usually, why is it `shardID` here?

fixed
zap.String("from_shard_id", sh.ID().String()), zap.String("to_shard_id", shardID),
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
res.trEvacuated.Add(1)
continue
}
nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID),
zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK),
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
res.trEvacuated.Add(1)
}
return nil
}
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (string, error) {
if prm.TreeHandler == nil {
return "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
}
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
}
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) (bool, string, error) {
target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, weights, shardsToEvacuate)
if err != nil {
fyrchik marked this conversation as resolved Outdated

I believe `!prm.IgnoreErrors` is not needed here: we ignore errors of evacuation, but if we haven't found a suitable shard, there is nothing we can do below, right?

right, fixed
return false, "", err
}
if !found {
return false, "", nil
}
const readBatchSize = 1000
source := make(chan *pilorama.Move, readBatchSize)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
var applyErr error
go func() {
defer wg.Done()
applyErr = target.TreeApplyStream(ctx, tree.CID, tree.TreeID, source)
if applyErr != nil {
cancel()
}
}()
var height uint64
for {
op, err := sh.TreeGetOpLog(ctx, tree.CID, tree.TreeID, height)
if err != nil {
cancel()
wg.Wait()
close(source) // close after cancel so that ctx.Done() hits first
if prm.IgnoreErrors {
return false, "", nil
}
return false, "", err
}
if op.Time == 0 { // op log has been fully read
close(source)
wg.Wait()
if applyErr == nil {
return true, target.ID().String(), nil
}
if prm.IgnoreErrors {
return false, "", nil
}
return false, "", applyErr
}
select {
case <-ctx.Done(): // apply stream failed or operation cancelled
wg.Wait()
if prm.IgnoreErrors {
return false, "", nil
}
if applyErr != nil {
return false, "", applyErr
}
return false, "", ctx.Err()
case source <- &op:
}
height = op.Time + 1
}
}
// findShardToEvacuateTree returns the first suitable shard according to HRW order, or the first shard on which the tree already exists.
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) (pooledShard, bool, error) {
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.StringHash(tree.CID.EncodeToString()))
var result pooledShard
var found bool
for _, target := range shards {
select {
case <-ctx.Done():
return pooledShard{}, false, ctx.Err()
default:
}
if _, ok := shardsToEvacuate[target.ID().String()]; ok {
continue
}
if !target.PiloramaEnabled() || target.GetMode().ReadOnly() {
continue
}
if !found {
result = target
found = true
}
exists, err := target.TreeExists(ctx, tree.CID, tree.TreeID)
if err != nil {
continue
}
if exists {
return target, true, nil
}
}
return result, found, nil
}
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, []float64, error) {
e.mtx.RLock()
defer e.mtx.RUnlock()
@ -310,9 +596,17 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
if !sh.GetMode().ReadOnly() {
return nil, nil, ErrMustBeReadOnly
}
if prm.Scope.TreesOnly() && !sh.PiloramaEnabled() {
return nil, nil, fmt.Errorf("shard %s doesn't have pilorama enabled", sh.ID())
}
}
if len(e.shards)-len(shardIDs) < 1 && !handlerDefined {
if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil && prm.Scope.WithObjects() {
return nil, nil, errMustHaveTwoShards
}
if len(e.shards)-len(shardIDs) < 1 && prm.TreeHandler == nil && prm.Scope.WithTrees() {
return nil, nil, errMustHaveTwoShards
}
@ -357,8 +651,8 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.ignoreErrors {
res.failed.Add(1)
if prm.IgnoreErrors {
res.objFailed.Add(1)
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
@ -375,19 +669,19 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
continue
}
if prm.handler == nil {
if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
}
err = prm.handler(ctx, addr, getRes.Object())
err = prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
res.evacuated.Add(1)
res.objEvacuated.Add(1)
}
return nil
}
@ -408,7 +702,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
}
switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
case putToShardSuccess:
res.evacuated.Add(1)
res.objEvacuated.Add(1)
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()),
@ -417,7 +711,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return true, nil
case putToShardExists, putToShardRemoved:
res.skipped.Add(1)
res.objSkipped.Add(1)
return true, nil
default:
continue

View file

@ -34,32 +34,53 @@ func (s *EvacuationState) ShardIDs() []string {
return s.shardIDs
}
func (s *EvacuationState) Evacuated() uint64 {
func (s *EvacuationState) ObjectsEvacuated() uint64 {
if s == nil {
return 0
}
return s.result.Evacuated()
return s.result.ObjectsEvacuated()
}
func (s *EvacuationState) Total() uint64 {
func (s *EvacuationState) ObjectsTotal() uint64 {
if s == nil {
return 0
}
return s.result.Total()
return s.result.ObjectsTotal()
}
func (s *EvacuationState) Failed() uint64 {
func (s *EvacuationState) ObjectsFailed() uint64 {
if s == nil {
return 0
}
return s.result.Failed()
return s.result.ObjectsFailed()
}
func (s *EvacuationState) Skipped() uint64 {
func (s *EvacuationState) ObjectsSkipped() uint64 {
if s == nil {
return 0
}
return s.result.Skipped()
return s.result.ObjectsSkipped()
}
func (s *EvacuationState) TreesEvacuated() uint64 {
if s == nil {
return 0
}
return s.result.TreesEvacuated()
}
func (s *EvacuationState) TreesTotal() uint64 {
if s == nil {
return 0
}
return s.result.TreesTotal()
}
func (s *EvacuationState) TreesFailed() uint64 {
if s == nil {
return 0
}
return s.result.TreesFailed()
}
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {

View file

@ -14,9 +14,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -41,6 +43,10 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0o700),
meta.WithEpochState(epochState{})),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))),
pilorama.WithPerm(0o700),
),
}
})
e, ids := te.engine, te.shardIDs
@ -48,36 +54,32 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
require.NoError(t, e.Init(context.Background()))
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
treeID := "version"
meta := []pilorama.KeyValue{
{Key: pilorama.AttributeVersion, Value: []byte("XXX")},
{Key: pilorama.AttributeFilename, Value: []byte("file.txt")},
}
for _, sh := range ids {
obj := testutil.GenerateObjectWithCID(cidtest.ID())
for i := 0; i < objPerShard; i++ {
contID := cidtest.ID()
obj := testutil.GenerateObjectWithCID(contID)
objects = append(objects, obj)
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
for i := 0; ; i++ {
objects = append(objects, testutil.GenerateObjectWithCID(cidtest.ID()))
var putPrm PutPrm
putPrm.WithObject(objects[len(objects)-1])
err := e.Put(context.Background(), putPrm)
_, err = e.shards[sh.String()].TreeAddByPath(context.Background(), pilorama.CIDDescriptor{CID: contID, Position: 0, Size: 1},
treeID, pilorama.AttributeFilename, []string{"path", "to", "the", "file"}, meta)
require.NoError(t, err)
res, err := e.shards[ids[len(ids)-1].String()].List(context.Background())
require.NoError(t, err)
if len(res.AddressList()) == objPerShard {
break
}
}
return e, ids, objects
}
func TestEvacuateShard(t *testing.T) {
func TestEvacuateShardObjects(t *testing.T) {
t.Parallel()
const objPerShard = 3
@ -102,19 +104,20 @@ func TestEvacuateShard(t *testing.T) {
checkHasObjects(t)
var prm EvacuateShardPrm
prm.WithShardIDList(ids[2:3])
prm.ShardID = ids[2:3]
prm.Scope = EvacuateScopeObjects
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(objPerShard), res.Evacuated())
require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated())
// We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense
@ -125,7 +128,7 @@ func TestEvacuateShard(t *testing.T) {
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
checkHasObjects(t)
@ -137,7 +140,7 @@ func TestEvacuateShard(t *testing.T) {
checkHasObjects(t)
}
func TestEvacuateNetwork(t *testing.T) {
func TestEvacuateObjectsNetwork(t *testing.T) {
t.Parallel()
errReplication := errors.New("handler error")
@ -173,17 +176,18 @@ func TestEvacuateNetwork(t *testing.T) {
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.shardID = ids[0:1]
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
prm.handler = acceptOneOf(objects, 2)
prm.ObjectsHandler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.Evacuated())
require.Equal(t, uint64(2), res.ObjectsEvacuated())
})
t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel()
@ -196,19 +200,20 @@ func TestEvacuateNetwork(t *testing.T) {
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = acceptOneOf(objects, 2)
prm.ShardID = ids[1:2]
prm.ObjectsHandler = acceptOneOf(objects, 2)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.Evacuated())
require.Equal(t, uint64(2), res.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, 3)
prm.ObjectsHandler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(3), res.Evacuated())
require.Equal(t, uint64(3), res.ObjectsEvacuated())
})
})
t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -233,19 +238,20 @@ func TestEvacuateNetwork(t *testing.T) {
}
var prm EvacuateShardPrm
prm.shardID = evacuateIDs
prm.handler = acceptOneOf(objects, totalCount-1)
prm.ShardID = evacuateIDs
prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.Evacuated())
require.Equal(t, totalCount-1, res.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, totalCount)
prm.ObjectsHandler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.Evacuated())
require.Equal(t, totalCount, res.ObjectsEvacuated())
})
})
}
@ -261,8 +267,8 @@ func TestEvacuateCancellation(t *testing.T) {
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
prm.ShardID = ids[1:2]
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -270,13 +276,14 @@ func TestEvacuateCancellation(t *testing.T) {
}
return nil
}
prm.Scope = EvacuateScopeObjects
ctx, cancel := context.WithCancel(context.Background())
cancel()
res, err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled")
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
}
func TestEvacuateSingleProcess(t *testing.T) {
@ -292,8 +299,9 @@ func TestEvacuateSingleProcess(t *testing.T) {
running := make(chan interface{})
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
prm.ShardID = ids[1:2]
prm.Scope = EvacuateScopeObjects
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
@ -307,21 +315,21 @@ func TestEvacuateSingleProcess(t *testing.T) {
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
require.Equal(t, uint64(3), res.ObjectsEvacuated())
return nil
})
eg.Go(func() error {
<-running
res, err := e.Evacuate(egCtx, prm)
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
close(blocker)
return nil
})
require.NoError(t, eg.Wait())
}
func TestEvacuateAsync(t *testing.T) {
func TestEvacuateObjectsAsync(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
require.NoError(t, e.Close(context.Background()))
@ -334,8 +342,9 @@ func TestEvacuateAsync(t *testing.T) {
running := make(chan interface{})
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
prm.ShardID = ids[1:2]
prm.Scope = EvacuateScopeObjects
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
@ -348,7 +357,7 @@ func TestEvacuateAsync(t *testing.T) {
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
@ -358,7 +367,7 @@ func TestEvacuateAsync(t *testing.T) {
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
require.Equal(t, uint64(3), res.ObjectsEvacuated())
return nil
})
@ -367,7 +376,7 @@ func TestEvacuateAsync(t *testing.T) {
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get running state failed")
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid running count")
require.NotNil(t, st.StartedAt(), "invalid running started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
expectedShardIDs := make([]string, 0, 2)
@ -385,7 +394,7 @@ func TestEvacuateAsync(t *testing.T) {
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
@ -393,3 +402,166 @@ func TestEvacuateAsync(t *testing.T) {
require.NoError(t, eg.Wait())
}
func TestEvacuateTreesLocal(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeTrees
expectedShardIDs := make([]string, 0, 1)
for _, id := range ids[0:1] {
expectedShardIDs = append(expectedShardIDs, id.String())
}
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation result must be not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
fyrchik marked this conversation as resolved Outdated

`Eventually` will eventually fail on CI. Though, it is sometimes inevitable, maybe we could provide a notify channel in parameters, which we `close` upon finish?

Changed test to sync version.
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[0].String()])
require.NoError(t, err, "list source trees failed")
require.Len(t, sourceTrees, 3)
for _, tr := range sourceTrees {
exists, err := e.shards[ids[1].String()].TreeExists(context.Background(), tr.CID, tr.TreeID)
require.NoError(t, err, "failed to check tree existance")
require.True(t, exists, "tree doesn't exists on target shard")
var height uint64
var sourceOps []pilorama.Move
for {
op, err := e.shards[ids[0].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
require.NoError(t, err)
if op.Time == 0 {
break
}
sourceOps = append(sourceOps, op)
height = op.Time + 1
}
height = 0
var targetOps []pilorama.Move
for {
op, err := e.shards[ids[1].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
require.NoError(t, err)
if op.Time == 0 {
break
}
targetOps = append(targetOps, op)
height = op.Time + 1
}
require.Equal(t, sourceOps, targetOps)
}
}
func TestEvacuateTreesRemote(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
evacuatedTreeOps := make(map[string][]*pilorama.Move)
var prm EvacuateShardPrm
prm.ShardID = ids
prm.Scope = EvacuateScopeTrees
prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (string, error) {
key := contID.String() + treeID
var height uint64
for {
op, err := f.TreeGetOpLog(ctx, contID, treeID, height)
require.NoError(t, err)
if op.Time == 0 {
return "", nil
}
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
height = op.Time + 1
}
}
expectedShardIDs := make([]string, 0, len(ids))
for _, id := range ids {
expectedShardIDs = append(expectedShardIDs, id.String())
}
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation must return not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
expectedTreeOps := make(map[string][]*pilorama.Move)
for i := 0; i < len(e.shards); i++ {
sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[i].String()])
require.NoError(t, err, "list source trees failed")
require.Len(t, sourceTrees, 3)
for _, tr := range sourceTrees {
key := tr.CID.String() + tr.TreeID
var height uint64
for {
op, err := e.shards[ids[i].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
require.NoError(t, err)
if op.Time == 0 {
break
}
expectedTreeOps[key] = append(expectedTreeOps[key], &op)
height = op.Time + 1
}
}
}
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
}

View file

@ -549,6 +549,58 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
return metaerr.Wrap(err)
}
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeApplyStream", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyStream",
trace.WithAttributes(
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return ErrDegradedMode
} else if t.mode.ReadOnly() {
return ErrReadOnlyMode
}
fullID := bucketName(cnr, treeID)
err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case m, ok := <-source:
if !ok {
return nil
}
var lm Move
if e := t.applyOperation(bLog, bTree, []*Move{m}, &lm); e != nil {
return e
}
}
}
}))
success = err == nil
return err
}
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
t.mtx.Lock()
for i := 0; i < len(t.batches); i++ {
@ -1134,6 +1186,68 @@ func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string)
return err
}
// TreeListTrees implements ForestStorage.
func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeListTrees", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeListTrees")
defer span.End()
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
batchSize := prm.BatchSize
if batchSize <= 0 {
batchSize = treeListTreesBatchSizeDefault
}
var res TreeListTreesResult

buckets can be filtered with a simple `v != nil` value (v is returned by both `Seek()` and `Next()`); `prm.NextPageToken` could be checked only on the first iteration

> buckets can be filtered with a simple `v != nil` value

Cursor is opened on `tx`, so it iterates only buckets.

> `prm.NextPageToken` could be checked only on the first iteration

One more variable and one more `if`? Ok, fixed.

> One more variable and one more `if`? Ok, fixed.

I've thought about this `if` outside the loop because iteration is ordered and equality can happen only once.
err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error {
c := tx.Cursor()
checkNextPageToken := true
for k, _ := c.Seek(prm.NextPageToken); k != nil; k, _ = c.Next() {
if bytes.Equal(k, dataBucket) || bytes.Equal(k, logBucket) {
continue
}
if checkNextPageToken && bytes.Equal(k, prm.NextPageToken) {
checkNextPageToken = false
continue
}
var contID cidSDK.ID
if err := contID.Decode(k[:32]); err != nil {
return fmt.Errorf("failed to decode containerID: %w", err)
}
res.Items = append(res.Items, ContainerIDTreeID{
CID: contID,
TreeID: string(k[32:]),
})
if len(res.Items) == batchSize {
res.NextPageToken = make([]byte, len(k))
copy(res.NextPageToken, k)
break
}
}
return nil
}))
success = err == nil
if err != nil {
return nil, err
}
return &res, nil
}
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
c := bTree.Cursor()

View file

@ -2,6 +2,7 @@ package pilorama
import (
"context"
"fmt"
"sort"
"strings"
@ -260,3 +261,77 @@ func (f *memoryForest) TreeLastSyncHeight(_ context.Context, cid cid.ID, treeID
}
return t.syncHeight, nil
}
// TreeListTrees implements ForestStorage.
func (f *memoryForest) TreeListTrees(_ context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
batchSize := prm.BatchSize
if batchSize <= 0 {
batchSize = treeListTreesBatchSizeDefault
}
tmpSlice := make([]string, 0, len(f.treeMap))
for k := range f.treeMap {
tmpSlice = append(tmpSlice, k)
}
sort.Strings(tmpSlice)
var idx int
if len(prm.NextPageToken) > 0 {
last := string(prm.NextPageToken)
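// sort.Find compares the target against element i (hence the negated Compare); this locates the first key >= last.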
idx, _ = sort.Find(len(tmpSlice), func(i int) int {
return -1 * strings.Compare(tmpSlice[i], last)
})
if idx == len(tmpSlice) {
return &TreeListTreesResult{}, nil
}
if tmpSlice[idx] == last {
idx++
}
}
var result TreeListTreesResult
for idx < len(tmpSlice) {
cidAndTree := strings.Split(tmpSlice[idx], "/")
if len(cidAndTree) != 2 {
return nil, fmt.Errorf("invalid format: key must be cid and treeID")
}
var contID cid.ID
if err := contID.DecodeString(cidAndTree[0]); err != nil {
return nil, fmt.Errorf("invalid format: %w", err)
}
result.Items = append(result.Items, ContainerIDTreeID{
CID: contID,
TreeID: cidAndTree[1],
})
if len(result.Items) == batchSize {
result.NextPageToken = []byte(tmpSlice[idx])
break
}
idx++
}
return &result, nil
}
// TreeApplyStream implements ForestStorage.
func (f *memoryForest) TreeApplyStream(ctx context.Context, cnr cid.ID, treeID string, source <-chan *Move) error {
fullID := cnr.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
s = newMemoryTree()
f.treeMap[fullID] = s
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case m, ok := <-source:
if !ok {
return nil
}
if e := s.Apply(m); e != nil {
return e
}
}
}
}

View file

@ -1189,3 +1189,59 @@ func testTreeLastSyncHeight(t *testing.T, f ForestStorage) {
require.ErrorIs(t, err, ErrTreeNotFound)
})
}
func TestForest_ListTrees(t *testing.T) {
for i := range providers {
fyrchik marked this conversation as resolved Outdated

Please, no

We provide parameter _struct_ in `TreeListTrees`, why not include batch size there?

Done
i := i
t.Run(providers[i].name, func(t *testing.T) {
testTreeListTrees(t, providers[i].construct)
})
}
}
func testTreeListTrees(t *testing.T, constructor func(t testing.TB, _ ...Option) ForestStorage) {
batchSize := 10
t.Run("empty", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 0)
})
t.Run("count lower than batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize-1)
})
t.Run("count equals batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize)
})
t.Run("count greater than batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize+1)
})
t.Run("count equals multiplied batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 3*batchSize)
})
t.Run("count equals multiplied batch size with addition", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 3*batchSize+batchSize/2)
})
}
func testTreeListTreesCount(t *testing.T, constructor func(t testing.TB, _ ...Option) ForestStorage, batchSize, count int) {
f := constructor(t)
var expected []ContainerIDTreeID
treeIDs := []string{"version", "system", "s", "avada kedavra"}
for i := 0; i < count; i++ {
cid := cidtest.ID()
treeID := treeIDs[i%len(treeIDs)]
expected = append(expected, ContainerIDTreeID{
CID: cid,
TreeID: treeID,
})
ops := prepareRandomTree(5, 5)
for _, op := range ops {
require.NoError(t, f.TreeApply(context.Background(), cid, treeID, &op, false))
}
}
actual, err := treeListAll(context.Background(), f, batchSize)
require.NoError(t, err)
require.ElementsMatch(t, expected, actual)
}

View file

@ -63,6 +63,10 @@ type ForestStorage interface {
SetMode(m mode.Mode) error
SetParentID(id string)
Forest
// TreeListTrees returns all pairs "containerID:treeID".
TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error)
TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error
fyrchik marked this conversation as resolved Outdated

We should mention in the comments that `TreeApplyStream` has a specific use and can block other write tx for a pretty long time.

Done for boltdb implementation.
}
const (
@ -85,3 +89,68 @@ var ErrInvalidCIDDescriptor = logicerr.New("cid descriptor is invalid")
func (d CIDDescriptor) checkValid() bool {
return 0 <= d.Position && d.Position < d.Size
}
var treeListTreesBatchSizeDefault = 1000
type ContainerIDTreeID struct {
CID cidSDK.ID
TreeID string
}
type TreeListTreesPrm struct {
NextPageToken []byte
// BatchSize is the batch size used to list trees. If it is less than or equal to zero, treeListTreesBatchSizeDefault is used.
BatchSize int
}
type TreeListTreesResult struct {
NextPageToken []byte
Items []ContainerIDTreeID
}
type treeList interface {
TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error)
}
func TreeListAll(ctx context.Context, f treeList) ([]ContainerIDTreeID, error) {
return treeListAll(ctx, f, treeListTreesBatchSizeDefault)
}
func treeListAll(ctx context.Context, f treeList, batchSize int) ([]ContainerIDTreeID, error) {
var prm TreeListTreesPrm
prm.BatchSize = batchSize
var result []ContainerIDTreeID
first := true
for len(prm.NextPageToken) > 0 || first {
first = false
res, err := f.TreeListTrees(ctx, prm)
if err != nil {
return nil, err
}
prm.NextPageToken = res.NextPageToken
result = append(result, res.Items...)
}
return result, nil
}
func TreeCountAll(ctx context.Context, f treeList) (uint64, error) {
var prm TreeListTreesPrm
var result uint64
first := true
for len(prm.NextPageToken) > 0 || first {
first = false
res, err := f.TreeListTrees(ctx, prm)
if err != nil {
return 0, err
}
prm.NextPageToken = res.NextPageToken
result += uint64(len(res.Items))
}
return result, nil
}
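
A hedged usage sketch of the paging contract above (the `listWithPageSize` helper and its page size are assumptions): `TreeListAll` always uses the default batch size, so a caller outside the package that wants a different page size drives `TreeListTrees` directly, exactly as `treeListAll` does:

```go
package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
)

// listWithPageSize pages through every tree with an explicit batch size.
// An empty NextPageToken in the result marks the last page.
func listWithPageSize(ctx context.Context, f pilorama.ForestStorage, pageSize int) ([]pilorama.ContainerIDTreeID, error) {
	prm := pilorama.TreeListTreesPrm{BatchSize: pageSize}
	var all []pilorama.ContainerIDTreeID
	for {
		res, err := f.TreeListTrees(ctx, prm)
		if err != nil {
			return nil, err
		}
		all = append(all, res.Items...)
		if len(res.NextPageToken) == 0 {
			return all, nil
		}
		prm.NextPageToken = res.NextPageToken
	}
}
```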

View file

@ -353,3 +353,53 @@ func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID st
}
return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
}
func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm) (*pilorama.TreeListTreesResult, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeListTrees",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeListTrees(ctx, prm)
}
func (s *Shard) PiloramaEnabled() bool {
return s.pilorama != nil
}
func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *pilorama.Move) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyStream",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID)),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}

View file

@ -48,14 +48,17 @@ func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuation
return &control.GetShardEvacuationStatusResponse{
Body: &control.GetShardEvacuationStatusResponse_Body{
Shard_ID: shardIDs,
Evacuated: state.Evacuated(),
Total: state.Total(),
Failed: state.Failed(),
EvacuatedObjects: state.ObjectsEvacuated(),
TotalObjects: state.ObjectsTotal(),
FailedObjects: state.ObjectsFailed(),
Status: evacStatus,
StartedAt: startedAt,
Duration: duration,
ErrorMessage: state.ErrorMessage(),
Skipped: state.Skipped(),
SkippedObjects: state.ObjectsSkipped(),
TotalTrees: state.TreesTotal(),
EvacuatedTrees: state.TreesEvacuated(),
FailedTrees: state.TreesFailed(),
},
}, nil
}

View file

@ -4,13 +4,17 @@ import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -25,10 +29,12 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
return nil, status.Error(codes.PermissionDenied, err.Error())
}
var prm engine.EvacuateShardPrm
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)
prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
Scope: engine.EvacuateScopeObjects,
}
res, err := s.s.Evacuate(ctx, prm)
if err != nil {
@ -37,7 +43,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
resp := &control.EvacuateShardResponse{
Body: &control.EvacuateShardResponse_Body{
Count: uint32(res.Evacuated()),
Count: uint32(res.ObjectsEvacuated()),
},
}
@ -48,7 +54,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
return resp, nil
}
func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error {
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error {
cid, ok := obj.ContainerID()
if !ok {
// Return nil to prevent situations where a shard can't be evacuated
@ -56,33 +62,11 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
return nil
}
nm, err := s.netMapSrc.GetNetMap(0)
nodes, err := s.getContainerNodes(cid)
if err != nil {
return err
}
c, err := s.cnrSrc.Get(cid)
if err != nil {
return err
}
binCnr := make([]byte, sha256.Size)
cid.Encode(binCnr)
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
if err != nil {
return fmt.Errorf("can't build a list of container nodes")
}
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for i := 0; i < len(nodes); i++ {
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]
}
}
var res replicatorResult
task := replicator.Task{
NumCopies: 1,
@ -98,6 +82,95 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
return nil
}
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) {
nodes, err := s.getContainerNodes(contID)
if err != nil {
return "", err
}
if len(nodes) == 0 {
return "", fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", treeID, contID)
}
for _, node := range nodes {
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
if err == nil {
return hex.EncodeToString(node.PublicKey()), nil
}
}
return "", err
}
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
rawCID := make([]byte, sha256.Size)
contID.Encode(rawCID)
var height uint64
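// Stream the tree's operation log from the beginning; TreeGetOpLog returns a move with Time == 0 once the log is exhausted.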
for {
op, err := forest.TreeGetOpLog(ctx, contID, treeID, height)
if err != nil {
return err
}
if op.Time == 0 {
return nil
}
req := &tree.ApplyRequest{
Body: &tree.ApplyRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Operation: &tree.LogMove{
ParentId: op.Parent,
Meta: op.Meta.Bytes(),
ChildId: op.Child,
},
},
}
err = tree.SignMessage(req, s.key)
fyrchik marked this conversation as resolved Outdated

Why do we sign here and not in the tree service?

Just because we form the request here too, so the tree service gets a completed request.
if err != nil {
return fmt.Errorf("can't sign apply request: %w", err)
}
err = s.treeService.ReplicateTreeOp(ctx, node, req)
if err != nil {
return err
}
height = op.Time + 1
}
}
func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
nm, err := s.netMapSrc.GetNetMap(0)
if err != nil {
return nil, err
}
c, err := s.cnrSrc.Get(contID)
if err != nil {
return nil, err
}
binCnr := make([]byte, sha256.Size)
contID.Encode(binCnr)
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, fmt.Errorf("can't build a list of container nodes")
}
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
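// Drop the local node from the candidate list so the tree is not replicated to ourselves.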
for i := 0; i < len(nodes); i++ {
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]
}
}
return nodes, nil
}
type replicatorResult struct {
count int
}

View file

@ -17,11 +17,18 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
return nil, status.Error(codes.PermissionDenied, err.Error())
}
var prm engine.EvacuateShardPrm
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)
prm.WithAsync(true)
if req.GetBody().GetScope() == uint32(control.StartShardEvacuationRequest_Body_NONE) {
return nil, status.Error(codes.InvalidArgument, "no evacuation scope")
}
prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
}
_, err = s.s.Evacuate(ctx, prm)
if err != nil {

View file

@ -4,14 +4,17 @@ import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TreeService represents a tree service instance.
type TreeService interface {
Synchronize(ctx context.Context, cnr cid.ID, treeID string) error
SynchronizeTree(ctx context.Context, cnr cid.ID, treeID string) error
ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *tree.ApplyRequest) error
}
func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTreeRequest) (*control.SynchronizeTreeResponse, error) {
@ -31,7 +34,7 @@ func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTr
return nil, status.Error(codes.Internal, err.Error())
}
err = s.treeService.Synchronize(ctx, cnr, b.GetTreeId())
err = s.treeService.SynchronizeTree(ctx, cnr, b.GetTreeId())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

File diff suppressed because it is too large

View file

@ -336,10 +336,18 @@ message DoctorResponse {
message StartShardEvacuationRequest {
// Request body structure.
message Body {
enum Scope {
NONE = 0;
OBJECTS = 1;
TREES = 2;
}
// IDs of the shards.
repeated bytes shard_ID = 1;
// Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2;
// Evacuation scope.
uint32 scope = 3;
}
Body body = 1;
@ -386,11 +394,11 @@ message GetShardEvacuationStatusResponse {
}
// Total objects to evacuate count. The value is approximate, so evacuated + failed + skipped == total is not guaranteed after completion.
uint64 total = 1;
uint64 total_objects = 1;
// Evacuated objects count.
uint64 evacuated = 2;
uint64 evacuated_objects = 2;
// Failed objects count.
uint64 failed = 3;
uint64 failed_objects = 3;
// Shard IDs.
repeated bytes shard_ID = 4;
@ -404,7 +412,14 @@ message GetShardEvacuationStatusResponse {
string error_message = 8;
// Skipped objects count.
uint64 skipped = 9;
uint64 skipped_objects = 9;
// Total trees to evacuate count.
uint64 total_trees = 10;
// Evacuated trees count.
uint64 evacuated_trees = 11;
// Failed trees count.
uint64 failed_trees = 12;
}
Body body = 1;

View file

@ -1511,6 +1511,7 @@ func (x *StartShardEvacuationRequest_Body) StableSize() (size int) {
}
size += proto.RepeatedBytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.IgnoreErrors)
size += proto.UInt32Size(3, x.Scope)
return size
}
@ -1532,6 +1533,7 @@ func (x *StartShardEvacuationRequest_Body) StableMarshal(buf []byte) []byte {
var offset int
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
offset += proto.UInt32Marshal(3, buf[offset:], x.Scope)
return buf
}
@ -1813,15 +1815,18 @@ func (x *GetShardEvacuationStatusResponse_Body) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.UInt64Size(1, x.Total)
size += proto.UInt64Size(2, x.Evacuated)
size += proto.UInt64Size(3, x.Failed)
size += proto.UInt64Size(1, x.TotalObjects)
size += proto.UInt64Size(2, x.EvacuatedObjects)
size += proto.UInt64Size(3, x.FailedObjects)
size += proto.RepeatedBytesSize(4, x.Shard_ID)
size += proto.EnumSize(5, int32(x.Status))
size += proto.NestedStructureSize(6, x.Duration)
size += proto.NestedStructureSize(7, x.StartedAt)
size += proto.StringSize(8, x.ErrorMessage)
size += proto.UInt64Size(9, x.Skipped)
size += proto.UInt64Size(9, x.SkippedObjects)
size += proto.UInt64Size(10, x.TotalTrees)
size += proto.UInt64Size(11, x.EvacuatedTrees)
size += proto.UInt64Size(12, x.FailedTrees)
return size
}
@ -1841,15 +1846,18 @@ func (x *GetShardEvacuationStatusResponse_Body) StableMarshal(buf []byte) []byte
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.UInt64Marshal(1, buf[offset:], x.Total)
offset += proto.UInt64Marshal(2, buf[offset:], x.Evacuated)
offset += proto.UInt64Marshal(3, buf[offset:], x.Failed)
offset += proto.UInt64Marshal(1, buf[offset:], x.TotalObjects)
offset += proto.UInt64Marshal(2, buf[offset:], x.EvacuatedObjects)
offset += proto.UInt64Marshal(3, buf[offset:], x.FailedObjects)
offset += proto.RepeatedBytesMarshal(4, buf[offset:], x.Shard_ID)
offset += proto.EnumMarshal(5, buf[offset:], int32(x.Status))
offset += proto.NestedStructureMarshal(6, buf[offset:], x.Duration)
offset += proto.NestedStructureMarshal(7, buf[offset:], x.StartedAt)
offset += proto.StringMarshal(8, buf[offset:], x.ErrorMessage)
offset += proto.UInt64Marshal(9, buf[offset:], x.Skipped)
offset += proto.UInt64Marshal(9, buf[offset:], x.SkippedObjects)
offset += proto.UInt64Marshal(10, buf[offset:], x.TotalTrees)
offset += proto.UInt64Marshal(11, buf[offset:], x.EvacuatedTrees)
offset += proto.UInt64Marshal(12, buf[offset:], x.FailedTrees)
return buf
}

View file

@ -71,20 +71,28 @@ func (s *Service) replicationWorker(ctx context.Context) {
case <-s.closeCh:
return
case task := <-s.replicationTasks:
_ = s.ReplicateTreeOp(ctx, task.n, task.req)
}
}
}
func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *ApplyRequest) error {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask",
trace.WithAttributes(
attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())),
attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
),
)
defer span.End()
start := time.Now()
var lastErr error
var lastAddr string
task.n.IterateNetworkEndpoints(func(addr string) bool {
n.IterateNetworkEndpoints(func(addr string) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
trace.WithAttributes(
attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())),
attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
attribute.String("address", addr),
),
)
@ -99,7 +107,7 @@ func (s *Service) replicationWorker(ctx context.Context) {
}
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
_, lastErr = c.Apply(ctx, task.req)
_, lastErr = c.Apply(ctx, req)
cancel()
return lastErr == nil
@ -114,16 +122,14 @@ func (s *Service) replicationWorker(ctx context.Context) {
s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
zap.String("last_error", lastErr.Error()),
zap.String("address", lastAddr),
zap.String("key", hex.EncodeToString(task.n.PublicKey())),
zap.String("key", hex.EncodeToString(n.PublicKey())),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
s.metrics.AddReplicateTaskDuration(time.Since(start), false)
} else {
return lastErr
}
s.metrics.AddReplicateTaskDuration(time.Since(start), true)
}
span.End()
}
}
return nil
}
func (s *Service) replicateLoop(ctx context.Context) {