forked from TrueCloudLab/frostfs-node
[#1349] node: Evacuate objects without setting mode to MAINTENANCE
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
b3deb893ba
commit
108e4e07be
18 changed files with 156 additions and 17 deletions
|
@ -72,6 +72,7 @@ func prettyPrintShardsJSON(cmd *cobra.Command, ii []control.ShardInfo) {
|
|||
"writecache": i.GetWritecachePath(),
|
||||
"pilorama": i.GetPiloramaPath(),
|
||||
"error_count": i.GetErrorCount(),
|
||||
"evacuation_in_progress": i.GetEvacuationInProgress(),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -105,7 +106,8 @@ func prettyPrintShards(cmd *cobra.Command, ii []control.ShardInfo) {
|
|||
sb.String()+
|
||||
pathPrinter("Write-cache", i.GetWritecachePath())+
|
||||
pathPrinter("Pilorama", i.GetPiloramaPath())+
|
||||
fmt.Sprintf("Error count: %d\n", i.GetErrorCount()),
|
||||
fmt.Sprintf("Error count: %d\n", i.GetErrorCount())+
|
||||
fmt.Sprintf("Evacuation in progress: %t\n", i.GetEvacuationInProgress()),
|
||||
base58.Encode(i.GetShard_ID()),
|
||||
shardModeToString(i.GetMode()),
|
||||
)
|
||||
|
|
|
@ -10,6 +10,12 @@ First of all, by the evacuation the data is transferred to other shards of the s
|
|||
|
||||
Only one running evacuation process is allowed on the node at a time.
|
||||
|
||||
It is not necessary to turn maintenance mode on storage node.
|
||||
|
||||
Once evacuation from shard started, it is impossible to read data from it via public API, except the case when evacuation stopped manually or node restarted.
|
||||
|
||||
Because it is necessary to prevent removing by policer objects with policy `REP 1 ...` from remote node during evacuation.
|
||||
|
||||
`frostfs-cli` utility is used to manage evacuation.
|
||||
|
||||
## Commands
|
||||
|
|
|
@ -27,6 +27,7 @@ type EngineMetrics interface {
|
|||
IncRefillObjectsCount(shardID, path string, size int, success bool)
|
||||
SetRefillPercent(shardID, path string, percent uint32)
|
||||
SetRefillStatus(shardID, path, status string)
|
||||
SetEvacuationInProgress(shardID string, value bool)
|
||||
|
||||
WriteCache() WriteCacheMetrics
|
||||
GC() GCMetrics
|
||||
|
@ -45,6 +46,7 @@ type engineMetrics struct {
|
|||
refillObjCounter *prometheus.GaugeVec
|
||||
refillPayloadCounter *prometheus.GaugeVec
|
||||
refillPercentCounter *prometheus.GaugeVec
|
||||
evacuationInProgress *shardIDModeValue
|
||||
|
||||
gc *gcMetrics
|
||||
writeCache *writeCacheMetrics
|
||||
|
@ -72,6 +74,7 @@ func newEngineMetrics() *engineMetrics {
|
|||
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
|
||||
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
|
||||
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
|
||||
evacuationInProgress: newShardIDMode(engineSubsystem, "evacuation_in_progress", "Shard evacuation in progress"),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,6 +127,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
|
|||
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.mode.Delete(shardID)
|
||||
m.refillStatus.DeleteByShardID(shardID)
|
||||
m.evacuationInProgress.Delete(shardID)
|
||||
}
|
||||
|
||||
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
|
||||
|
@ -213,3 +217,7 @@ func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
|
|||
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
|
||||
m.refillStatus.SetMode(shardID, path, status)
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetEvacuationInProgress(shardID string, value bool) {
|
||||
m.evacuationInProgress.SetMode(shardID, strconv.FormatBool(value))
|
||||
}
|
||||
|
|
|
@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
|
|||
listPrm.WithCount(defaultEvacuateBatchSize)
|
||||
|
||||
sh := shardsToEvacuate[shardID]
|
||||
sh.SetEvacuationInProgress(true)
|
||||
|
||||
var c *meta.Cursor
|
||||
for {
|
||||
|
@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
|||
|
||||
var getPrm shard.GetPrm
|
||||
getPrm.SetAddress(addr)
|
||||
getPrm.SkipEvacCheck(true)
|
||||
|
||||
getRes, err := sh.Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
|
@ -765,3 +767,11 @@ func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
|
|||
|
||||
return e.evacuateLimiter.ResetEvacuationStatus()
|
||||
}
|
||||
|
||||
func (e *StorageEngine) ResetEvacuationStatusForShards() {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
for _, sh := range e.shards {
|
||||
sh.SetEvacuationInProgress(false)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) {
|
|||
// Second case ensures that all objects are indeed moved and available.
|
||||
checkHasObjects(t)
|
||||
|
||||
// Objects on evacuated shards should be logically unavailable, but persisted on disk.
|
||||
// This is necessary to prevent removing it by policer in case of `REP 1` policy.
|
||||
for _, obj := range objects[len(objects)-objPerShard:] {
|
||||
var prmGet shard.GetPrm
|
||||
prmGet.SetAddress(objectCore.AddressOf(obj))
|
||||
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
|
||||
require.Error(t, err)
|
||||
|
||||
prmGet.SkipEvacCheck(true)
|
||||
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
|
||||
require.NoError(t, err)
|
||||
|
||||
var prmHead shard.HeadPrm
|
||||
prmHead.SetAddress(objectCore.AddressOf(obj))
|
||||
_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
|
||||
require.Error(t, err)
|
||||
|
||||
var existsPrm shard.ExistsPrm
|
||||
existsPrm.Address = objectCore.AddressOf(obj)
|
||||
_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
|
||||
require.Error(t, err)
|
||||
|
||||
var rngPrm shard.RngPrm
|
||||
rngPrm.SetAddress(objectCore.AddressOf(obj))
|
||||
_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
|
||||
res, err = e.Evacuate(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -30,6 +30,7 @@ type MetricRegister interface {
|
|||
IncRefillObjectsCount(shardID, path string, size int, success bool)
|
||||
SetRefillPercent(shardID, path string, percent uint32)
|
||||
SetRefillStatus(shardID, path, status string)
|
||||
SetEvacuationInProgress(shardID string, value bool)
|
||||
|
||||
WriteCache() metrics.WriteCacheMetrics
|
||||
GC() metrics.GCMetrics
|
||||
|
|
|
@ -97,6 +97,10 @@ func (m *metricsWithID) SetRefillStatus(path string, status string) {
|
|||
m.mw.SetRefillStatus(m.id, path, status)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) SetEvacuationInProgress(value bool) {
|
||||
m.mw.SetEvacuationInProgress(m.id, value)
|
||||
}
|
||||
|
||||
// AddShard adds a new shard to the storage engine.
|
||||
//
|
||||
// Returns any error encountered that did not allow adding a shard.
|
||||
|
|
|
@ -5,7 +5,9 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
|||
|
||||
if s.info.Mode.Disabled() {
|
||||
return ExistsRes{}, ErrShardDisabled
|
||||
} else if s.info.EvacuationInProgress {
|
||||
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
} else if s.info.Mode.NoMetabase() {
|
||||
var p common.ExistsPrm
|
||||
p.Address = prm.Address
|
||||
|
|
|
@ -29,6 +29,7 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object,
|
|||
type GetPrm struct {
|
||||
addr oid.Address
|
||||
skipMeta bool
|
||||
skipEvacCheck bool
|
||||
}
|
||||
|
||||
// GetRes groups the resulting values of Get operation.
|
||||
|
@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) {
|
|||
p.skipMeta = ignore
|
||||
}
|
||||
|
||||
// SkipEvacCheck is a Get option which instruct to skip check is evacuation in progress.
|
||||
func (p *GetPrm) SkipEvacCheck(val bool) {
|
||||
p.skipEvacCheck = val
|
||||
}
|
||||
|
||||
// Object returns the requested object.
|
||||
func (r GetRes) Object() *objectSDK.Object {
|
||||
return r.obj
|
||||
|
@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|||
return GetRes{}, ErrShardDisabled
|
||||
}
|
||||
|
||||
if s.info.EvacuationInProgress && !prm.skipEvacCheck {
|
||||
return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
|
||||
var getPrm common.GetPrm
|
||||
getPrm.Address = prm.addr
|
||||
|
|
|
@ -4,7 +4,9 @@ import (
|
|||
"context"
|
||||
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
|
|||
res, err = s.Get(ctx, getPrm)
|
||||
obj = res.Object()
|
||||
} else {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
if s.info.EvacuationInProgress {
|
||||
return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
var headParams meta.GetPrm
|
||||
headParams.SetAddress(prm.addr)
|
||||
headParams.SetRaw(prm.raw)
|
||||
|
|
|
@ -16,6 +16,9 @@ type Info struct {
|
|||
// Shard mode.
|
||||
Mode mode.Mode
|
||||
|
||||
// True when evacuation is in progress.
|
||||
EvacuationInProgress bool
|
||||
|
||||
// Information about the metabase.
|
||||
MetaBaseInfo meta.Info
|
||||
|
||||
|
|
|
@ -192,6 +192,9 @@ func (m *metricsStore) SetRefillStatus(_ string, status string) {
|
|||
m.refillStatus = status
|
||||
}
|
||||
|
||||
func (m *metricsStore) SetEvacuationInProgress(bool) {
|
||||
}
|
||||
|
||||
func TestCounters(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
|
|||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.EvacuationInProgress {
|
||||
return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
if s.info.Mode.Disabled() {
|
||||
return RngRes{}, ErrShardDisabled
|
||||
}
|
||||
|
|
|
@ -97,6 +97,8 @@ type MetricsWriter interface {
|
|||
SetRefillPercent(path string, percent uint32)
|
||||
// SetRefillStatus sets refill status.
|
||||
SetRefillStatus(path string, status string)
|
||||
// SetEvacuationInProgress sets evacuation status
|
||||
SetEvacuationInProgress(value bool)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
|
@ -579,3 +581,12 @@ func (s *Shard) DeleteShardMetrics() {
|
|||
s.cfg.metricsWriter.DeleteShardMetrics()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) SetEvacuationInProgress(val bool) {
|
||||
s.m.Lock()
|
||||
defer s.m.Unlock()
|
||||
s.info.EvacuationInProgress = val
|
||||
if s.metricsWriter != nil {
|
||||
s.metricsWriter.SetEvacuationInProgress(val)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,6 +101,9 @@ func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShard
|
|||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
s.s.ResetEvacuationStatusForShards()
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
|
|||
|
||||
si.SetMode(m)
|
||||
si.SetErrorCount(sh.ErrorCount)
|
||||
si.SetEvacuationInProgress(sh.EvacuationInProgress)
|
||||
|
||||
shardInfos = append(shardInfos, *si)
|
||||
}
|
||||
|
|
|
@ -142,6 +142,9 @@ message ShardInfo {
|
|||
|
||||
// Path to shard's pilorama storage.
|
||||
string pilorama_path = 7 [ json_name = "piloramaPath" ];
|
||||
|
||||
// Evacuation status.
|
||||
bool evacuation_in_progress = 8 [ json_name = "evacuationInProgress" ];
|
||||
}
|
||||
|
||||
// Blobstor component description.
|
||||
|
|
31
pkg/services/control/types_frostfs.pb.go
generated
31
pkg/services/control/types_frostfs.pb.go
generated
|
@ -961,6 +961,7 @@ type ShardInfo struct {
|
|||
Mode ShardMode `json:"mode"`
|
||||
ErrorCount uint32 `json:"errorCount"`
|
||||
PiloramaPath string `json:"piloramaPath"`
|
||||
EvacuationInProgress bool `json:"evacuationInProgress"`
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -986,6 +987,7 @@ func (x *ShardInfo) StableSize() (size int) {
|
|||
size += proto.EnumSize(5, int32(x.Mode))
|
||||
size += proto.UInt32Size(6, x.ErrorCount)
|
||||
size += proto.StringSize(7, x.PiloramaPath)
|
||||
size += proto.BoolSize(8, x.EvacuationInProgress)
|
||||
return size
|
||||
}
|
||||
|
||||
|
@ -1023,6 +1025,9 @@ func (x *ShardInfo) EmitProtobuf(mm *easyproto.MessageMarshaler) {
|
|||
if len(x.PiloramaPath) != 0 {
|
||||
mm.AppendString(7, x.PiloramaPath)
|
||||
}
|
||||
if x.EvacuationInProgress {
|
||||
mm.AppendBool(8, x.EvacuationInProgress)
|
||||
}
|
||||
}
|
||||
|
||||
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
|
||||
|
@ -1080,6 +1085,12 @@ func (x *ShardInfo) UnmarshalProtobuf(src []byte) (err error) {
|
|||
return fmt.Errorf("cannot unmarshal field %s", "PiloramaPath")
|
||||
}
|
||||
x.PiloramaPath = data
|
||||
case 8: // EvacuationInProgress
|
||||
data, ok := fc.Bool()
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot unmarshal field %s", "EvacuationInProgress")
|
||||
}
|
||||
x.EvacuationInProgress = data
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -1147,6 +1158,15 @@ func (x *ShardInfo) GetPiloramaPath() string {
|
|||
func (x *ShardInfo) SetPiloramaPath(v string) {
|
||||
x.PiloramaPath = v
|
||||
}
|
||||
func (x *ShardInfo) GetEvacuationInProgress() bool {
|
||||
if x != nil {
|
||||
return x.EvacuationInProgress
|
||||
}
|
||||
return false
|
||||
}
|
||||
func (x *ShardInfo) SetEvacuationInProgress(v bool) {
|
||||
x.EvacuationInProgress = v
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (x *ShardInfo) MarshalJSON() ([]byte, error) {
|
||||
|
@ -1202,6 +1222,11 @@ func (x *ShardInfo) MarshalEasyJSON(out *jwriter.Writer) {
|
|||
out.RawString(prefix)
|
||||
out.String(x.PiloramaPath)
|
||||
}
|
||||
{
|
||||
const prefix string = ",\"evacuationInProgress\":"
|
||||
out.RawString(prefix)
|
||||
out.Bool(x.EvacuationInProgress)
|
||||
}
|
||||
out.RawByte('}')
|
||||
}
|
||||
|
||||
|
@ -1296,6 +1321,12 @@ func (x *ShardInfo) UnmarshalEasyJSON(in *jlexer.Lexer) {
|
|||
f = in.String()
|
||||
x.PiloramaPath = f
|
||||
}
|
||||
case "evacuationInProgress":
|
||||
{
|
||||
var f bool
|
||||
f = in.Bool()
|
||||
x.EvacuationInProgress = f
|
||||
}
|
||||
}
|
||||
in.WantComma()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue