diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 04e427e49..01ade875e 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -366,6 +366,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string listPrm.WithCount(defaultEvacuateBatchSize) sh := shardsToEvacuate[shardID] + sh.SetEvacInProgress(true) var c *meta.Cursor for { @@ -655,6 +656,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to var getPrm shard.GetPrm getPrm.SetAddress(addr) + getPrm.SkipEvacCheck(true) getRes, err := sh.Get(ctx, getPrm) if err != nil { diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 55268b549..f8d0ffd6b 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -125,6 +125,34 @@ func TestEvacuateShardObjects(t *testing.T) { // Second case ensures that all objects are indeed moved and available. checkHasObjects(t) + // Objects on evacuated shards should be logically unavailable, but persisted on disk. + // This is necessary to prevent removing it by policer in case of `REP 1` policy. + for _, obj := range objects[len(objects)-objPerShard:] { + var prmGet shard.GetPrm + prmGet.SetAddress(objectCore.AddressOf(obj)) + _, err = e.shards[evacuateShardID].Get(context.Background(), prmGet) + require.Error(t, err) + + prmGet.SkipEvacCheck(true) + _, err = e.shards[evacuateShardID].Get(context.Background(), prmGet) + require.NoError(t, err) + + var prmHead shard.HeadPrm + prmHead.SetAddress(objectCore.AddressOf(obj)) + _, err = e.shards[evacuateShardID].Head(context.Background(), prmHead) + require.Error(t, err) + + var existsPrm shard.ExistsPrm + existsPrm.Address = objectCore.AddressOf(obj) + _, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm) + require.Error(t, err) + + var rngPrm shard.RngPrm + rngPrm.SetAddress(objectCore.AddressOf(obj)) + _, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm) + require.Error(t, err) + } + // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. res, err = e.Evacuate(context.Background(), prm) require.NoError(t, err) diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index b5a9604b4..048682140 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -5,7 +5,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -60,6 +62,8 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { if s.info.Mode.Disabled() { return ExistsRes{}, ErrShardDisabled + } else if s.info.EvacInProgress { + return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) } else if s.info.Mode.NoMetabase() { var p common.ExistsPrm p.Address = prm.Address diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 2e7c84bcd..8c7d6d35e 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -27,8 +27,9 @@ type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, // GetPrm groups the parameters of Get operation. type GetPrm struct { - addr oid.Address - skipMeta bool + addr oid.Address + skipMeta bool + skipEvacCheck bool } // GetRes groups the resulting values of Get operation. @@ -50,6 +51,11 @@ func (p *GetPrm) SetIgnoreMeta(ignore bool) { p.skipMeta = ignore } +// SkipEvacCheck is a Get option which instruct to skip check is evacuation in progress. +func (p *GetPrm) SkipEvacCheck(val bool) { + p.skipEvacCheck = val +} + // Object returns the requested object. func (r GetRes) Object() *objectSDK.Object { return r.obj @@ -85,6 +91,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) { return GetRes{}, ErrShardDisabled } + if s.info.EvacInProgress && !prm.skipEvacCheck { + return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) { var getPrm common.GetPrm getPrm.Address = prm.addr diff --git a/pkg/local_object_storage/shard/head.go b/pkg/local_object_storage/shard/head.go index 9d5d31260..c16caf433 100644 --- a/pkg/local_object_storage/shard/head.go +++ b/pkg/local_object_storage/shard/head.go @@ -4,7 +4,9 @@ import ( "context" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" @@ -70,6 +72,11 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) { res, err = s.Get(ctx, getPrm) obj = res.Object() } else { + s.m.RLock() + defer s.m.RUnlock() + if s.info.EvacInProgress { + return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } var headParams meta.GetPrm headParams.SetAddress(prm.addr) headParams.SetRaw(prm.raw) diff --git a/pkg/local_object_storage/shard/info.go b/pkg/local_object_storage/shard/info.go index 1051ab3db..f5776c5d8 100644 --- a/pkg/local_object_storage/shard/info.go +++ b/pkg/local_object_storage/shard/info.go @@ -16,6 +16,9 @@ type Info struct { // Shard mode. Mode mode.Mode + // True when evacuation is in progress. + EvacInProgress bool + // Information about the metabase. MetaBaseInfo meta.Info diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index 9491543c4..e0909eefe 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -87,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) { s.m.RLock() defer s.m.RUnlock() + if s.info.EvacInProgress { + return RngRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + if s.info.Mode.Disabled() { return RngRes{}, ErrShardDisabled } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index d11bcc36b..a642fdb8f 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -579,3 +579,9 @@ func (s *Shard) DeleteShardMetrics() { s.cfg.metricsWriter.DeleteShardMetrics() } } + +func (s *Shard) SetEvacInProgress(val bool) { + s.m.Lock() + defer s.m.Unlock() + s.info.EvacInProgress = val +}