[#1349] node: Evacuate objects without setting mode to MAINTENANCE
All checks were successful
DCO action / DCO (pull_request) Successful in 8m30s
Build / Build Components (1.23) (pull_request) Successful in 9m23s
Build / Build Components (1.22) (pull_request) Successful in 9m28s
Pre-commit hooks / Pre-commit (pull_request) Successful in 9m30s
Tests and linters / Run gofumpt (pull_request) Successful in 2m31s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m46s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m46s
Tests and linters / Staticcheck (pull_request) Successful in 3m56s
Tests and linters / Lint (pull_request) Successful in 4m55s
Tests and linters / gopls check (pull_request) Successful in 4m46s
Tests and linters / Tests with -race (pull_request) Successful in 5m0s
Vulncheck / Vulncheck (pull_request) Successful in 49s
All checks were successful
DCO action / DCO (pull_request) Successful in 8m30s
Build / Build Components (1.23) (pull_request) Successful in 9m23s
Build / Build Components (1.22) (pull_request) Successful in 9m28s
Pre-commit hooks / Pre-commit (pull_request) Successful in 9m30s
Tests and linters / Run gofumpt (pull_request) Successful in 2m31s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m46s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m46s
Tests and linters / Staticcheck (pull_request) Successful in 3m56s
Tests and linters / Lint (pull_request) Successful in 4m55s
Tests and linters / gopls check (pull_request) Successful in 4m46s
Tests and linters / Tests with -race (pull_request) Successful in 5m0s
Vulncheck / Vulncheck (pull_request) Successful in 49s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
2b3fc50681
commit
280c981cb2
8 changed files with 66 additions and 2 deletions
|
@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
|
||||||
listPrm.WithCount(defaultEvacuateBatchSize)
|
listPrm.WithCount(defaultEvacuateBatchSize)
|
||||||
|
|
||||||
sh := shardsToEvacuate[shardID]
|
sh := shardsToEvacuate[shardID]
|
||||||
|
sh.SetEvacInProgress(true)
|
||||||
|
|
||||||
var c *meta.Cursor
|
var c *meta.Cursor
|
||||||
for {
|
for {
|
||||||
|
@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
var getPrm shard.GetPrm
|
||||||
getPrm.SetAddress(addr)
|
getPrm.SetAddress(addr)
|
||||||
|
getPrm.SkipEvacCheck(true)
|
||||||
|
|
||||||
getRes, err := sh.Get(ctx, getPrm)
|
getRes, err := sh.Get(ctx, getPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) {
|
||||||
// Second case ensures that all objects are indeed moved and available.
|
// Second case ensures that all objects are indeed moved and available.
|
||||||
checkHasObjects(t)
|
checkHasObjects(t)
|
||||||
|
|
||||||
|
// Objects on evacuated shards should be logically unavailable, but persisted on disk.
|
||||||
|
// This is necessary to prevent removing it by policer in case of `REP 1` policy.
|
||||||
|
for _, obj := range objects[len(objects)-objPerShard:] {
|
||||||
|
var prmGet shard.GetPrm
|
||||||
|
prmGet.SetAddress(objectCore.AddressOf(obj))
|
||||||
|
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
prmGet.SkipEvacCheck(true)
|
||||||
|
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var prmHead shard.HeadPrm
|
||||||
|
prmHead.SetAddress(objectCore.AddressOf(obj))
|
||||||
|
_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
var existsPrm shard.ExistsPrm
|
||||||
|
existsPrm.Address = objectCore.AddressOf(obj)
|
||||||
|
_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
var rngPrm shard.RngPrm
|
||||||
|
rngPrm.SetAddress(objectCore.AddressOf(obj))
|
||||||
|
_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
|
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
|
||||||
res, err = e.Evacuate(context.Background(), prm)
|
res, err = e.Evacuate(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -5,7 +5,9 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
||||||
|
|
||||||
if s.info.Mode.Disabled() {
|
if s.info.Mode.Disabled() {
|
||||||
return ExistsRes{}, ErrShardDisabled
|
return ExistsRes{}, ErrShardDisabled
|
||||||
|
} else if s.info.EvacInProgress {
|
||||||
|
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
} else if s.info.Mode.NoMetabase() {
|
} else if s.info.Mode.NoMetabase() {
|
||||||
var p common.ExistsPrm
|
var p common.ExistsPrm
|
||||||
p.Address = prm.Address
|
p.Address = prm.Address
|
||||||
|
|
|
@ -29,6 +29,7 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object,
|
||||||
type GetPrm struct {
|
type GetPrm struct {
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
skipMeta bool
|
skipMeta bool
|
||||||
|
skipEvacCheck bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRes groups the resulting values of Get operation.
|
// GetRes groups the resulting values of Get operation.
|
||||||
|
@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) {
|
||||||
p.skipMeta = ignore
|
p.skipMeta = ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SkipEvacCheck is a Get option which instruct to skip check is evacuation in progress.
|
||||||
|
func (p *GetPrm) SkipEvacCheck(val bool) {
|
||||||
|
p.skipEvacCheck = val
|
||||||
|
}
|
||||||
|
|
||||||
// Object returns the requested object.
|
// Object returns the requested object.
|
||||||
func (r GetRes) Object() *objectSDK.Object {
|
func (r GetRes) Object() *objectSDK.Object {
|
||||||
return r.obj
|
return r.obj
|
||||||
|
@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||||
return GetRes{}, ErrShardDisabled
|
return GetRes{}, ErrShardDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.info.EvacInProgress && !prm.skipEvacCheck {
|
||||||
|
return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
|
}
|
||||||
|
|
||||||
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
|
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
|
||||||
var getPrm common.GetPrm
|
var getPrm common.GetPrm
|
||||||
getPrm.Address = prm.addr
|
getPrm.Address = prm.addr
|
||||||
|
|
|
@ -4,7 +4,9 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
|
||||||
res, err = s.Get(ctx, getPrm)
|
res, err = s.Get(ctx, getPrm)
|
||||||
obj = res.Object()
|
obj = res.Object()
|
||||||
} else {
|
} else {
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
if s.info.EvacInProgress {
|
||||||
|
return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
|
}
|
||||||
var headParams meta.GetPrm
|
var headParams meta.GetPrm
|
||||||
headParams.SetAddress(prm.addr)
|
headParams.SetAddress(prm.addr)
|
||||||
headParams.SetRaw(prm.raw)
|
headParams.SetRaw(prm.raw)
|
||||||
|
|
|
@ -16,6 +16,9 @@ type Info struct {
|
||||||
// Shard mode.
|
// Shard mode.
|
||||||
Mode mode.Mode
|
Mode mode.Mode
|
||||||
|
|
||||||
|
// True when evacuation is in progress.
|
||||||
|
EvacInProgress bool
|
||||||
|
|
||||||
// Information about the metabase.
|
// Information about the metabase.
|
||||||
MetaBaseInfo meta.Info
|
MetaBaseInfo meta.Info
|
||||||
|
|
||||||
|
|
|
@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
|
||||||
s.m.RLock()
|
s.m.RLock()
|
||||||
defer s.m.RUnlock()
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.EvacInProgress {
|
||||||
|
return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
|
}
|
||||||
|
|
||||||
if s.info.Mode.Disabled() {
|
if s.info.Mode.Disabled() {
|
||||||
return RngRes{}, ErrShardDisabled
|
return RngRes{}, ErrShardDisabled
|
||||||
}
|
}
|
||||||
|
|
|
@ -579,3 +579,9 @@ func (s *Shard) DeleteShardMetrics() {
|
||||||
s.cfg.metricsWriter.DeleteShardMetrics()
|
s.cfg.metricsWriter.DeleteShardMetrics()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Shard) SetEvacInProgress(val bool) {
|
||||||
|
s.m.Lock()
|
||||||
|
defer s.m.Unlock()
|
||||||
|
s.info.EvacInProgress = val
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue