node: Evacuate objects without setting mode to MAINTENANCE
#1349

@@ -65,13 +65,14 @@ func prettyPrintShardsJSON(cmd *cobra.Command, ii []control.ShardInfo) {
 	out := make([]map[string]any, 0, len(ii))
 	for _, i := range ii {
 		out = append(out, map[string]any{
-			"shard_id":    base58.Encode(i.GetShard_ID()),
-			"mode":        shardModeToString(i.GetMode()),
-			"metabase":    i.GetMetabasePath(),
-			"blobstor":    i.GetBlobstor(),
-			"writecache":  i.GetWritecachePath(),
-			"pilorama":    i.GetPiloramaPath(),
-			"error_count": i.GetErrorCount(),
+			"shard_id":               base58.Encode(i.GetShard_ID()),
+			"mode":                   shardModeToString(i.GetMode()),
+			"metabase":               i.GetMetabasePath(),
+			"blobstor":               i.GetBlobstor(),
+			"writecache":             i.GetWritecachePath(),
+			"pilorama":               i.GetPiloramaPath(),
+			"error_count":            i.GetErrorCount(),
+			"evacuation_in_progress": i.GetEvacuationInProgress(),
 		})
 	}
@@ -105,7 +106,8 @@ func prettyPrintShards(cmd *cobra.Command, ii []control.ShardInfo) {
 			sb.String()+
 			pathPrinter("Write-cache", i.GetWritecachePath())+
 			pathPrinter("Pilorama", i.GetPiloramaPath())+
-			fmt.Sprintf("Error count: %d\n", i.GetErrorCount()),
+			fmt.Sprintf("Error count: %d\n", i.GetErrorCount())+
+			fmt.Sprintf("Evacuation in progress: %t\n", i.GetEvacuationInProgress()),
 			base58.Encode(i.GetShard_ID()),
 			shardModeToString(i.GetMode()),
 		)
@@ -10,6 +10,12 @@ First of all, by the evacuation the data is transferred to other shards of the s
 Only one running evacuation process is allowed on the node at a time.
 
+It is not necessary to turn on maintenance mode on the storage node.
+
+Once evacuation from a shard has started, it is impossible to read data from it via the public API, except when the evacuation is stopped manually or the node is restarted.
+This is necessary to prevent the policer from removing objects with policy `REP 1 ...` from remote nodes during evacuation.
+
 `frostfs-cli` utility is used to manage evacuation.
 
 ## Commands
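To make the documented read semantics concrete, here is a minimal Go sketch (not part of the change; the `readDuringEvacuation` helper is invented for illustration) that uses only the shard API visible in this diff: a regular `Get` on a shard under evacuation fails with an `ObjectNotFound` logic error even though the data is still on disk, while the evacuation engine itself bypasses the check with `SkipEvacCheck(true)`.

package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// readDuringEvacuation shows the two read paths described above.
func readDuringEvacuation(ctx context.Context, sh *shard.Shard, addr oid.Address) (*objectSDK.Object, error) {
	var prm shard.GetPrm
	prm.SetAddress(addr)

	// Public read path: while evacuation is in progress, Get fails with an
	// ObjectNotFound logic error even though the object is still on disk,
	// so the policer does not treat the object as an extra replica.
	if res, err := sh.Get(ctx, prm); err == nil {
		return res.Object(), nil // shard is not evacuating, served normally
	}

	// Internal path used by the evacuation engine: the evacuation check is
	// skipped, so the object can still be copied to another shard or node.
	prm.SkipEvacCheck(true)
	res, err := sh.Get(ctx, prm)
	if err != nil {
		return nil, err
	}
	return res.Object(), nil
}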
@@ -27,6 +27,7 @@ type EngineMetrics interface {
 	IncRefillObjectsCount(shardID, path string, size int, success bool)
 	SetRefillPercent(shardID, path string, percent uint32)
 	SetRefillStatus(shardID, path, status string)
+	SetEvacuationInProgress(shardID string, value bool)
 
 	WriteCache() WriteCacheMetrics
 	GC() GCMetrics
@@ -45,6 +46,7 @@ type engineMetrics struct {
 	refillObjCounter     *prometheus.GaugeVec
 	refillPayloadCounter *prometheus.GaugeVec
 	refillPercentCounter *prometheus.GaugeVec
+	evacuationInProgress *shardIDModeValue
 
 	gc         *gcMetrics
 	writeCache *writeCacheMetrics
@@ -72,6 +74,7 @@ func newEngineMetrics() *engineMetrics {
 		refillObjCounter:     newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
 		refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
 		refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
+		evacuationInProgress: newShardIDMode(engineSubsystem, "evacuation_in_progress", "Shard evacuation in progress"),
 	}
 }
@@ -124,6 +127,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
 	m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
 	m.mode.Delete(shardID)
 	m.refillStatus.DeleteByShardID(shardID)
+	m.evacuationInProgress.Delete(shardID)
 }
 
 func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
@@ -213,3 +217,7 @@ func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
 func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
 	m.refillStatus.SetMode(shardID, path, status)
 }
+
+func (m *engineMetrics) SetEvacuationInProgress(shardID string, value bool) {
+	m.evacuationInProgress.SetMode(shardID, strconv.FormatBool(value))
+}
@@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
 	listPrm.WithCount(defaultEvacuateBatchSize)
 
 	sh := shardsToEvacuate[shardID]
+	sh.SetEvacuationInProgress(true)
 
 	var c *meta.Cursor
 	for {
@@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 
 	var getPrm shard.GetPrm
 	getPrm.SetAddress(addr)
+	getPrm.SkipEvacCheck(true)
 
 	getRes, err := sh.Get(ctx, getPrm)
 	if err != nil {
@@ -765,3 +767,11 @@ func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
 
 	return e.evacuateLimiter.ResetEvacuationStatus()
 }
+
+func (e *StorageEngine) ResetEvacuationStatusForShards() {
+	e.mtx.RLock()
+	defer e.mtx.RUnlock()
+	for _, sh := range e.shards {
+		sh.SetEvacuationInProgress(false)
+	}
+}
@@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) {
 	// Second case ensures that all objects are indeed moved and available.
 	checkHasObjects(t)
 
+	// Objects on evacuated shards should be logically unavailable, but persisted on disk.
+	// This is necessary to prevent removing it by policer in case of `REP 1` policy.
+	for _, obj := range objects[len(objects)-objPerShard:] {
+		var prmGet shard.GetPrm
+		prmGet.SetAddress(objectCore.AddressOf(obj))
+		_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
+		require.Error(t, err)
+
+		prmGet.SkipEvacCheck(true)
+		_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
+		require.NoError(t, err)
+
+		var prmHead shard.HeadPrm
+		prmHead.SetAddress(objectCore.AddressOf(obj))
+		_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
+		require.Error(t, err)
+
+		var existsPrm shard.ExistsPrm
+		existsPrm.Address = objectCore.AddressOf(obj)
+		_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
+		require.Error(t, err)
+
+		var rngPrm shard.RngPrm
+		rngPrm.SetAddress(objectCore.AddressOf(obj))
+		_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
+		require.Error(t, err)
+	}
+
 	// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
 	res, err = e.Evacuate(context.Background(), prm)
 	require.NoError(t, err)
@@ -30,6 +30,7 @@ type MetricRegister interface {
 	IncRefillObjectsCount(shardID, path string, size int, success bool)
 	SetRefillPercent(shardID, path string, percent uint32)
 	SetRefillStatus(shardID, path, status string)
+	SetEvacuationInProgress(shardID string, value bool)
 
 	WriteCache() metrics.WriteCacheMetrics
 	GC() metrics.GCMetrics
@@ -97,6 +97,10 @@ func (m *metricsWithID) SetRefillStatus(path string, status string) {
 	m.mw.SetRefillStatus(m.id, path, status)
 }
 
+func (m *metricsWithID) SetEvacuationInProgress(value bool) {
+	m.mw.SetEvacuationInProgress(m.id, value)
+}
+
 // AddShard adds a new shard to the storage engine.
 //
 // Returns any error encountered that did not allow adding a shard.
@@ -5,7 +5,9 @@ import (
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
 
 	if s.info.Mode.Disabled() {
 		return ExistsRes{}, ErrShardDisabled
+	} else if s.info.EvacuationInProgress {
+		return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	} else if s.info.Mode.NoMetabase() {
 		var p common.ExistsPrm
 		p.Address = prm.Address
@@ -27,8 +27,9 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object,
 
 // GetPrm groups the parameters of Get operation.
 type GetPrm struct {
-	addr     oid.Address
-	skipMeta bool
+	addr          oid.Address
+	skipMeta      bool
+	skipEvacCheck bool
 }
 
 // GetRes groups the resulting values of Get operation.
@@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) {
 	p.skipMeta = ignore
 }
 
+// SkipEvacCheck is a Get option which instructs to skip the evacuation-in-progress check.
+func (p *GetPrm) SkipEvacCheck(val bool) {
+	p.skipEvacCheck = val
+}
+
 // Object returns the requested object.
 func (r GetRes) Object() *objectSDK.Object {
 	return r.obj
@@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
 		return GetRes{}, ErrShardDisabled
 	}
 
+	if s.info.EvacuationInProgress && !prm.skipEvacCheck {
+		return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+	}
+
 	cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
 		var getPrm common.GetPrm
 		getPrm.Address = prm.addr
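Because the check returns a logic error wrapping `apistatus.ObjectNotFound`, callers can detect it with standard error unwrapping. A minimal sketch, assuming (as elsewhere in the codebase) that `logicerr.Wrap` keeps the wrapped error reachable via `errors.As`; the `isLogicalNotFound` helper is illustrative and not part of this PR:

package example

import (
	"errors"

	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)

// isLogicalNotFound reports whether err is the "not found" logic error that
// Get, Head, Exists and GetRange now return for a shard with evacuation in progress.
func isLogicalNotFound(err error) bool {
	var notFound *apistatus.ObjectNotFound
	return errors.As(err, &notFound)
}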
@@ -4,7 +4,9 @@ import (
 	"context"
 
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.opentelemetry.io/otel/attribute"
@@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
 		res, err = s.Get(ctx, getPrm)
 		obj = res.Object()
 	} else {
+		s.m.RLock()
+		defer s.m.RUnlock()
+		if s.info.EvacuationInProgress {
+			return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+		}
 		var headParams meta.GetPrm
 		headParams.SetAddress(prm.addr)
 		headParams.SetRaw(prm.raw)

fyrchik commented:
On an unrelated note: we should take the mode for the whole operation duration. I am surprised it wasn't the case before. Not for this PR.

acid-ant commented:
That was done in such a way because we take the same lock here: `res, err = s.Get(ctx, getPrm)`. Maybe that was the reason why we don't take the lock for the whole operation.
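The exchange above is about lock scope. A hedged illustration (plain Go, not frostfs code) of why holding the shard's read lock across the nested `Get` call would be risky: with `sync.RWMutex`, a goroutine that takes the read lock twice can deadlock if a writer queues up between the two `RLock` calls.

package main

import "sync"

func main() {
	var m sync.RWMutex

	m.RLock()                            // outer read lock, held for the "whole operation"
	go func() { m.Lock(); m.Unlock() }() // a writer (e.g. SetEvacuationInProgress) queues up
	// Depending on scheduling, this second RLock may block forever behind the
	// queued writer, while the writer waits for the first RLock to be released.
	m.RLock()
	m.RUnlock()
	m.RUnlock()
}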
@@ -16,6 +16,9 @@ type Info struct {
 	// Shard mode.
 	Mode mode.Mode
 
+	// True when evacuation is in progress.
+	EvacuationInProgress bool
+
 	// Information about the metabase.
 	MetaBaseInfo meta.Info

fyrchik commented:
`EvacuationInProgress`?

acid-ant commented:
Renamed.
@@ -192,6 +192,9 @@ func (m *metricsStore) SetRefillStatus(_ string, status string) {
 	m.refillStatus = status
 }
 
+func (m *metricsStore) SetEvacuationInProgress(bool) {
+}
+
 func TestCounters(t *testing.T) {
 	t.Parallel()
@@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
 	s.m.RLock()
 	defer s.m.RUnlock()
 
+	if s.info.EvacuationInProgress {
+		return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+	}
+
 	if s.info.Mode.Disabled() {
 		return RngRes{}, ErrShardDisabled
 	}
@@ -97,6 +97,8 @@ type MetricsWriter interface {
 	SetRefillPercent(path string, percent uint32)
 	// SetRefillStatus sets refill status.
 	SetRefillStatus(path string, status string)
+	// SetEvacuationInProgress sets evacuation status.
+	SetEvacuationInProgress(value bool)
 }
 
 type cfg struct {
@@ -579,3 +581,12 @@ func (s *Shard) DeleteShardMetrics() {
 		s.cfg.metricsWriter.DeleteShardMetrics()
 	}
 }
+
+func (s *Shard) SetEvacuationInProgress(val bool) {
+	s.m.Lock()
+	defer s.m.Unlock()
+	s.info.EvacuationInProgress = val
+	if s.metricsWriter != nil {
+		s.metricsWriter.SetEvacuationInProgress(val)
+	}
+}

fyrchik commented:
Evacuation is done on the engine level, but we have a flag on the shard level, and because of this we have the `SkipEvacCheck` parameter to the operation. Was having the `evacuationInProgress` flag somewhere in the engine structure considered?

acid-ant commented:
It looks more complex and slower. Once it is on the `engine` level, we would have to check for each request whether evacuation is in progress for the shard, go to an internal map to check it, and exclude the shard from iteration. If this overhead is ok for us, we can do it that way.
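For contrast, a rough sketch of the engine-level alternative discussed above (purely hypothetical; the type, field and method names are invented for illustration): the engine would track evacuating shards itself and consult that set on every object operation instead of flagging each shard.

package example

import "sync"

// evacTracker is hypothetical engine-level bookkeeping, not part of this PR.
type evacTracker struct {
	mtx    sync.RWMutex
	shards map[string]struct{} // shard IDs with evacuation in progress
}

func (t *evacTracker) set(shardID string, inProgress bool) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	if inProgress {
		t.shards[shardID] = struct{}{}
	} else {
		delete(t.shards, shardID)
	}
}

// skip would be consulted by the engine before dispatching a read to a shard,
// which is the per-request map lookup overhead mentioned in the reply above.
func (t *evacTracker) skip(shardID string) bool {
	t.mtx.RLock()
	defer t.mtx.RUnlock()
	_, ok := t.shards[shardID]
	return ok
}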
@@ -101,6 +101,9 @@ func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShard
 	if err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
+	s.s.ResetEvacuationStatusForShards()
+
 	return resp, nil
 }
@@ -53,6 +53,7 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
 
 		si.SetMode(m)
 		si.SetErrorCount(sh.ErrorCount)
+		si.SetEvacuationInProgress(sh.EvacuationInProgress)
 
 		shardInfos = append(shardInfos, *si)
 	}
@@ -142,6 +142,9 @@ message ShardInfo {
 
   // Path to shard's pilorama storage.
   string pilorama_path = 7 [ json_name = "piloramaPath" ];
+
+  // Evacuation status.
+  bool evacuation_in_progress = 8 [ json_name = "evacuationInProgress" ];
 }
 
 // Blobstor component description.
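For reference, a small sketch of consuming the new field on the client side, using the generated getters shown in the file below; the `resp` value and the cobra `cmd` plumbing are assumed and not taken from this diff:

// Print evacuation status for every shard returned by ListShards.
for _, sh := range resp.GetBody().GetShards() {
	cmd.Printf("shard %s: evacuation in progress: %t\n",
		base58.Encode(sh.GetShard_ID()), sh.GetEvacuationInProgress())
}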
pkg/services/control/types_frostfs.pb.go (generated)
@@ -954,13 +954,14 @@ func (x *Netmap) UnmarshalEasyJSON(in *jlexer.Lexer) {
 }
 
 type ShardInfo struct {
-	Shard_ID       []byte         `json:"shardID"`
-	MetabasePath   string         `json:"metabasePath"`
-	Blobstor       []BlobstorInfo `json:"blobstor"`
-	WritecachePath string         `json:"writecachePath"`
-	Mode           ShardMode      `json:"mode"`
-	ErrorCount     uint32         `json:"errorCount"`
-	PiloramaPath   string         `json:"piloramaPath"`
+	Shard_ID             []byte         `json:"shardID"`
+	MetabasePath         string         `json:"metabasePath"`
+	Blobstor             []BlobstorInfo `json:"blobstor"`
+	WritecachePath       string         `json:"writecachePath"`
+	Mode                 ShardMode      `json:"mode"`
+	ErrorCount           uint32         `json:"errorCount"`
+	PiloramaPath         string         `json:"piloramaPath"`
+	EvacuationInProgress bool           `json:"evacuationInProgress"`
 }
 
 var (
@@ -986,6 +987,7 @@ func (x *ShardInfo) StableSize() (size int) {
 	size += proto.EnumSize(5, int32(x.Mode))
 	size += proto.UInt32Size(6, x.ErrorCount)
 	size += proto.StringSize(7, x.PiloramaPath)
+	size += proto.BoolSize(8, x.EvacuationInProgress)
 	return size
 }
@@ -1023,6 +1025,9 @@ func (x *ShardInfo) EmitProtobuf(mm *easyproto.MessageMarshaler) {
 	if len(x.PiloramaPath) != 0 {
 		mm.AppendString(7, x.PiloramaPath)
 	}
+	if x.EvacuationInProgress {
+		mm.AppendBool(8, x.EvacuationInProgress)
+	}
 }
 
 // UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
@@ -1080,6 +1085,12 @@ func (x *ShardInfo) UnmarshalProtobuf(src []byte) (err error) {
 				return fmt.Errorf("cannot unmarshal field %s", "PiloramaPath")
 			}
 			x.PiloramaPath = data
+		case 8: // EvacuationInProgress
+			data, ok := fc.Bool()
+			if !ok {
+				return fmt.Errorf("cannot unmarshal field %s", "EvacuationInProgress")
+			}
+			x.EvacuationInProgress = data
 		}
 	}
 	return nil
@@ -1147,6 +1158,15 @@ func (x *ShardInfo) GetPiloramaPath() string {
 func (x *ShardInfo) SetPiloramaPath(v string) {
 	x.PiloramaPath = v
 }
+func (x *ShardInfo) GetEvacuationInProgress() bool {
+	if x != nil {
+		return x.EvacuationInProgress
+	}
+	return false
+}
+func (x *ShardInfo) SetEvacuationInProgress(v bool) {
+	x.EvacuationInProgress = v
+}
 
 // MarshalJSON implements the json.Marshaler interface.
 func (x *ShardInfo) MarshalJSON() ([]byte, error) {
@@ -1202,6 +1222,11 @@ func (x *ShardInfo) MarshalEasyJSON(out *jwriter.Writer) {
 		out.RawString(prefix)
 		out.String(x.PiloramaPath)
 	}
+	{
+		const prefix string = ",\"evacuationInProgress\":"
+		out.RawString(prefix)
+		out.Bool(x.EvacuationInProgress)
+	}
 	out.RawByte('}')
 }
@@ -1296,6 +1321,12 @@ func (x *ShardInfo) UnmarshalEasyJSON(in *jlexer.Lexer) {
 			f = in.String()
 			x.PiloramaPath = f
 		}
+	case "evacuationInProgress":
+		{
+			var f bool
+			f = in.Bool()
+			x.EvacuationInProgress = f
+		}
 	}
 	in.WantComma()
 }
Comment:
When is this disabled? What if we get an error in `ListWithCursor` below?

Reply:
When we move the shard to `DISABLED` mode. Think this should be a one-way trip. Otherwise, it is possible to lose the data.

Reply:
But for evacuation only shard mode `READ_ONLY` is required...
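As the last comment points out, evacuation only requires the shard to be in `READ_ONLY` mode, not `DISABLED` or maintenance. A minimal sketch of such a pre-flight check; the `GetMode`, `ReadOnly` and `ErrMustBeReadOnly` names are assumed for illustration and are not taken from this diff:

// Refuse to evacuate a shard that can still accept writes.
if !sh.GetMode().ReadOnly() {
	return ErrMustBeReadOnly
}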