[#1349] node: Evacuate objects without setting mode to MAINTENANCE
Some checks failed
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m8s
Tests and linters / Run gofumpt (pull_request) Successful in 2m30s
DCO action / DCO (pull_request) Successful in 2m32s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m52s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m4s
Tests and linters / Tests with -race (pull_request) Failing after 3m3s
Vulncheck / Vulncheck (pull_request) Successful in 2m57s
Tests and linters / Lint (pull_request) Successful in 3m48s
Build / Build Components (1.23) (pull_request) Successful in 3m33s
Build / Build Components (1.22) (pull_request) Successful in 3m37s
Tests and linters / Staticcheck (pull_request) Successful in 3m56s
Tests and linters / gopls check (pull_request) Successful in 4m54s

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-03 12:18:10 +03:00
parent 2b3fc50681
commit 659c5989ce
17 changed files with 112 additions and 10 deletions

View file

@ -72,6 +72,7 @@ func prettyPrintShardsJSON(cmd *cobra.Command, ii []control.ShardInfo) {
"writecache": i.GetWritecachePath(), "writecache": i.GetWritecachePath(),
"pilorama": i.GetPiloramaPath(), "pilorama": i.GetPiloramaPath(),
"error_count": i.GetErrorCount(), "error_count": i.GetErrorCount(),
"evacuation_in_progress": i.GetEvacuationInProgress(),
}) })
} }
@ -105,7 +106,8 @@ func prettyPrintShards(cmd *cobra.Command, ii []control.ShardInfo) {
sb.String()+ sb.String()+
pathPrinter("Write-cache", i.GetWritecachePath())+ pathPrinter("Write-cache", i.GetWritecachePath())+
pathPrinter("Pilorama", i.GetPiloramaPath())+ pathPrinter("Pilorama", i.GetPiloramaPath())+
fmt.Sprintf("Error count: %d\n", i.GetErrorCount()), fmt.Sprintf("Error count: %d\n", i.GetErrorCount())+
fmt.Sprintf("Evacuation in progress: %t\n", i.GetEvacuationInProgress()),
base58.Encode(i.GetShard_ID()), base58.Encode(i.GetShard_ID()),
shardModeToString(i.GetMode()), shardModeToString(i.GetMode()),
) )

View file

@ -27,6 +27,7 @@ type EngineMetrics interface {
IncRefillObjectsCount(shardID, path string, size int, success bool) IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32) SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string) SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() WriteCacheMetrics WriteCache() WriteCacheMetrics
GC() GCMetrics GC() GCMetrics
@ -45,6 +46,7 @@ type engineMetrics struct {
refillObjCounter *prometheus.GaugeVec refillObjCounter *prometheus.GaugeVec
refillPayloadCounter *prometheus.GaugeVec refillPayloadCounter *prometheus.GaugeVec
refillPercentCounter *prometheus.GaugeVec refillPercentCounter *prometheus.GaugeVec
evacuationInProgress *shardIDModeValue
gc *gcMetrics gc *gcMetrics
writeCache *writeCacheMetrics writeCache *writeCacheMetrics
@ -72,6 +74,7 @@ func newEngineMetrics() *engineMetrics {
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}), refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}), refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}), refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
evacuationInProgress: newShardIDMode(engineSubsystem, "evacuation_in_progress", "Shard evacuation in progress"),
} }
} }
@ -124,6 +127,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.mode.Delete(shardID) m.mode.Delete(shardID)
m.refillStatus.DeleteByShardID(shardID) m.refillStatus.DeleteByShardID(shardID)
m.evacuationInProgress.Delete(shardID)
} }
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) { func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
@ -213,3 +217,7 @@ func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) { func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
m.refillStatus.SetMode(shardID, path, status) m.refillStatus.SetMode(shardID, path, status)
} }
func (m *engineMetrics) SetEvacuationInProgress(shardID string, value bool) {
m.evacuationInProgress.SetMode(shardID, strconv.FormatBool(value))
}

View file

@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
listPrm.WithCount(defaultEvacuateBatchSize) listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID] sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true)
var c *meta.Cursor var c *meta.Cursor
for { for {
@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
var getPrm shard.GetPrm var getPrm shard.GetPrm
getPrm.SetAddress(addr) getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
getRes, err := sh.Get(ctx, getPrm) getRes, err := sh.Get(ctx, getPrm)
if err != nil { if err != nil {
@ -765,3 +767,11 @@ func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
return e.evacuateLimiter.ResetEvacuationStatus() return e.evacuateLimiter.ResetEvacuationStatus()
} }
func (e *StorageEngine) ResetEvacuationStatusForShards() {
e.mtx.RLock()
defer e.mtx.RUnlock()
for _, sh := range e.shards {
sh.SetEvacuationInProgress(false)
}
}

View file

@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) {
// Second case ensures that all objects are indeed moved and available. // Second case ensures that all objects are indeed moved and available.
checkHasObjects(t) checkHasObjects(t)
// Objects on evacuated shards should be logically unavailable, but persisted on disk.
// This is necessary to prevent removing it by policer in case of `REP 1` policy.
for _, obj := range objects[len(objects)-objPerShard:] {
var prmGet shard.GetPrm
prmGet.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
require.Error(t, err)
prmGet.SkipEvacCheck(true)
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
require.NoError(t, err)
var prmHead shard.HeadPrm
prmHead.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
require.Error(t, err)
var existsPrm shard.ExistsPrm
existsPrm.Address = objectCore.AddressOf(obj)
_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
require.Error(t, err)
var rngPrm shard.RngPrm
rngPrm.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
require.Error(t, err)
}
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done. // Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm) res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)

View file

@ -30,6 +30,7 @@ type MetricRegister interface {
IncRefillObjectsCount(shardID, path string, size int, success bool) IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32) SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string) SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() metrics.WriteCacheMetrics WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics GC() metrics.GCMetrics

View file

@ -97,6 +97,10 @@ func (m *metricsWithID) SetRefillStatus(path string, status string) {
m.mw.SetRefillStatus(m.id, path, status) m.mw.SetRefillStatus(m.id, path, status)
} }
func (m *metricsWithID) SetEvacuationInProgress(value bool) {
m.mw.SetEvacuationInProgress(m.id, value)
}
// AddShard adds a new shard to the storage engine. // AddShard adds a new shard to the storage engine.
// //
// Returns any error encountered that did not allow adding a shard. // Returns any error encountered that did not allow adding a shard.

View file

@ -5,7 +5,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
if s.info.Mode.Disabled() { if s.info.Mode.Disabled() {
return ExistsRes{}, ErrShardDisabled return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacuationInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
} else if s.info.Mode.NoMetabase() { } else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm var p common.ExistsPrm
p.Address = prm.Address p.Address = prm.Address

View file

@ -29,6 +29,7 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object,
type GetPrm struct { type GetPrm struct {
addr oid.Address addr oid.Address
skipMeta bool skipMeta bool
skipEvacCheck bool
} }
// GetRes groups the resulting values of Get operation. // GetRes groups the resulting values of Get operation.
@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) {
p.skipMeta = ignore p.skipMeta = ignore
} }
// SkipEvacCheck is a Get option which instruct to skip check is evacuation in progress.
func (p *GetPrm) SkipEvacCheck(val bool) {
p.skipEvacCheck = val
}
// Object returns the requested object. // Object returns the requested object.
func (r GetRes) Object() *objectSDK.Object { func (r GetRes) Object() *objectSDK.Object {
return r.obj return r.obj
@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, ErrShardDisabled return GetRes{}, ErrShardDisabled
} }
if s.info.EvacuationInProgress && !prm.skipEvacCheck {
return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) { cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
var getPrm common.GetPrm var getPrm common.GetPrm
getPrm.Address = prm.addr getPrm.Address = prm.addr

View file

@ -4,7 +4,9 @@ import (
"context" "context"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
res, err = s.Get(ctx, getPrm) res, err = s.Get(ctx, getPrm)
obj = res.Object() obj = res.Object()
} else { } else {
s.m.RLock()
defer s.m.RUnlock()
if s.info.EvacuationInProgress {
return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
var headParams meta.GetPrm var headParams meta.GetPrm
headParams.SetAddress(prm.addr) headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw) headParams.SetRaw(prm.raw)

View file

@ -16,6 +16,9 @@ type Info struct {
// Shard mode. // Shard mode.
Mode mode.Mode Mode mode.Mode
// True when evacuation is in progress.
EvacuationInProgress bool
// Information about the metabase. // Information about the metabase.
MetaBaseInfo meta.Info MetaBaseInfo meta.Info

View file

@ -192,6 +192,9 @@ func (m *metricsStore) SetRefillStatus(_ string, status string) {
m.refillStatus = status m.refillStatus = status
} }
func (m *metricsStore) SetEvacuationInProgress(bool) {
}
func TestCounters(t *testing.T) { func TestCounters(t *testing.T) {
t.Parallel() t.Parallel()

View file

@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() defer s.m.RUnlock()
if s.info.EvacuationInProgress {
return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if s.info.Mode.Disabled() { if s.info.Mode.Disabled() {
return RngRes{}, ErrShardDisabled return RngRes{}, ErrShardDisabled
} }

View file

@ -97,6 +97,8 @@ type MetricsWriter interface {
SetRefillPercent(path string, percent uint32) SetRefillPercent(path string, percent uint32)
// SetRefillStatus sets refill status. // SetRefillStatus sets refill status.
SetRefillStatus(path string, status string) SetRefillStatus(path string, status string)
// SetEvacuationInProgress sets evacuation status
SetEvacuationInProgress(value bool)
} }
type cfg struct { type cfg struct {
@ -579,3 +581,12 @@ func (s *Shard) DeleteShardMetrics() {
s.cfg.metricsWriter.DeleteShardMetrics() s.cfg.metricsWriter.DeleteShardMetrics()
} }
} }
func (s *Shard) SetEvacuationInProgress(val bool) {
s.m.Lock()
defer s.m.Unlock()
s.info.EvacuationInProgress = val
if s.metricsWriter != nil {
s.metricsWriter.SetEvacuationInProgress(val)
}
}

View file

@ -101,6 +101,9 @@ func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShard
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
s.s.ResetEvacuationStatusForShards()
return resp, nil return resp, nil
} }

View file

@ -53,6 +53,7 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
si.SetMode(m) si.SetMode(m)
si.SetErrorCount(sh.ErrorCount) si.SetErrorCount(sh.ErrorCount)
si.SetEvacuationInProgress(sh.EvacuationInProgress)
shardInfos = append(shardInfos, *si) shardInfos = append(shardInfos, *si)
} }

View file

@ -142,6 +142,9 @@ message ShardInfo {
// Path to shard's pilorama storage. // Path to shard's pilorama storage.
string pilorama_path = 7 [ json_name = "piloramaPath" ]; string pilorama_path = 7 [ json_name = "piloramaPath" ];
// Evacuation status.
bool evacuation_in_progress = 8 [ json_name = "evacuationInProgress" ];
} }
// Blobstor component description. // Blobstor component description.

Binary file not shown.