node: Evacuate objects without setting mode to MAINTENANCE #1349

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:feature/evac-without-mm into master 2024-09-05 13:12:56 +00:00
18 changed files with 156 additions and 17 deletions

View file

@ -65,13 +65,14 @@ func prettyPrintShardsJSON(cmd *cobra.Command, ii []control.ShardInfo) {
out := make([]map[string]any, 0, len(ii))
for _, i := range ii {
out = append(out, map[string]any{
"shard_id": base58.Encode(i.GetShard_ID()),
"mode": shardModeToString(i.GetMode()),
"metabase": i.GetMetabasePath(),
"blobstor": i.GetBlobstor(),
"writecache": i.GetWritecachePath(),
"pilorama": i.GetPiloramaPath(),
"error_count": i.GetErrorCount(),
"shard_id": base58.Encode(i.GetShard_ID()),
"mode": shardModeToString(i.GetMode()),
"metabase": i.GetMetabasePath(),
"blobstor": i.GetBlobstor(),
"writecache": i.GetWritecachePath(),
"pilorama": i.GetPiloramaPath(),
"error_count": i.GetErrorCount(),
"evacuation_in_progress": i.GetEvacuationInProgress(),
})
}
@ -105,7 +106,8 @@ func prettyPrintShards(cmd *cobra.Command, ii []control.ShardInfo) {
sb.String()+
pathPrinter("Write-cache", i.GetWritecachePath())+
pathPrinter("Pilorama", i.GetPiloramaPath())+
fmt.Sprintf("Error count: %d\n", i.GetErrorCount()),
fmt.Sprintf("Error count: %d\n", i.GetErrorCount())+
fmt.Sprintf("Evacuation in progress: %t\n", i.GetEvacuationInProgress()),
base58.Encode(i.GetShard_ID()),
shardModeToString(i.GetMode()),
)

View file

@ -10,6 +10,12 @@ First of all, by the evacuation the data is transferred to other shards of the s
Only one running evacuation process is allowed on the node at a time.
It is not necessary to turn maintenance mode on storage node.
Once evacuation from a shard has started, it is impossible to read data from it via the public API, except when the evacuation was stopped manually or the node was restarted.
This is necessary to prevent the policer from removing objects with policy `REP 1 ...` from the remote node during evacuation.
`frostfs-cli` utility is used to manage evacuation.
## Commands

View file

@ -27,6 +27,7 @@ type EngineMetrics interface {
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() WriteCacheMetrics
GC() GCMetrics
@ -45,6 +46,7 @@ type engineMetrics struct {
refillObjCounter *prometheus.GaugeVec
refillPayloadCounter *prometheus.GaugeVec
refillPercentCounter *prometheus.GaugeVec
evacuationInProgress *shardIDModeValue
gc *gcMetrics
writeCache *writeCacheMetrics
@ -72,6 +74,7 @@ func newEngineMetrics() *engineMetrics {
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
evacuationInProgress: newShardIDMode(engineSubsystem, "evacuation_in_progress", "Shard evacuation in progress"),
}
}
@ -124,6 +127,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.mode.Delete(shardID)
m.refillStatus.DeleteByShardID(shardID)
m.evacuationInProgress.Delete(shardID)
}
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
@ -213,3 +217,7 @@ func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
m.refillStatus.SetMode(shardID, path, status)
}
// SetEvacuationInProgress publishes the evacuation state of the given shard
// as an engine-level metric, encoding the boolean as the string "true"/"false".
func (m *engineMetrics) SetEvacuationInProgress(shardID string, value bool) {
m.evacuationInProgress.SetMode(shardID, strconv.FormatBool(value))
}

View file

@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true)

When is this disabled? What if we get error in ListWithCursor below?

When is this disabled? What if we get error in `ListWithCursor` below?

When we move shard to DISABLED mode. Think this should be the one-way trip. Otherwise, it is possible to lose the data.

When we move shard to `DISABLED` mode. Think this should be the one-way trip. Otherwise, it is possible to lose the data.

But for evacuation shard mode READ_ONLY is only required...

But for evacuation shard mode `READ_ONLY` is only required...
var c *meta.Cursor
for {
@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
@ -765,3 +767,11 @@ func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
return e.evacuateLimiter.ResetEvacuationStatus()
}
// ResetEvacuationStatusForShards clears the evacuation-in-progress flag on
// every shard of the engine, making evacuated objects readable again via the
// public API. Invoked when shard evacuation is stopped (see StopShardEvacuation).
func (e *StorageEngine) ResetEvacuationStatusForShards() {
e.mtx.RLock() // only reading the shard set; per-shard state has its own lock
defer e.mtx.RUnlock()
for _, sh := range e.shards {
sh.SetEvacuationInProgress(false)
}
}

View file

@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) {
// Second case ensures that all objects are indeed moved and available.
checkHasObjects(t)
// Objects on evacuated shards should be logically unavailable, but persisted on disk.
// This is necessary to prevent the policer from removing them in case of `REP 1` policy.
for _, obj := range objects[len(objects)-objPerShard:] {
var prmGet shard.GetPrm
prmGet.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
require.Error(t, err)
prmGet.SkipEvacCheck(true)
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
require.NoError(t, err)
var prmHead shard.HeadPrm
prmHead.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
require.Error(t, err)
var existsPrm shard.ExistsPrm
existsPrm.Address = objectCore.AddressOf(obj)
_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
require.Error(t, err)
var rngPrm shard.RngPrm
rngPrm.SetAddress(objectCore.AddressOf(obj))
_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
require.Error(t, err)
}
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)

View file

@ -30,6 +30,7 @@ type MetricRegister interface {
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics

View file

@ -97,6 +97,10 @@ func (m *metricsWithID) SetRefillStatus(path string, status string) {
m.mw.SetRefillStatus(m.id, path, status)
}
// SetEvacuationInProgress forwards the evacuation state to the underlying
// metrics writer, tagging it with this shard's ID.
func (m *metricsWithID) SetEvacuationInProgress(value bool) {
m.mw.SetEvacuationInProgress(m.id, value)
}
// AddShard adds a new shard to the storage engine.
//
// Returns any error encountered that did not allow adding a shard.

View file

@ -5,7 +5,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
if s.info.Mode.Disabled() {
return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacuationInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
} else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.Address

View file

@ -27,8 +27,9 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object,
// GetPrm groups the parameters of Get operation.
type GetPrm struct {
addr oid.Address
skipMeta bool
addr oid.Address
skipMeta bool
skipEvacCheck bool
}
// GetRes groups the resulting values of Get operation.
@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) {
p.skipMeta = ignore
}
// SkipEvacCheck is a Get option which instructs Get to skip the
// evacuation-in-progress check, allowing objects to be read from a shard
// that is currently being evacuated (used internally by the evacuation
// process itself).
func (p *GetPrm) SkipEvacCheck(val bool) {
p.skipEvacCheck = val
}
// Object returns the requested object.
func (r GetRes) Object() *objectSDK.Object {
return r.obj
@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, ErrShardDisabled
}
if s.info.EvacuationInProgress && !prm.skipEvacCheck {
return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
var getPrm common.GetPrm
getPrm.Address = prm.addr

View file

@ -4,7 +4,9 @@ import (
"context"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
res, err = s.Get(ctx, getPrm)
obj = res.Object()
} else {
s.m.RLock()

On an unrelated note: we should take mode for the whole operation duration.
I am surprised it wasn't the case before.
Not for this PR

On an unrelated note: we should take mode for the whole operation duration. I am surprised it wasn't the case before. Not for this PR

That was done in a such way because we take the same lock here res, err = s.Get(ctx, getPrm). Maybe that was the reason why we don't take lock for the whole operation.

That was done in a such way because we take the same lock here `res, err = s.Get(ctx, getPrm)`. Maybe that was the reason why we don't take lock for the whole operation.
defer s.m.RUnlock()
if s.info.EvacuationInProgress {
return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
var headParams meta.GetPrm
headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw)

View file

@ -16,6 +16,9 @@ type Info struct {
// Shard mode.
Mode mode.Mode
// True when evacuation is in progress.
EvacuationInProgress bool

EvacuationInProgress?

`EvacuationInProgress`?

Renamed.

Renamed.
// Information about the metabase.
MetaBaseInfo meta.Info

View file

@ -192,6 +192,9 @@ func (m *metricsStore) SetRefillStatus(_ string, status string) {
m.refillStatus = status
}
// SetEvacuationInProgress is a no-op stub satisfying the metrics interface
// for this test double; the evacuation flag is not tracked here.
func (m *metricsStore) SetEvacuationInProgress(bool) {
}
func TestCounters(t *testing.T) {
t.Parallel()

View file

@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.EvacuationInProgress {
return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if s.info.Mode.Disabled() {
return RngRes{}, ErrShardDisabled
}

View file

@ -97,6 +97,8 @@ type MetricsWriter interface {
SetRefillPercent(path string, percent uint32)
// SetRefillStatus sets refill status.
SetRefillStatus(path string, status string)
// SetEvacuationInProgress sets the evacuation-in-progress status.
SetEvacuationInProgress(value bool)
}
type cfg struct {
@ -579,3 +581,12 @@ func (s *Shard) DeleteShardMetrics() {
s.cfg.metricsWriter.DeleteShardMetrics()
}
}

Evacuation is done on the engine level, but we have a flag on the shard level and because of this we have SkipEvacCheck parameter to the operation.
Was having evacuationInProgress flag considered to be present somewhere in the engine structure?

Evacuation is done on the engine level, but we have a flag on the shard level _and_ because of this we have `SkipEvacCheck` parameter to the operation. Was having `evacuationInProgress` flag considered to be present somewhere in the engine structure?

It looks more complex and slower. Once it will be on an engine level, we should check for each request is evacuation for shard in progress, and go to internal map for checking shard and exclude it from iteration. If this overhead is ok for us, we can do that in a such way.

It looks more complex and slower. Once it will be on an `engine` level, we should check for each request is evacuation for shard in progress, and go to internal map for checking shard and exclude it from iteration. If this overhead is ok for us, we can do that in a such way.
// SetEvacuationInProgress updates the shard's evacuation-in-progress flag
// under the shard lock and mirrors the new value to the metrics writer,
// if one is configured. While the flag is set, read operations (Get, Head,
// Exists, GetRange) report ObjectNotFound unless the evacuation check is
// explicitly skipped.
func (s *Shard) SetEvacuationInProgress(val bool) {
s.m.Lock()
defer s.m.Unlock()
s.info.EvacuationInProgress = val
if s.metricsWriter != nil {
s.metricsWriter.SetEvacuationInProgress(val)
}
}

View file

@ -101,6 +101,9 @@ func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShard
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
s.s.ResetEvacuationStatusForShards()
return resp, nil
}

View file

@ -53,6 +53,7 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
si.SetMode(m)
si.SetErrorCount(sh.ErrorCount)
si.SetEvacuationInProgress(sh.EvacuationInProgress)
shardInfos = append(shardInfos, *si)
}

View file

@ -142,6 +142,9 @@ message ShardInfo {
// Path to shard's pilorama storage.
string pilorama_path = 7 [ json_name = "piloramaPath" ];
// Evacuation status.
bool evacuation_in_progress = 8 [ json_name = "evacuationInProgress" ];
}
// Blobstor component description.

View file

@ -954,13 +954,14 @@ func (x *Netmap) UnmarshalEasyJSON(in *jlexer.Lexer) {
}
type ShardInfo struct {
Shard_ID []byte `json:"shardID"`
MetabasePath string `json:"metabasePath"`
Blobstor []BlobstorInfo `json:"blobstor"`
WritecachePath string `json:"writecachePath"`
Mode ShardMode `json:"mode"`
ErrorCount uint32 `json:"errorCount"`
PiloramaPath string `json:"piloramaPath"`
Shard_ID []byte `json:"shardID"`
MetabasePath string `json:"metabasePath"`
Blobstor []BlobstorInfo `json:"blobstor"`
WritecachePath string `json:"writecachePath"`
Mode ShardMode `json:"mode"`
ErrorCount uint32 `json:"errorCount"`
PiloramaPath string `json:"piloramaPath"`
EvacuationInProgress bool `json:"evacuationInProgress"`
}
var (
@ -986,6 +987,7 @@ func (x *ShardInfo) StableSize() (size int) {
size += proto.EnumSize(5, int32(x.Mode))
size += proto.UInt32Size(6, x.ErrorCount)
size += proto.StringSize(7, x.PiloramaPath)
size += proto.BoolSize(8, x.EvacuationInProgress)
return size
}
@ -1023,6 +1025,9 @@ func (x *ShardInfo) EmitProtobuf(mm *easyproto.MessageMarshaler) {
if len(x.PiloramaPath) != 0 {
mm.AppendString(7, x.PiloramaPath)
}
if x.EvacuationInProgress {
mm.AppendBool(8, x.EvacuationInProgress)
}
}
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
@ -1080,6 +1085,12 @@ func (x *ShardInfo) UnmarshalProtobuf(src []byte) (err error) {
return fmt.Errorf("cannot unmarshal field %s", "PiloramaPath")
}
x.PiloramaPath = data
case 8: // EvacuationInProgress
data, ok := fc.Bool()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "EvacuationInProgress")
}
x.EvacuationInProgress = data
}
}
return nil
@ -1147,6 +1158,15 @@ func (x *ShardInfo) GetPiloramaPath() string {
func (x *ShardInfo) SetPiloramaPath(v string) {
x.PiloramaPath = v
}
// GetEvacuationInProgress returns the shard's evacuation status.
// Nil-safe: returns false for a nil receiver, following the generated
// protobuf getter convention.
func (x *ShardInfo) GetEvacuationInProgress() bool {
if x != nil {
return x.EvacuationInProgress
}
return false
}
// SetEvacuationInProgress sets the shard's evacuation status.
func (x *ShardInfo) SetEvacuationInProgress(v bool) {
x.EvacuationInProgress = v
}
// MarshalJSON implements the json.Marshaler interface.
func (x *ShardInfo) MarshalJSON() ([]byte, error) {
@ -1202,6 +1222,11 @@ func (x *ShardInfo) MarshalEasyJSON(out *jwriter.Writer) {
out.RawString(prefix)
out.String(x.PiloramaPath)
}
{
const prefix string = ",\"evacuationInProgress\":"
out.RawString(prefix)
out.Bool(x.EvacuationInProgress)
}
out.RawByte('}')
}
@ -1296,6 +1321,12 @@ func (x *ShardInfo) UnmarshalEasyJSON(in *jlexer.Lexer) {
f = in.String()
x.PiloramaPath = f
}
case "evacuationInProgress":
{
var f bool
f = in.Bool()
x.EvacuationInProgress = f
}
}
in.WantComma()
}