[#1349] node: Evacuate objects without setting mode to MAINTENANCE
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 2m46s
DCO action / DCO (pull_request) Successful in 2m43s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m59s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m6s
Vulncheck / Vulncheck (pull_request) Successful in 2m52s
Build / Build Components (1.22) (pull_request) Successful in 3m24s
Build / Build Components (1.23) (pull_request) Successful in 3m24s
Tests and linters / Lint (pull_request) Successful in 3m44s
Tests and linters / Staticcheck (pull_request) Successful in 3m48s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m54s
Tests and linters / Tests with -race (pull_request) Successful in 4m33s
Tests and linters / gopls check (pull_request) Successful in 4m31s

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-03 12:18:10 +03:00
parent a4fb7f085b
commit 5717d86173
17 changed files with 112 additions and 10 deletions

View file

@ -65,13 +65,14 @@ func prettyPrintShardsJSON(cmd *cobra.Command, ii []control.ShardInfo) {
out := make([]map[string]any, 0, len(ii))
for _, i := range ii {
out = append(out, map[string]any{
"shard_id": base58.Encode(i.GetShard_ID()),
"mode": shardModeToString(i.GetMode()),
"metabase": i.GetMetabasePath(),
"blobstor": i.GetBlobstor(),
"writecache": i.GetWritecachePath(),
"pilorama": i.GetPiloramaPath(),
"error_count": i.GetErrorCount(),
"shard_id": base58.Encode(i.GetShard_ID()),
"mode": shardModeToString(i.GetMode()),
"metabase": i.GetMetabasePath(),
"blobstor": i.GetBlobstor(),
"writecache": i.GetWritecachePath(),
"pilorama": i.GetPiloramaPath(),
"error_count": i.GetErrorCount(),
"evacuation_in_progress": i.GetEvacuationInProgress(),
})
}
@ -105,7 +106,8 @@ func prettyPrintShards(cmd *cobra.Command, ii []control.ShardInfo) {
sb.String()+
pathPrinter("Write-cache", i.GetWritecachePath())+
pathPrinter("Pilorama", i.GetPiloramaPath())+
fmt.Sprintf("Error count: %d\n", i.GetErrorCount()),
fmt.Sprintf("Error count: %d\n", i.GetErrorCount())+
fmt.Sprintf("Evacuation in progress: %t\n", i.GetEvacuationInProgress()),
base58.Encode(i.GetShard_ID()),
shardModeToString(i.GetMode()),
)

View file

@ -27,6 +27,7 @@ type EngineMetrics interface {
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() WriteCacheMetrics
GC() GCMetrics
@ -45,6 +46,7 @@ type engineMetrics struct {
refillObjCounter *prometheus.GaugeVec
refillPayloadCounter *prometheus.GaugeVec
refillPercentCounter *prometheus.GaugeVec
evacuationInProgress *shardIDModeValue
gc *gcMetrics
writeCache *writeCacheMetrics
@ -72,6 +74,7 @@ func newEngineMetrics() *engineMetrics {
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
evacuationInProgress: newShardIDMode(engineSubsystem, "evacuation_in_progress", "Shard evacuation in progress"),
}
}
@ -124,6 +127,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.mode.Delete(shardID)
m.refillStatus.DeleteByShardID(shardID)
m.evacuationInProgress.Delete(shardID)
}
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
@ -213,3 +217,7 @@ func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
m.refillStatus.SetMode(shardID, path, status)
}
func (m *engineMetrics) SetEvacuationInProgress(shardID string, value bool) {
m.evacuationInProgress.SetMode(shardID, strconv.FormatBool(value))
}

View file

@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true)
var c *meta.Cursor
for {
@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
@ -765,3 +767,11 @@ func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
return e.evacuateLimiter.ResetEvacuationStatus()
}
func (e *StorageEngine) ResetEvacuationStatusForShards() {
e.mtx.RLock()
defer e.mtx.RUnlock()
for _, sh := range e.shards {
sh.SetEvacuationInProgress(false)
}
}

View file

@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) {
// Second case ensures that all objects are indeed moved and available.
checkHasObjects(t)
// Objects on evacuated shards should be logically unavailable, but persisted on disk.
// This is necessary to prevent removing it by policer in case of `REP 1` policy.
for _, obj := range objects[len(objects)-objPerShard:] {
var prmGet shard.GetPrm
prmGet.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
require.Error(t, err)
prmGet.SkipEvacCheck(true)
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
require.NoError(t, err)
var prmHead shard.HeadPrm
prmHead.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
require.Error(t, err)
var existsPrm shard.ExistsPrm
existsPrm.Address = objectCore.AddressOf(obj)
_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
require.Error(t, err)
var rngPrm shard.RngPrm
rngPrm.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
require.Error(t, err)
}
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)

View file

@ -30,6 +30,7 @@ type MetricRegister interface {
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics

View file

@ -97,6 +97,10 @@ func (m *metricsWithID) SetRefillStatus(path string, status string) {
m.mw.SetRefillStatus(m.id, path, status)
}
func (m *metricsWithID) SetEvacuationInProgress(value bool) {
m.mw.SetEvacuationInProgress(m.id, value)
}
// AddShard adds a new shard to the storage engine.
//
// Returns any error encountered that did not allow adding a shard.

View file

@ -5,7 +5,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
if s.info.Mode.Disabled() {
return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacuationInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
} else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.Address

View file

@ -27,8 +27,9 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object,
// GetPrm groups the parameters of Get operation.
type GetPrm struct {
addr oid.Address
skipMeta bool
addr oid.Address
skipMeta bool
skipEvacCheck bool
}
// GetRes groups the resulting values of Get operation.
@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) {
p.skipMeta = ignore
}
// SkipEvacCheck is a Get option which instruct to skip check is evacuation in progress.
func (p *GetPrm) SkipEvacCheck(val bool) {
p.skipEvacCheck = val
}
// Object returns the requested object.
func (r GetRes) Object() *objectSDK.Object {
return r.obj
@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, ErrShardDisabled
}
if s.info.EvacuationInProgress && !prm.skipEvacCheck {
return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
var getPrm common.GetPrm
getPrm.Address = prm.addr

View file

@ -4,7 +4,9 @@ import (
"context"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
res, err = s.Get(ctx, getPrm)
obj = res.Object()
} else {
s.m.RLock()
defer s.m.RUnlock()
if s.info.EvacuationInProgress {
return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
var headParams meta.GetPrm
headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw)

View file

@ -16,6 +16,9 @@ type Info struct {
// Shard mode.
Mode mode.Mode
// True when evacuation is in progress.
EvacuationInProgress bool
// Information about the metabase.
MetaBaseInfo meta.Info

View file

@ -192,6 +192,9 @@ func (m *metricsStore) SetRefillStatus(_ string, status string) {
m.refillStatus = status
}
func (m *metricsStore) SetEvacuationInProgress(bool) {
}
func TestCounters(t *testing.T) {
t.Parallel()

View file

@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.EvacuationInProgress {
return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if s.info.Mode.Disabled() {
return RngRes{}, ErrShardDisabled
}

View file

@ -97,6 +97,8 @@ type MetricsWriter interface {
SetRefillPercent(path string, percent uint32)
// SetRefillStatus sets refill status.
SetRefillStatus(path string, status string)
// SetEvacuationInProgress sets evacuation status
SetEvacuationInProgress(value bool)
}
type cfg struct {
@ -579,3 +581,12 @@ func (s *Shard) DeleteShardMetrics() {
s.cfg.metricsWriter.DeleteShardMetrics()
}
}
func (s *Shard) SetEvacuationInProgress(val bool) {
s.m.Lock()
defer s.m.Unlock()
s.info.EvacuationInProgress = val
if s.metricsWriter != nil {
s.metricsWriter.SetEvacuationInProgress(val)
}
}

View file

@ -101,6 +101,9 @@ func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShard
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
s.s.ResetEvacuationStatusForShards()
return resp, nil
}

View file

@ -53,6 +53,7 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
si.SetMode(m)
si.SetErrorCount(sh.ErrorCount)
si.SetEvacuationInProgress(sh.EvacuationInProgress)
shardInfos = append(shardInfos, *si)
}

View file

@ -142,6 +142,9 @@ message ShardInfo {
// Path to shard's pilorama storage.
string pilorama_path = 7 [ json_name = "piloramaPath" ];
// Evacuation status.
bool evacuation_in_progress = 8 [ json_name = "evacuationInProgress" ];
}
// Blobstor component description.

Binary file not shown.