[#xx] node: Evacuate objects without setting mode to MAINTENANCE
Some checks failed
DCO action / DCO (pull_request) Failing after 3m5s
Tests and linters / Run gofumpt (pull_request) Successful in 3m3s
Tests and linters / Tests (1.23) (pull_request) Successful in 4m45s
Tests and linters / Tests (1.22) (pull_request) Successful in 4m47s
Tests and linters / Lint (pull_request) Successful in 5m3s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m12s
Vulncheck / Vulncheck (pull_request) Successful in 4m57s
Tests and linters / Staticcheck (pull_request) Successful in 5m39s
Tests and linters / Tests with -race (pull_request) Successful in 6m47s
Build / Build Components (1.23) (pull_request) Successful in 6m58s
Build / Build Components (1.22) (pull_request) Successful in 8m34s
Tests and linters / gopls check (pull_request) Successful in 8m50s

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-03 12:18:10 +03:00
parent 2b3fc50681
commit d2c27341a9
8 changed files with 66 additions and 2 deletions

View file

@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
listPrm.WithCount(defaultEvacuateBatchSize) listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID] sh := shardsToEvacuate[shardID]
sh.SetEvacInProgress(true)
var c *meta.Cursor var c *meta.Cursor
for { for {
@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
var getPrm shard.GetPrm var getPrm shard.GetPrm
getPrm.SetAddress(addr) getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
getRes, err := sh.Get(ctx, getPrm) getRes, err := sh.Get(ctx, getPrm)
if err != nil { if err != nil {

View file

@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) {
// Second case ensures that all objects are indeed moved and available. // Second case ensures that all objects are indeed moved and available.
checkHasObjects(t) checkHasObjects(t)
// Objects on evacuated shards should be logically unavailable, but persisted on disk.
// This is necessary to prevent removing it by policer in case of `REP 1` policy.
for _, obj := range objects[len(objects)-objPerShard:] {
var prmGet shard.GetPrm
prmGet.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
require.Error(t, err)
prmGet.SkipEvacCheck(true)
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
require.NoError(t, err)
var prmHead shard.HeadPrm
prmHead.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
require.Error(t, err)
var existsPrm shard.ExistsPrm
existsPrm.Address = objectCore.AddressOf(obj)
_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
require.Error(t, err)
var rngPrm shard.RngPrm
rngPrm.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
require.Error(t, err)
}
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done. // Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm) res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)

View file

@ -5,7 +5,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
if s.info.Mode.Disabled() { if s.info.Mode.Disabled() {
return ExistsRes{}, ErrShardDisabled return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
} else if s.info.Mode.NoMetabase() { } else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm var p common.ExistsPrm
p.Address = prm.Address p.Address = prm.Address

View file

@ -29,6 +29,7 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object,
type GetPrm struct { type GetPrm struct {
addr oid.Address addr oid.Address
skipMeta bool skipMeta bool
skipEvacCheck bool
} }
// GetRes groups the resulting values of Get operation. // GetRes groups the resulting values of Get operation.
@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) {
p.skipMeta = ignore p.skipMeta = ignore
} }
// SkipEvacCheck is a Get option which instruct to skip check is evacuation in progress.
func (p *GetPrm) SkipEvacCheck(val bool) {
p.skipEvacCheck = val
}
// Object returns the requested object. // Object returns the requested object.
func (r GetRes) Object() *objectSDK.Object { func (r GetRes) Object() *objectSDK.Object {
return r.obj return r.obj
@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, ErrShardDisabled return GetRes{}, ErrShardDisabled
} }
if s.info.EvacInProgress && !prm.skipEvacCheck {
return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) { cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
var getPrm common.GetPrm var getPrm common.GetPrm
getPrm.Address = prm.addr getPrm.Address = prm.addr

View file

@ -4,7 +4,9 @@ import (
"context" "context"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
res, err = s.Get(ctx, getPrm) res, err = s.Get(ctx, getPrm)
obj = res.Object() obj = res.Object()
} else { } else {
s.m.RLock()
defer s.m.RUnlock()
if s.info.EvacInProgress {
return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
var headParams meta.GetPrm var headParams meta.GetPrm
headParams.SetAddress(prm.addr) headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw) headParams.SetRaw(prm.raw)

View file

@ -16,6 +16,9 @@ type Info struct {
// Shard mode. // Shard mode.
Mode mode.Mode Mode mode.Mode
// True when evacuation is in progress.
EvacInProgress bool
// Information about the metabase. // Information about the metabase.
MetaBaseInfo meta.Info MetaBaseInfo meta.Info

View file

@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() defer s.m.RUnlock()
if s.info.EvacInProgress {
return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if s.info.Mode.Disabled() { if s.info.Mode.Disabled() {
return RngRes{}, ErrShardDisabled return RngRes{}, ErrShardDisabled
} }

View file

@ -579,3 +579,9 @@ func (s *Shard) DeleteShardMetrics() {
s.cfg.metricsWriter.DeleteShardMetrics() s.cfg.metricsWriter.DeleteShardMetrics()
} }
} }
func (s *Shard) SetEvacInProgress(val bool) {
s.m.Lock()
defer s.m.Unlock()
s.info.EvacInProgress = val
}