diff --git a/cmd/frostfs-cli/modules/control/shards_list.go b/cmd/frostfs-cli/modules/control/shards_list.go
index e9e49bb29..a81034a9e 100644
--- a/cmd/frostfs-cli/modules/control/shards_list.go
+++ b/cmd/frostfs-cli/modules/control/shards_list.go
@@ -65,13 +65,14 @@ func prettyPrintShardsJSON(cmd *cobra.Command, ii []control.ShardInfo) {
 	out := make([]map[string]any, 0, len(ii))
 	for _, i := range ii {
 		out = append(out, map[string]any{
-			"shard_id":    base58.Encode(i.GetShard_ID()),
-			"mode":        shardModeToString(i.GetMode()),
-			"metabase":    i.GetMetabasePath(),
-			"blobstor":    i.GetBlobstor(),
-			"writecache":  i.GetWritecachePath(),
-			"pilorama":    i.GetPiloramaPath(),
-			"error_count": i.GetErrorCount(),
+			"shard_id":               base58.Encode(i.GetShard_ID()),
+			"mode":                   shardModeToString(i.GetMode()),
+			"metabase":               i.GetMetabasePath(),
+			"blobstor":               i.GetBlobstor(),
+			"writecache":             i.GetWritecachePath(),
+			"pilorama":               i.GetPiloramaPath(),
+			"error_count":            i.GetErrorCount(),
+			"evacuation_in_progress": i.GetEvacuationInProgress(),
 		})
 	}
 
@@ -105,7 +106,8 @@ func prettyPrintShards(cmd *cobra.Command, ii []control.ShardInfo) {
 			sb.String()+
 			pathPrinter("Write-cache", i.GetWritecachePath())+
 			pathPrinter("Pilorama", i.GetPiloramaPath())+
-			fmt.Sprintf("Error count: %d\n", i.GetErrorCount()),
+			fmt.Sprintf("Error count: %d\n", i.GetErrorCount())+
+			fmt.Sprintf("Evacuation in progress: %t\n", i.GetEvacuationInProgress()),
 			base58.Encode(i.GetShard_ID()),
 			shardModeToString(i.GetMode()),
 		)
diff --git a/docs/evacuation.md b/docs/evacuation.md
index 9db514a9e..885ce169a 100644
--- a/docs/evacuation.md
+++ b/docs/evacuation.md
@@ -10,6 +10,12 @@ First of all, by the evacuation the data is transferred to other shards of the s
 
 Only one running evacuation process is allowed on the node at a time.
 
+It is not necessary to turn on maintenance mode on the storage node.
+
+Once evacuation from a shard has started, the data on it can no longer be read via the public API, unless the evacuation is stopped manually or the node is restarted.
+
+This is necessary to prevent the policer from removing objects with policy `REP 1 ...` from the remote node during evacuation.
+
 `frostfs-cli` utility is used to manage evacuation.
 
 ## Commands
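Note: the behavior described in the docs above is enforced by the shard-level guards later in this patch. A minimal sketch of what a caller observes, assuming an opened *shard.Shard `sh` whose evacuation has already started (the helper name is illustrative; the GetPrm calls are the ones this patch adds):

```go
package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// readDuringEvacuation shows the documented contract: a regular Get fails
// while evacuation is in progress, yet the object is still on disk and
// reachable for internal callers that skip the check.
func readDuringEvacuation(ctx context.Context, sh *shard.Shard, addr oid.Address) bool {
	var prm shard.GetPrm
	prm.SetAddress(addr)

	if _, err := sh.Get(ctx, prm); err == nil {
		return false // not expected while the shard is being evacuated
	}

	// The evacuation worker itself opts out of the guard.
	prm.SkipEvacCheck(true)
	_, err := sh.Get(ctx, prm)
	return err == nil
}
```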
diff --git a/internal/metrics/engine.go b/internal/metrics/engine.go
index e37777e40..1d01c95ed 100644
--- a/internal/metrics/engine.go
+++ b/internal/metrics/engine.go
@@ -27,6 +27,7 @@ type EngineMetrics interface {
 	IncRefillObjectsCount(shardID, path string, size int, success bool)
 	SetRefillPercent(shardID, path string, percent uint32)
 	SetRefillStatus(shardID, path, status string)
+	SetEvacuationInProgress(shardID string, value bool)
 
 	WriteCache() WriteCacheMetrics
 	GC() GCMetrics
@@ -45,6 +46,7 @@ type engineMetrics struct {
 	refillObjCounter     *prometheus.GaugeVec
 	refillPayloadCounter *prometheus.GaugeVec
 	refillPercentCounter *prometheus.GaugeVec
+	evacuationInProgress *shardIDModeValue
 
 	gc         *gcMetrics
 	writeCache *writeCacheMetrics
@@ -72,6 +74,7 @@ func newEngineMetrics() *engineMetrics {
 		refillObjCounter:     newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
 		refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
 		refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
+		evacuationInProgress: newShardIDMode(engineSubsystem, "evacuation_in_progress", "Shard evacuation in progress"),
 	}
 }
 
@@ -124,6 +127,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
 	m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
 	m.mode.Delete(shardID)
 	m.refillStatus.DeleteByShardID(shardID)
+	m.evacuationInProgress.Delete(shardID)
 }
 
 func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
@@ -213,3 +217,7 @@ func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
 func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
 	m.refillStatus.SetMode(shardID, path, status)
 }
+
+func (m *engineMetrics) SetEvacuationInProgress(shardID string, value bool) {
+	m.evacuationInProgress.SetMode(shardID, strconv.FormatBool(value))
+}
diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go
index 04e427e49..7bef6edfb 100644
--- a/pkg/local_object_storage/engine/evacuate.go
+++ b/pkg/local_object_storage/engine/evacuate.go
@@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
 	listPrm.WithCount(defaultEvacuateBatchSize)
 
 	sh := shardsToEvacuate[shardID]
+	sh.SetEvacuationInProgress(true)
 
 	var c *meta.Cursor
 	for {
@@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 
 	var getPrm shard.GetPrm
 	getPrm.SetAddress(addr)
+	getPrm.SkipEvacCheck(true)
 
 	getRes, err := sh.Get(ctx, getPrm)
 	if err != nil {
@@ -765,3 +767,11 @@ func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
 
 	return e.evacuateLimiter.ResetEvacuationStatus()
 }
+
+func (e *StorageEngine) ResetEvacuationStatusForShards() {
+	e.mtx.RLock()
+	defer e.mtx.RUnlock()
+	for _, sh := range e.shards {
+		sh.SetEvacuationInProgress(false)
+	}
+}
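Note: the evacuate.go hunks define the flag's lifecycle: it is raised per shard before the first listing batch and, per the docs, intentionally stays raised after a successful run; only a manual stop (see the control-server hunk below) or a node restart clears it. A condensed, hypothetical view of that flow:

```go
package engine

import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"

// evacuationFlagLifecycle is a doc-only sketch; the real logic lives in
// evacuateShardObjects and the StopShardEvacuation handler.
func evacuationFlagLifecycle(e *StorageEngine, sh *shard.Shard) {
	sh.SetEvacuationInProgress(true) // raised before the first meta.Cursor batch

	// ... objects are listed in batches and re-put to other shards/nodes;
	// each local read sets GetPrm.SkipEvacCheck(true) ...

	// On success the flag stays raised, so the policer cannot reap the
	// still-present `REP 1` copies as excess replicas.

	// Only a manual stop (or a restart) clears it on every shard:
	e.ResetEvacuationStatusForShards()
}
```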
diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go
index 8d25dad4a..28529fab9 100644
--- a/pkg/local_object_storage/engine/evacuate_test.go
+++ b/pkg/local_object_storage/engine/evacuate_test.go
@@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) {
 	// Second case ensures that all objects are indeed moved and available.
 	checkHasObjects(t)
 
+	// Objects on evacuated shards should be logically unavailable, but persisted on disk.
+	// This is necessary to prevent the policer from removing them in case of `REP 1` policy.
+	for _, obj := range objects[len(objects)-objPerShard:] {
+		var prmGet shard.GetPrm
+		prmGet.SetAddress(objectCore.AddressOf(obj))
+		_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
+		require.Error(t, err)
+
+		prmGet.SkipEvacCheck(true)
+		_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
+		require.NoError(t, err)
+
+		var prmHead shard.HeadPrm
+		prmHead.SetAddress(objectCore.AddressOf(obj))
+		_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
+		require.Error(t, err)
+
+		var existsPrm shard.ExistsPrm
+		existsPrm.Address = objectCore.AddressOf(obj)
+		_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
+		require.Error(t, err)
+
+		var rngPrm shard.RngPrm
+		rngPrm.SetAddress(objectCore.AddressOf(obj))
+		_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
+		require.Error(t, err)
+	}
+
 	// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
 	res, err = e.Evacuate(context.Background(), prm)
 	require.NoError(t, err)
diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go
index 72b5ae252..1c088c754 100644
--- a/pkg/local_object_storage/engine/metrics.go
+++ b/pkg/local_object_storage/engine/metrics.go
@@ -30,6 +30,7 @@ type MetricRegister interface {
 	IncRefillObjectsCount(shardID, path string, size int, success bool)
 	SetRefillPercent(shardID, path string, percent uint32)
 	SetRefillStatus(shardID, path, status string)
+	SetEvacuationInProgress(shardID string, value bool)
 
 	WriteCache() metrics.WriteCacheMetrics
 	GC() metrics.GCMetrics
diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go
index 980b38a63..40584149e 100644
--- a/pkg/local_object_storage/engine/shards.go
+++ b/pkg/local_object_storage/engine/shards.go
@@ -97,6 +97,10 @@ func (m *metricsWithID) SetRefillStatus(path string, status string) {
 	m.mw.SetRefillStatus(m.id, path, status)
 }
 
+func (m *metricsWithID) SetEvacuationInProgress(value bool) {
+	m.mw.SetEvacuationInProgress(m.id, value)
+}
+
 // AddShard adds a new shard to the storage engine.
 //
 // Returns any error encountered that did not allow adding a shard.
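Note: metricsWithID is the engine-side adapter, so the per-shard flag reaches Prometheus through the shardIDModeValue vector registered above. Any out-of-tree implementation of these widened interfaces has to add the method too; a minimal no-op stub, mirroring metricsStore in the shard tests further down, keeps such code compiling:

```go
package example

// stubMetrics sketches the smallest change an existing shard.MetricsWriter
// implementation needs after this patch; all other methods are elided.
type stubMetrics struct{}

func (stubMetrics) SetEvacuationInProgress(bool) {}
```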
diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go
index b5a9604b4..784bf293a 100644
--- a/pkg/local_object_storage/shard/exists.go
+++ b/pkg/local_object_storage/shard/exists.go
@@ -5,7 +5,9 @@ import (
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
 
 	if s.info.Mode.Disabled() {
 		return ExistsRes{}, ErrShardDisabled
+	} else if s.info.EvacuationInProgress {
+		return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	} else if s.info.Mode.NoMetabase() {
 		var p common.ExistsPrm
 		p.Address = prm.Address
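Note: the guard sits between the Disabled and NoMetabase checks on purpose: a disabled shard still answers with ErrShardDisabled, and an evacuating shard replies before any metabase or blobstor lookup. logicerr.Wrap marks the error as logical rather than a storage fault, so it should not be counted against the shard's error counter. A sketch of how a caller sees it, assuming logicerr keeps the chain unwrappable via errors.As:

```go
package example

import (
	"context"
	"errors"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)

func probe(ctx context.Context, sh *shard.Shard, prm shard.ExistsPrm) bool {
	res, err := sh.Exists(ctx, prm)
	var notFound *apistatus.ObjectNotFound
	if errors.As(err, &notFound) {
		// Either genuinely absent or hidden by an ongoing evacuation;
		// callers cannot tell the two apart, which is the point.
		return false
	}
	return err == nil && res.Exists()
}
```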
diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go
index 2e7c84bcd..d1c393613 100644
--- a/pkg/local_object_storage/shard/get.go
+++ b/pkg/local_object_storage/shard/get.go
@@ -27,8 +27,9 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object,
 
 // GetPrm groups the parameters of Get operation.
 type GetPrm struct {
-	addr     oid.Address
-	skipMeta bool
+	addr          oid.Address
+	skipMeta      bool
+	skipEvacCheck bool
 }
 
 // GetRes groups the resulting values of Get operation.
@@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) {
 	p.skipMeta = ignore
 }
 
+// SkipEvacCheck is a Get option that skips the evacuation-in-progress check.
+func (p *GetPrm) SkipEvacCheck(val bool) {
+	p.skipEvacCheck = val
+}
+
 // Object returns the requested object.
 func (r GetRes) Object() *objectSDK.Object {
 	return r.obj
@@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
 		return GetRes{}, ErrShardDisabled
 	}
 
+	if s.info.EvacuationInProgress && !prm.skipEvacCheck {
+		return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+	}
+
 	cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
 		var getPrm common.GetPrm
 		getPrm.Address = prm.addr
diff --git a/pkg/local_object_storage/shard/head.go b/pkg/local_object_storage/shard/head.go
index 9d5d31260..ff57e3bf9 100644
--- a/pkg/local_object_storage/shard/head.go
+++ b/pkg/local_object_storage/shard/head.go
@@ -4,7 +4,9 @@ import (
 	"context"
 
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.opentelemetry.io/otel/attribute"
@@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
 		res, err = s.Get(ctx, getPrm)
 		obj = res.Object()
 	} else {
+		s.m.RLock()
+		defer s.m.RUnlock()
+		if s.info.EvacuationInProgress {
+			return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+		}
 		var headParams meta.GetPrm
 		headParams.SetAddress(prm.addr)
 		headParams.SetRaw(prm.raw)
diff --git a/pkg/local_object_storage/shard/info.go b/pkg/local_object_storage/shard/info.go
index 1051ab3db..f01796ec7 100644
--- a/pkg/local_object_storage/shard/info.go
+++ b/pkg/local_object_storage/shard/info.go
@@ -16,6 +16,9 @@ type Info struct {
 	// Shard mode.
 	Mode mode.Mode
 
+	// True when evacuation is in progress.
+	EvacuationInProgress bool
+
 	// Information about the metabase.
 	MetaBaseInfo meta.Info
 
diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go
index 1ef849c02..01a85da97 100644
--- a/pkg/local_object_storage/shard/metrics_test.go
+++ b/pkg/local_object_storage/shard/metrics_test.go
@@ -192,6 +192,9 @@ func (m *metricsStore) SetRefillStatus(_ string, status string) {
 	m.refillStatus = status
 }
 
+func (m *metricsStore) SetEvacuationInProgress(bool) {
+}
+
 func TestCounters(t *testing.T) {
 	t.Parallel()
 
diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go
index 9491543c4..701268820 100644
--- a/pkg/local_object_storage/shard/range.go
+++ b/pkg/local_object_storage/shard/range.go
@@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
 	s.m.RLock()
 	defer s.m.RUnlock()
 
+	if s.info.EvacuationInProgress {
+		return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+	}
+
 	if s.info.Mode.Disabled() {
 		return RngRes{}, ErrShardDisabled
 	}
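Note: one nuance in the Head path: with raw unset, Head delegates to Get, which already carries the guard, so the explicit check is only needed on the raw branch that reads the metabase directly. The new Info field also makes the state observable locally; a small sketch using the existing DumpInfo accessor:

```go
package example

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
)

func logEvacuation(sh *shard.Shard) {
	info := sh.DumpInfo() // shard.Info now carries EvacuationInProgress
	if info.EvacuationInProgress {
		fmt.Printf("shard %s is hidden from reads: evacuation in progress\n", info.ID)
	}
}
```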
diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go
index d11bcc36b..ac389b506 100644
--- a/pkg/local_object_storage/shard/shard.go
+++ b/pkg/local_object_storage/shard/shard.go
@@ -97,6 +97,8 @@ type MetricsWriter interface {
 	SetRefillPercent(path string, percent uint32)
 	// SetRefillStatus sets refill status.
 	SetRefillStatus(path string, status string)
+	// SetEvacuationInProgress sets evacuation-in-progress status.
+	SetEvacuationInProgress(value bool)
 }
 
 type cfg struct {
@@ -579,3 +581,12 @@ func (s *Shard) DeleteShardMetrics() {
 		s.cfg.metricsWriter.DeleteShardMetrics()
 	}
 }
+
+func (s *Shard) SetEvacuationInProgress(val bool) {
+	s.m.Lock()
+	defer s.m.Unlock()
+	s.info.EvacuationInProgress = val
+	if s.metricsWriter != nil {
+		s.metricsWriter.SetEvacuationInProgress(val)
+	}
+}
diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go
index b829573ec..aacebe9e3 100644
--- a/pkg/services/control/server/evacuate_async.go
+++ b/pkg/services/control/server/evacuate_async.go
@@ -101,6 +101,9 @@ func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShard
 	if err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
+
+	s.s.ResetEvacuationStatusForShards()
+
 	return resp, nil
 }
 
diff --git a/pkg/services/control/server/list_shards.go b/pkg/services/control/server/list_shards.go
index 56bd9fc1f..efe2754ea 100644
--- a/pkg/services/control/server/list_shards.go
+++ b/pkg/services/control/server/list_shards.go
@@ -53,6 +53,7 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
 
 		si.SetMode(m)
 		si.SetErrorCount(sh.ErrorCount)
+		si.SetEvacuationInProgress(sh.EvacuationInProgress)
 
 		shardInfos = append(shardInfos, *si)
 	}
diff --git a/pkg/services/control/types.proto b/pkg/services/control/types.proto
index 55636d88a..d8135ed64 100644
--- a/pkg/services/control/types.proto
+++ b/pkg/services/control/types.proto
@@ -142,6 +142,9 @@ message ShardInfo {
 
   // Path to shard's pilorama storage.
   string pilorama_path = 7 [ json_name = "piloramaPath" ];
+
+  // Evacuation status.
+  bool evacuation_in_progress = 8 [ json_name = "evacuationInProgress" ];
 }
 
 // Blobstor component description.
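Note: with field 8 in the protocol, both the CLI (first hunk of this patch) and any control-API consumer can surface the flag. A sketch of reading it from a ListShards response, assuming an already-signed request and an established control-service connection (both elided):

```go
package example

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	"github.com/mr-tron/base58"
)

func printShards(resp *control.ListShardsResponse) {
	for _, si := range resp.GetBody().GetShards() {
		fmt.Printf("%s: evacuation_in_progress=%t\n",
			base58.Encode(si.GetShard_ID()), si.GetEvacuationInProgress())
	}
}
```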
diff --git a/pkg/services/control/types_frostfs.pb.go b/pkg/services/control/types_frostfs.pb.go
index 42c1afa52..f92106589 100644
--- a/pkg/services/control/types_frostfs.pb.go
+++ b/pkg/services/control/types_frostfs.pb.go
@@ -954,13 +954,14 @@ func (x *Netmap) UnmarshalEasyJSON(in *jlexer.Lexer) {
 }
 
 type ShardInfo struct {
-	Shard_ID       []byte         `json:"shardID"`
-	MetabasePath   string         `json:"metabasePath"`
-	Blobstor       []BlobstorInfo `json:"blobstor"`
-	WritecachePath string         `json:"writecachePath"`
-	Mode           ShardMode      `json:"mode"`
-	ErrorCount     uint32         `json:"errorCount"`
-	PiloramaPath   string         `json:"piloramaPath"`
+	Shard_ID             []byte         `json:"shardID"`
+	MetabasePath         string         `json:"metabasePath"`
+	Blobstor             []BlobstorInfo `json:"blobstor"`
+	WritecachePath       string         `json:"writecachePath"`
+	Mode                 ShardMode      `json:"mode"`
+	ErrorCount           uint32         `json:"errorCount"`
+	PiloramaPath         string         `json:"piloramaPath"`
+	EvacuationInProgress bool           `json:"evacuationInProgress"`
 }
 
 var (
@@ -986,6 +987,7 @@ func (x *ShardInfo) StableSize() (size int) {
 	size += proto.EnumSize(5, int32(x.Mode))
 	size += proto.UInt32Size(6, x.ErrorCount)
 	size += proto.StringSize(7, x.PiloramaPath)
+	size += proto.BoolSize(8, x.EvacuationInProgress)
 	return size
 }
 
@@ -1023,6 +1025,9 @@ func (x *ShardInfo) EmitProtobuf(mm *easyproto.MessageMarshaler) {
 	if len(x.PiloramaPath) != 0 {
 		mm.AppendString(7, x.PiloramaPath)
 	}
+	if x.EvacuationInProgress {
+		mm.AppendBool(8, x.EvacuationInProgress)
+	}
 }
 
 // UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
@@ -1080,6 +1085,12 @@ func (x *ShardInfo) UnmarshalProtobuf(src []byte) (err error) {
 				return fmt.Errorf("cannot unmarshal field %s", "PiloramaPath")
 			}
 			x.PiloramaPath = data
+		case 8: // EvacuationInProgress
+			data, ok := fc.Bool()
+			if !ok {
+				return fmt.Errorf("cannot unmarshal field %s", "EvacuationInProgress")
+			}
+			x.EvacuationInProgress = data
 		}
 	}
 	return nil
@@ -1147,8 +1158,17 @@ func (x *ShardInfo) GetPiloramaPath() string {
 	return ""
 }
 func (x *ShardInfo) SetPiloramaPath(v string) {
 	x.PiloramaPath = v
 }
+func (x *ShardInfo) GetEvacuationInProgress() bool {
+	if x != nil {
+		return x.EvacuationInProgress
+	}
+	return false
+}
+func (x *ShardInfo) SetEvacuationInProgress(v bool) {
+	x.EvacuationInProgress = v
+}
 
 // MarshalJSON implements the json.Marshaler interface.
 func (x *ShardInfo) MarshalJSON() ([]byte, error) {
@@ -1202,6 +1222,11 @@ func (x *ShardInfo) MarshalEasyJSON(out *jwriter.Writer) {
 		out.RawString(prefix)
 		out.String(x.PiloramaPath)
 	}
+	{
+		const prefix string = ",\"evacuationInProgress\":"
+		out.RawString(prefix)
+		out.Bool(x.EvacuationInProgress)
+	}
 	out.RawByte('}')
 }
 
@@ -1296,6 +1321,12 @@ func (x *ShardInfo) UnmarshalEasyJSON(in *jlexer.Lexer) {
 				f = in.String()
 				x.PiloramaPath = f
 			}
+		case "evacuationInProgress":
+			{
+				var f bool
+				f = in.Bool()
+				x.EvacuationInProgress = f
+			}
 		}
 		in.WantComma()
 	}
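Note on the regenerated code: on the wire the field follows proto3 zero-value elision (EmitProtobuf writes field 8 only when true), while the easyjson marshaler always emits the evacuationInProgress key. A round-trip sketch over the generated API (UnmarshalJSON is assumed to be generated alongside MarshalJSON):

```go
package example

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
)

func roundTrip() error {
	var si control.ShardInfo
	si.SetEvacuationInProgress(true)

	data, err := si.MarshalJSON()
	if err != nil {
		return err
	}
	fmt.Println(string(data)) // includes "evacuationInProgress":true

	var back control.ShardInfo
	if err := back.UnmarshalJSON(data); err != nil {
		return err
	}
	fmt.Println(back.GetEvacuationInProgress()) // true
	return nil
}
```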