From 2c4da8222d6e9dddadf77a36b0b42add10afc199 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 25 Feb 2025 15:46:24 +0300 Subject: [PATCH] [#9999] objectstore: Add stub Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/container.go | 2 +- pkg/local_object_storage/engine/control.go | 15 +- .../engine/control_test.go | 4 +- pkg/local_object_storage/engine/error_test.go | 4 +- pkg/local_object_storage/engine/inhume.go | 6 +- pkg/local_object_storage/engine/list.go | 9 +- .../engine/remove_copies.go | 3 +- pkg/local_object_storage/engine/shards.go | 2 +- .../objectstore/children.go | 11 + .../objectstore/containers.go | 39 ++ .../objectstore/control.go | 19 + .../objectstore/counters.go | 21 ++ .../objectstore/delete.go | 51 +++ .../objectstore/exists.go | 22 ++ pkg/local_object_storage/objectstore/flush.go | 17 + .../objectstore/garbage.go | 119 +++++++ pkg/local_object_storage/objectstore/get.go | 12 + .../objectstore/get_range.go | 12 + pkg/local_object_storage/objectstore/head.go | 17 + pkg/local_object_storage/objectstore/id.go | 15 + pkg/local_object_storage/objectstore/info.go | 7 + .../objectstore/inhume.go | 152 ++++++++ .../objectstore/iterate.go | 37 ++ pkg/local_object_storage/objectstore/list.go | 26 ++ pkg/local_object_storage/objectstore/lock.go | 24 ++ pkg/local_object_storage/objectstore/mode.go | 11 + pkg/local_object_storage/objectstore/put.go | 11 + .../objectstore/rebuild.go | 7 + .../objectstore/select.go | 23 ++ pkg/local_object_storage/objectstore/store.go | 27 +- pkg/local_object_storage/shard/container.go | 10 +- pkg/local_object_storage/shard/control.go | 332 ++---------------- .../shard/control_test.go | 302 ---------------- pkg/local_object_storage/shard/count.go | 2 +- pkg/local_object_storage/shard/delete.go | 73 +--- pkg/local_object_storage/shard/exists.go | 34 +- pkg/local_object_storage/shard/gc.go | 50 +-- .../shard/gc_internal_test.go | 141 -------- pkg/local_object_storage/shard/gc_test.go | 295 ---------------- pkg/local_object_storage/shard/get.go | 101 +----- pkg/local_object_storage/shard/head.go | 32 +- pkg/local_object_storage/shard/id.go | 17 +- pkg/local_object_storage/shard/info.go | 15 +- pkg/local_object_storage/shard/inhume.go | 11 +- pkg/local_object_storage/shard/list.go | 49 +-- pkg/local_object_storage/shard/lock.go | 21 +- .../shard/metrics_test.go | 8 +- pkg/local_object_storage/shard/mode.go | 35 +- pkg/local_object_storage/shard/put.go | 56 +-- pkg/local_object_storage/shard/range.go | 48 +-- pkg/local_object_storage/shard/rebuild.go | 10 +- pkg/local_object_storage/shard/refill_test.go | 76 ---- pkg/local_object_storage/shard/reload_test.go | 2 +- pkg/local_object_storage/shard/select.go | 15 +- pkg/local_object_storage/shard/shard.go | 73 +--- pkg/local_object_storage/shard/writecache.go | 10 +- pkg/services/control/server/list_shards.go | 3 - 57 files changed, 860 insertions(+), 1686 deletions(-) create mode 100644 pkg/local_object_storage/objectstore/children.go create mode 100644 pkg/local_object_storage/objectstore/containers.go create mode 100644 pkg/local_object_storage/objectstore/control.go create mode 100644 pkg/local_object_storage/objectstore/counters.go create mode 100644 pkg/local_object_storage/objectstore/delete.go create mode 100644 pkg/local_object_storage/objectstore/exists.go create mode 100644 pkg/local_object_storage/objectstore/flush.go create mode 100644 pkg/local_object_storage/objectstore/garbage.go create mode 100644 pkg/local_object_storage/objectstore/get.go create mode 100644 pkg/local_object_storage/objectstore/get_range.go create mode 100644 pkg/local_object_storage/objectstore/head.go create mode 100644 pkg/local_object_storage/objectstore/id.go create mode 100644 pkg/local_object_storage/objectstore/info.go create mode 100644 pkg/local_object_storage/objectstore/inhume.go create mode 100644 pkg/local_object_storage/objectstore/iterate.go create mode 100644 pkg/local_object_storage/objectstore/list.go create mode 100644 pkg/local_object_storage/objectstore/lock.go create mode 100644 pkg/local_object_storage/objectstore/mode.go create mode 100644 pkg/local_object_storage/objectstore/put.go create mode 100644 pkg/local_object_storage/objectstore/rebuild.go create mode 100644 pkg/local_object_storage/objectstore/select.go delete mode 100644 pkg/local_object_storage/shard/gc_internal_test.go delete mode 100644 pkg/local_object_storage/shard/gc_test.go delete mode 100644 pkg/local_object_storage/shard/refill_test.go diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index b2d7a1037..3160d7f83 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -74,7 +74,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) var csPrm shard.ContainerSizePrm csPrm.SetContainerID(prm.cnr) - csRes, err := sh.Shard.ContainerSize(csPrm) + csRes, err := sh.Shard.ContainerSize(ctx, csPrm) if err != nil { e.reportShardError(ctx, sh, "can't get container size", err, zap.Stringer("container_id", prm.cnr)) diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 6a416cfd9..6624f66e1 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -77,7 +77,7 @@ func (e *StorageEngine) Init(ctx context.Context) error { errCh := make(chan shardInitError, len(e.shards)) var eg errgroup.Group - if e.cfg.lowMem && e.anyShardRequiresRefill() { + if e.cfg.lowMem { eg.SetLimit(1) } @@ -131,15 +131,6 @@ func (e *StorageEngine) Init(ctx context.Context) error { return nil } -func (e *StorageEngine) anyShardRequiresRefill() bool { - for _, sh := range e.shards { - if sh.NeedRefillMetabase() { - return true - } - } - return false -} - var errClosed = errors.New("storage engine is closed") // Close releases all StorageEngine's components. Waits for all data-related operations to complete. @@ -350,8 +341,8 @@ func calculateShardID(info shard.Info) string { // This calculation should be kept in sync with node // configuration parsing during SIGHUP. var sb strings.Builder - for _, sub := range info.BlobStorInfo.SubStorages { - sb.WriteString(filepath.Clean(sub.Path)) + for _, path := range []string{info.ObjectStoreInfo.BlobPath, info.ObjectStoreInfo.MetaPath} { + sb.WriteString(filepath.Clean(path)) } return sb.String() } diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index c9efc312c..3e0193e51 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -213,8 +213,8 @@ func TestPersistentShardID(t *testing.T) { } require.NoError(t, newTe.ng.Close(context.Background())) - p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path - p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path + p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().ObjectStoreInfo.MetaPath + p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().ObjectStoreInfo.MetaPath tmp := filepath.Join(dir, "tmp") require.NoError(t, os.Rename(p1, tmp)) require.NoError(t, os.Rename(p2, p1)) diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index d68a7e826..7ab8cfac5 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -195,8 +195,8 @@ func TestBlobstorFailback(t *testing.T) { checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) require.NoError(t, te.ng.Close(context.Background())) - p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path - p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path + p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().ObjectStoreInfo.MetaPath + p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().ObjectStoreInfo.MetaPath tmp := filepath.Join(dir, "tmp") require.NoError(t, os.Rename(p1, tmp)) require.NoError(t, os.Rename(p2, p1)) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index fb802ef2a..117cb6cc7 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -5,7 +5,7 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" @@ -270,7 +270,7 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I return allLocks, outErr } -func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { +func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []objectstore.TombstonedObject) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleExpiredTombstones(ctx, addrs) @@ -339,7 +339,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid var drop []cid.ID for id := range idMap { prm.SetContainerID(id) - s, err := sh.ContainerSize(prm) + s, err := sh.ContainerSize(ctx, prm) if err != nil { e.log.Warn(ctx, logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err)) failed = true diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index 073248862..a14fd18db 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -6,6 +6,7 @@ import ( "sort" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" ) @@ -20,14 +21,14 @@ var ErrEndOfListing = shard.ErrEndOfListing type Cursor struct { current string shardIDs map[string]bool - shardIDToCursor map[string]*shard.Cursor + shardIDToCursor map[string]*objectstore.Cursor } -func (c *Cursor) getCurrentShardCursor() *shard.Cursor { +func (c *Cursor) getCurrentShardCursor() *objectstore.Cursor { return c.shardIDToCursor[c.current] } -func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) { +func (c *Cursor) setCurrentShardCursor(sc *objectstore.Cursor) { c.shardIDToCursor[c.current] = sc } @@ -177,7 +178,7 @@ func getSortedShardIDs(e *StorageEngine) []string { func newCursor(shardIDs []string) *Cursor { shardIDsMap := make(map[string]bool) - shardIDToCursor := make(map[string]*shard.Cursor) + shardIDToCursor := make(map[string]*objectstore.Cursor) for _, shardID := range shardIDs { shardIDsMap[shardID] = false } diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go index 8ab3c5217..8580cdcb4 100644 --- a/pkg/local_object_storage/engine/remove_copies.go +++ b/pkg/local_object_storage/engine/remove_copies.go @@ -6,6 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/hrw" @@ -64,7 +65,7 @@ func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicat errG.Go(func() error { defer close(ch) - var cursor *meta.Cursor + var cursor *objectstore.Cursor for { var listPrm shard.ListWithCursorPrm listPrm.WithCount(uint32(prm.Concurrency)) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 8e191f72c..9dd9d57cc 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -140,7 +140,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (* )...) if err := sh.UpdateID(ctx); err != nil { - e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err)) + e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.Error(err)) } return sh, nil diff --git a/pkg/local_object_storage/objectstore/children.go b/pkg/local_object_storage/objectstore/children.go new file mode 100644 index 000000000..764c660eb --- /dev/null +++ b/pkg/local_object_storage/objectstore/children.go @@ -0,0 +1,11 @@ +package objectstore + +import ( + "context" + + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func (s *ObjectStore) GetChildren(ctx context.Context, addresses []oid.Address) (map[oid.Address][]oid.Address, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/containers.go b/pkg/local_object_storage/objectstore/containers.go new file mode 100644 index 000000000..eaf786403 --- /dev/null +++ b/pkg/local_object_storage/objectstore/containers.go @@ -0,0 +1,39 @@ +package objectstore + +import ( + "context" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" +) + +func (s *ObjectStore) Containers(ctx context.Context) (list []cid.ID, err error) { + panic("unimplemented") +} + +func (s *ObjectStore) ContainerSize(ctx context.Context, id cid.ID) (uint64, error) { + panic("unimplemented") +} + +type ContainerCounters struct { + Counts map[cid.ID]ObjectCounters +} + +func (s *ObjectStore) ContainerCounters(ctx context.Context) (ContainerCounters, error) { + panic("unimplemented") +} + +func (s *ObjectStore) DeleteContainerSize(ctx context.Context, id cid.ID) error { + panic("unimplemented") +} + +func (s *ObjectStore) DeleteContainerCount(ctx context.Context, id cid.ID) error { + panic("unimplemented") +} + +func (s *ObjectStore) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) { + panic("unimplemented") +} + +func (s *ObjectStore) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/control.go b/pkg/local_object_storage/objectstore/control.go new file mode 100644 index 000000000..13eab0fce --- /dev/null +++ b/pkg/local_object_storage/objectstore/control.go @@ -0,0 +1,19 @@ +package objectstore + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" +) + +func (s *ObjectStore) Open(ctx context.Context, m mode.Mode) error { + panic("unimplmented") +} + +func (s *ObjectStore) Init(ctx context.Context) error { + panic("unimplmented") +} + +func (s *ObjectStore) Close(ctx context.Context) error { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/counters.go b/pkg/local_object_storage/objectstore/counters.go new file mode 100644 index 000000000..84022a82b --- /dev/null +++ b/pkg/local_object_storage/objectstore/counters.go @@ -0,0 +1,21 @@ +package objectstore + +import ( + "context" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" +) + +type ObjectCounters struct { + Logic uint64 + Phy uint64 + User uint64 +} + +func (s *ObjectStore) ObjectCounters(ctx context.Context) (ObjectCounters, error) { + panic("unimplemented") +} + +func (s *ObjectStore) ContainerCount(ctx context.Context, containerID cid.ID) (ObjectCounters, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/delete.go b/pkg/local_object_storage/objectstore/delete.go new file mode 100644 index 000000000..1a92f70d0 --- /dev/null +++ b/pkg/local_object_storage/objectstore/delete.go @@ -0,0 +1,51 @@ +package objectstore + +import ( + "context" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type DeleteRes struct { + phyCount uint64 + logicCount uint64 + userCount uint64 + phySize uint64 + logicSize uint64 + removedByCnrID map[cid.ID]ObjectCounters +} + +// LogicCount returns the number of removed logic +// objects. +func (d DeleteRes) LogicCount() uint64 { + return d.logicCount +} + +func (d DeleteRes) UserCount() uint64 { + return d.userCount +} + +// RemovedByCnrID returns the number of removed objects by container ID. +func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters { + return d.removedByCnrID +} + +// PhyCount returns the number of removed physical objects. +func (d DeleteRes) PhyCount() uint64 { + return d.phyCount +} + +// PhySize returns the size of removed physical objects. +func (d DeleteRes) PhySize() uint64 { + return d.phySize +} + +// LogicSize returns the size of removed logical objects. +func (d DeleteRes) LogicSize() uint64 { + return d.logicSize +} + +func (s *ObjectStore) Delete(ctx context.Context, address oid.Address) (DeleteRes, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/exists.go b/pkg/local_object_storage/objectstore/exists.go new file mode 100644 index 000000000..8da6cf164 --- /dev/null +++ b/pkg/local_object_storage/objectstore/exists.go @@ -0,0 +1,22 @@ +package objectstore + +import ( + "context" + + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type ExistsPrm struct { + // Exists option to set object checked for existence. + Address oid.Address + // Exists option to set parent object checked for existence. + ECParentAddress oid.Address +} + +type ExistsRes struct { + Exists, Locked bool +} + +func (s *ObjectStore) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/flush.go b/pkg/local_object_storage/objectstore/flush.go new file mode 100644 index 000000000..366abb907 --- /dev/null +++ b/pkg/local_object_storage/objectstore/flush.go @@ -0,0 +1,17 @@ +package objectstore + +import "context" + +func (s *ObjectStore) Flush(ctx context.Context, ignoreErrors, seal bool) error { + panic("unimplemented") +} + +type SealPrm struct { + IgnoreErrors bool + RestoreMode bool + Shrink bool +} + +func (s *ObjectStore) Seal(context.Context, SealPrm) error { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/garbage.go b/pkg/local_object_storage/objectstore/garbage.go new file mode 100644 index 000000000..70d1ff7be --- /dev/null +++ b/pkg/local_object_storage/objectstore/garbage.go @@ -0,0 +1,119 @@ +package objectstore + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type GarbageObject struct { + addr oid.Address +} + +// Address returns garbage object address. +func (g GarbageObject) Address() oid.Address { + return g.addr +} + +// GarbageHandler is a GarbageObject handling function. +type GarbageHandler func(GarbageObject) error + +type GarbageIterationPrm struct { + h GarbageHandler +} + +// SetHandler sets a handler that will be called on every +// GarbageObject. +func (g *GarbageIterationPrm) SetHandler(h GarbageHandler) { + g.h = h +} + +func (s *ObjectStore) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) error { + panic("unimplemented") +} + +type TombstonedObject struct { + addr oid.Address + tomb oid.Address +} + +// Address returns tombstoned object address. +func (g TombstonedObject) Address() oid.Address { + return g.addr +} + +// Tombstone returns address of a tombstone that +// covers object. +func (g TombstonedObject) Tombstone() oid.Address { + return g.tomb +} + +// TombstonedHandler is a TombstonedObject handling function. +type TombstonedHandler func(object TombstonedObject) error + +// GraveyardIterationPrm groups parameters of the graveyard +// iteration process. +type GraveyardIterationPrm struct { + h TombstonedHandler +} + +// SetHandler sets a handler that will be called on every +// TombstonedObject. +func (g *GraveyardIterationPrm) SetHandler(h TombstonedHandler) { + g.h = h +} + +// IterateOverGraveyard iterates over all graves in DB. +// +// If h returns ErrInterruptIterator, nil returns immediately. +// Returns other errors of h directly. +func (s *ObjectStore) IterateOverGraveyard(ctx context.Context, p GraveyardIterationPrm) error { + panic("unimplemented") +} + +type ExpiredObject struct { + typ objectSDK.Type + + addr oid.Address +} + +// Type returns type of the expired object. +func (e *ExpiredObject) Type() objectSDK.Type { + return e.typ +} + +// Address returns address of the expired object. +func (e *ExpiredObject) Address() oid.Address { + return e.addr +} + +// ExpiredObjectHandler is an ExpiredObject handling function. +type ExpiredObjectHandler func(*ExpiredObject) error + +// ErrInterruptIterator is returned by iteration handlers +// as a "break" keyword. +var ErrInterruptIterator = logicerr.New("iterator is interrupted") + +// IterateExpired iterates over all objects in DB which are out of date +// relative to epoch. Locked objects are not included (do not confuse +// with objects of type LOCK). +// +// If h returns ErrInterruptIterator, nil returns immediately. +// Returns other errors of h directly. +func (s *ObjectStore) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error { + panic("unimplemented") +} + +func (s *ObjectStore) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) { + panic("unimplemented") +} + +// InhumeTombstones deletes tombstoned objects from the +// graveyard bucket. +// +// Returns any error appeared during deletion process. +func (s *ObjectStore) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/get.go b/pkg/local_object_storage/objectstore/get.go new file mode 100644 index 000000000..9adcc0d1c --- /dev/null +++ b/pkg/local_object_storage/objectstore/get.go @@ -0,0 +1,12 @@ +package objectstore + +import ( + "context" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func (s *ObjectStore) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/get_range.go b/pkg/local_object_storage/objectstore/get_range.go new file mode 100644 index 000000000..4e88803e7 --- /dev/null +++ b/pkg/local_object_storage/objectstore/get_range.go @@ -0,0 +1,12 @@ +package objectstore + +import ( + "context" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func (s *ObjectStore) GetRange(ctx context.Context, address oid.Address, offset, limit uint64) (*objectSDK.Object, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/head.go b/pkg/local_object_storage/objectstore/head.go new file mode 100644 index 000000000..944e4b87a --- /dev/null +++ b/pkg/local_object_storage/objectstore/head.go @@ -0,0 +1,17 @@ +package objectstore + +import ( + "context" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type HeadPrm struct { + Address oid.Address + Raw bool +} + +func (s *ObjectStore) Head(ctx context.Context, prm HeadPrm) (*objectSDK.Object, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/id.go b/pkg/local_object_storage/objectstore/id.go new file mode 100644 index 000000000..a714132c9 --- /dev/null +++ b/pkg/local_object_storage/objectstore/id.go @@ -0,0 +1,15 @@ +package objectstore + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" +) + +func (s *ObjectStore) GetShardID(ctx context.Context) ([]byte, error) { + panic("unimplemented") +} + +func (s *ObjectStore) SetShardID(ctx context.Context, id []byte, mode mode.Mode) error { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/info.go b/pkg/local_object_storage/objectstore/info.go new file mode 100644 index 000000000..9061a9eff --- /dev/null +++ b/pkg/local_object_storage/objectstore/info.go @@ -0,0 +1,7 @@ +package objectstore + +type Info struct { + WALPath string + BlobPath string + MetaPath string +} diff --git a/pkg/local_object_storage/objectstore/inhume.go b/pkg/local_object_storage/objectstore/inhume.go new file mode 100644 index 000000000..9bdb0db2c --- /dev/null +++ b/pkg/local_object_storage/objectstore/inhume.go @@ -0,0 +1,152 @@ +package objectstore + +import ( + "context" + "fmt" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +// InhumePrm encapsulates parameters for Inhume operation. +type InhumePrm struct { + tomb *oid.Address + + target []oid.Address + + lockObjectHandling bool + + forceRemoval bool +} + +// DeletionInfo contains details on deleted object. +type DeletionInfo struct { + Size uint64 + CID cid.ID + IsUser bool +} + +// InhumeRes encapsulates results of Inhume operation. +type InhumeRes struct { + deletedLockObj []oid.Address + logicInhumed uint64 + userInhumed uint64 + inhumedByCnrID map[cid.ID]ObjectCounters + deletionDetails []DeletionInfo +} + +// LogicInhumed return number of logic object +// that have been inhumed. +func (i InhumeRes) LogicInhumed() uint64 { + return i.logicInhumed +} + +func (i InhumeRes) UserInhumed() uint64 { + return i.userInhumed +} + +// InhumedByCnrID return number of object +// that have been inhumed by container ID. +func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters { + return i.inhumedByCnrID +} + +// DeletedLockObjects returns deleted object of LOCK +// type. Returns always nil if WithoutLockObjectHandling +// was provided to the InhumePrm. +func (i InhumeRes) DeletedLockObjects() []oid.Address { + return i.deletedLockObj +} + +// GetDeletionInfoLength returns amount of stored elements +// in deleted sizes array. +func (i InhumeRes) GetDeletionInfoLength() int { + return len(i.deletionDetails) +} + +// GetDeletionInfoByIndex returns both deleted object sizes and +// associated container ID by index. +func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo { + return i.deletionDetails[target] +} + +// StoreDeletionInfo stores size of deleted object and associated container ID +// in corresponding arrays. +func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) { + i.deletionDetails = append(i.deletionDetails, DeletionInfo{ + Size: deletedSize, + CID: containerID, + IsUser: isUser, + }) + i.logicInhumed++ + if isUser { + i.userInhumed++ + } + + if v, ok := i.inhumedByCnrID[containerID]; ok { + v.Logic++ + if isUser { + v.User++ + } + i.inhumedByCnrID[containerID] = v + } else { + v = ObjectCounters{ + Logic: 1, + } + if isUser { + v.User = 1 + } + i.inhumedByCnrID[containerID] = v + } +} + +// SetAddresses sets a list of object addresses that should be inhumed. +func (p *InhumePrm) SetAddresses(addrs ...oid.Address) { + p.target = addrs +} + +// SetTombstoneAddress sets tombstone address as the reason for inhume operation. +// +// addr should not be nil. +// Should not be called along with SetGCMark. +func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) { + p.tomb = &addr +} + +// SetGCMark marks the object to be physically removed. +// +// Should not be called along with SetTombstoneAddress. +func (p *InhumePrm) SetGCMark() { + p.tomb = nil +} + +// SetLockObjectHandling checks if there were +// any LOCK object among the targets set via WithAddresses. +func (p *InhumePrm) SetLockObjectHandling() { + p.lockObjectHandling = true +} + +// SetForceGCMark allows removal any object. Expected to be +// called only in control service. +func (p *InhumePrm) SetForceGCMark() { + p.tomb = nil + p.forceRemoval = true +} + +func (p *InhumePrm) validate() error { + if p == nil { + return nil + } + if p.tomb != nil { + for _, addr := range p.target { + if addr.Container() != p.tomb.Container() { + return fmt.Errorf("object %s and tombstone %s have different container ID", addr, p.tomb) + } + } + } + return nil +} + +func (s *ObjectStore) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/iterate.go b/pkg/local_object_storage/objectstore/iterate.go new file mode 100644 index 000000000..774a8cc25 --- /dev/null +++ b/pkg/local_object_storage/objectstore/iterate.go @@ -0,0 +1,37 @@ +package objectstore + +import ( + "context" + + objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +func (s *ObjectStore) IterateOverContainers(ctx context.Context, handler func(context.Context, objectSDK.Type, cid.ID) error) error { + panic("unimplemented") +} + +type IterateOverObjectsInContainerPrm struct { + // ObjectType type of objects to iterate over. + ObjectType objectSDK.Type + // ContainerID container for objects to iterate over. + ContainerID cid.ID + // Handler function executed upon objects in db. + Handler func(context.Context, *objectcore.Info) error +} + +func (s *ObjectStore) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + panic("unimplemented") +} + +type CountAliveObjectsInContainerPrm struct { + // ObjectType type of objects to iterate over. + ObjectType objectSDK.Type + // ContainerID container for objects to iterate over. + ContainerID cid.ID +} + +func (s *ObjectStore) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/list.go b/pkg/local_object_storage/objectstore/list.go new file mode 100644 index 000000000..3835e310e --- /dev/null +++ b/pkg/local_object_storage/objectstore/list.go @@ -0,0 +1,26 @@ +package objectstore + +import ( + "context" + + objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" +) + +type Cursor struct { + bucketName []byte + inBucketOffset []byte +} + +type ListPrm struct { + Count int + Cursor *Cursor +} + +type ListRes struct { + AddressList []objectcore.Info + Cursor *Cursor +} + +func (s *ObjectStore) ListWithCursor(context.Context, ListPrm) (ListRes, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/lock.go b/pkg/local_object_storage/objectstore/lock.go new file mode 100644 index 000000000..787647856 --- /dev/null +++ b/pkg/local_object_storage/objectstore/lock.go @@ -0,0 +1,24 @@ +package objectstore + +import ( + "context" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func (s *ObjectStore) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error { + panic("unimplemented") +} + +func (s *ObjectStore) IsLocked(ctx context.Context, addr oid.Address) (bool, error) { + panic("unimplemented") +} + +func (s *ObjectStore) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error) { + panic("unimplemented") +} + +func (s *ObjectStore) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/mode.go b/pkg/local_object_storage/objectstore/mode.go new file mode 100644 index 000000000..9432a8bbe --- /dev/null +++ b/pkg/local_object_storage/objectstore/mode.go @@ -0,0 +1,11 @@ +package objectstore + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" +) + +func (s *ObjectStore) SetMode(ctx context.Context, m mode.Mode) error { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/put.go b/pkg/local_object_storage/objectstore/put.go new file mode 100644 index 000000000..bd4eccfa7 --- /dev/null +++ b/pkg/local_object_storage/objectstore/put.go @@ -0,0 +1,11 @@ +package objectstore + +import ( + "context" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +func (s *ObjectStore) Put(ctx context.Context, object *objectSDK.Object) error { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/rebuild.go b/pkg/local_object_storage/objectstore/rebuild.go new file mode 100644 index 000000000..0a919cd28 --- /dev/null +++ b/pkg/local_object_storage/objectstore/rebuild.go @@ -0,0 +1,7 @@ +package objectstore + +import "context" + +func (s *ObjectStore) Rebuild(ctx context.Context) error { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/select.go b/pkg/local_object_storage/objectstore/select.go new file mode 100644 index 000000000..53d0092fc --- /dev/null +++ b/pkg/local_object_storage/objectstore/select.go @@ -0,0 +1,23 @@ +package objectstore + +import ( + "context" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type SelectPrm struct { + Container cid.ID + Filters objectSDK.SearchFilters + UseAttributeIndex bool +} + +type SelectRes struct { + AddressList []oid.Address +} + +func (s *ObjectStore) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err error) { + panic("unimplemented") +} diff --git a/pkg/local_object_storage/objectstore/store.go b/pkg/local_object_storage/objectstore/store.go index 579ca179a..c94f15071 100644 --- a/pkg/local_object_storage/objectstore/store.go +++ b/pkg/local_object_storage/objectstore/store.go @@ -1,11 +1,32 @@ package objectstore -type ObjectStore struct{} +import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" -func New(opts ...Option) (*ObjectStore, error) { +type ObjectStore struct { + cfg *config +} + +func New(opts ...Option) *ObjectStore { cfg := defaultCfg() for _, opt := range opts { opt(cfg) } - return &ObjectStore{}, nil + return &ObjectStore{ + cfg: cfg, + } +} + +func (s *ObjectStore) Info() Info { + return Info{ + WALPath: s.cfg.walPath, + BlobPath: s.cfg.blobPath, + MetaPath: s.cfg.metaPath, + } +} + +func (s *ObjectStore) SetLogger(l *logger.Logger) { + s.cfg.logger = l +} + +func (s *ObjectStore) SetParentID(parentID string) { } diff --git a/pkg/local_object_storage/shard/container.go b/pkg/local_object_storage/shard/container.go index 0309f0c81..d93883c46 100644 --- a/pkg/local_object_storage/shard/container.go +++ b/pkg/local_object_storage/shard/container.go @@ -26,7 +26,7 @@ func (r ContainerSizeRes) Size() uint64 { return r.size } -func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) { +func (s *Shard) ContainerSize(ctx context.Context, prm ContainerSizePrm) (ContainerSizeRes, error) { s.m.RLock() defer s.m.RUnlock() @@ -34,7 +34,7 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) { return ContainerSizeRes{}, ErrDegradedMode } - size, err := s.metaBase.ContainerSize(prm.cnr) + size, err := s.objectstore.ContainerSize(ctx, prm.cnr) if err != nil { return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err) } @@ -69,7 +69,7 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont return ContainerCountRes{}, ErrDegradedMode } - counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID) + counters, err := s.objectstore.ContainerCount(ctx, prm.ContainerID) if err != nil { return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err) } @@ -100,7 +100,7 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error { return ErrDegradedMode } - return s.metaBase.DeleteContainerSize(ctx, id) + return s.objectstore.DeleteContainerSize(ctx, id) } func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error { @@ -122,5 +122,5 @@ func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error { return ErrDegradedMode } - return s.metaBase.DeleteContainerCount(ctx, id) + return s.objectstore.DeleteContainerCount(ctx, id) } diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index fedde2206..672b7bd36 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -2,22 +2,12 @@ package shard import ( "context" - "errors" "fmt" - "slices" - "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err error) error { @@ -45,59 +35,21 @@ func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err err // Open opens all Shard's components. func (s *Shard) Open(ctx context.Context) error { - components := []interface { - Open(context.Context, mode.Mode) error - }{ - s.blobStor, - } m := s.GetMode() - if !m.NoMetabase() { - components = append(components, s.metaBase) - } - - if s.hasWriteCache() && !m.NoMetabase() { - components = append(components, s.writeCache) + if err := s.objectstore.Open(ctx, m); err != nil { + return err } if s.pilorama != nil { - components = append(components, s.pilorama) - } - - for i, component := range components { - if err := component.Open(ctx, m); err != nil { - if component == s.metaBase { - // We must first open all other components to avoid - // opening non-existent DB in read-only mode. - for j := i + 1; j < len(components); j++ { - if err := components[j].Open(ctx, m); err != nil { - // Other components must be opened, fail. - return fmt.Errorf("open %T: %w", components[j], err) - } - } - err = s.handleMetabaseFailure(ctx, "open", err) - if err != nil { - return err - } - - break - } - - return fmt.Errorf("open %T: %w", component, err) + if err := s.pilorama.Open(ctx, m); err != nil { + return err } } + return nil } -type metabaseSynchronizer Shard - -func (x *metabaseSynchronizer) Init(ctx context.Context) error { - ctx, span := tracing.StartSpanFromContext(ctx, "metabaseSynchronizer.Init") - defer span.End() - - return (*Shard)(x).refillMetabase(ctx) -} - // Init initializes all Shard's components. func (s *Shard) Init(ctx context.Context) error { m := s.GetMode() @@ -132,234 +84,23 @@ func (s *Shard) Init(ctx context.Context) error { s.rb = newRebuilder() if !m.NoMetabase() { - s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) + s.rb.Start(ctx, s.objectstore, s.log) } s.writecacheSealCancel.Store(dummyCancel) return nil } func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error { - type initializer interface { - Init(context.Context) error - } - - var components []initializer - - if !m.NoMetabase() { - var initMetabase initializer - - if s.NeedRefillMetabase() { - initMetabase = (*metabaseSynchronizer)(s) - } else { - initMetabase = s.metaBase - } - - components = []initializer{ - s.blobStor, initMetabase, - } - } else { - components = []initializer{s.blobStor} - } - - if s.hasWriteCache() && !m.NoMetabase() { - components = append(components, s.writeCache) + if err := s.objectstore.Init(ctx); err != nil { + return err } if s.pilorama != nil { - components = append(components, s.pilorama) - } - - for _, component := range components { - if err := component.Init(ctx); err != nil { - if component == s.metaBase { - if errors.Is(err, meta.ErrOutdatedVersion) || errors.Is(err, meta.ErrIncompletedUpgrade) { - return fmt.Errorf("metabase initialization: %w", err) - } - - err = s.handleMetabaseFailure(ctx, "init", err) - if err != nil { - return err - } - - break - } - - return fmt.Errorf("initialize %T: %w", component, err) - } - } - return nil -} - -func (s *Shard) refillMetabase(ctx context.Context) error { - path := s.metaBase.DumpInfo().Path - s.metricsWriter.SetRefillStatus(path, "running") - s.metricsWriter.SetRefillPercent(path, 0) - var success bool - defer func() { - if success { - s.metricsWriter.SetRefillStatus(path, "completed") - } else { - s.metricsWriter.SetRefillStatus(path, "failed") - } - }() - - err := s.metaBase.Reset() - if err != nil { - return fmt.Errorf("reset metabase: %w", err) - } - - withCount := true - totalObjects, err := s.blobStor.ObjectsCount(ctx) - if err != nil { - s.log.Warn(ctx, logs.EngineRefillFailedToGetObjectsCount, zap.Error(err)) - withCount = false - } - - eg, egCtx := errgroup.WithContext(ctx) - if s.cfg.refillMetabaseWorkersCount > 0 { - eg.SetLimit(s.cfg.refillMetabaseWorkersCount) - } - - var completedCount uint64 - var metricGuard sync.Mutex - itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { - eg.Go(func() error { - var success bool - defer func() { - s.metricsWriter.IncRefillObjectsCount(path, len(data), success) - if withCount { - metricGuard.Lock() - completedCount++ - s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects)) - metricGuard.Unlock() - } - }() - - if err := s.refillObject(egCtx, data, addr, descriptor); err != nil { - return err - } - success = true - return nil - }) - - select { - case <-egCtx.Done(): - return egCtx.Err() - default: - return nil - } - }) - - egErr := eg.Wait() - - err = errors.Join(egErr, itErr) - if err != nil { - return fmt.Errorf("put objects to the meta: %w", err) - } - - err = s.metaBase.SyncCounters() - if err != nil { - return fmt.Errorf("sync object counters: %w", err) - } - - success = true - s.metricsWriter.SetRefillPercent(path, 100) - return nil -} - -func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error { - obj := objectSDK.New() - if err := obj.Unmarshal(data); err != nil { - s.log.Warn(ctx, logs.ShardCouldNotUnmarshalObject, - zap.Stringer("address", addr), - zap.Error(err)) - return nil - } - - hasIndexedAttribute := slices.IndexFunc(obj.Attributes(), func(attr objectSDK.Attribute) bool { return meta.IsAtrributeIndexed(attr.Key()) }) > 0 - - var isIndexedContainer bool - if hasIndexedAttribute { - info, err := s.containerInfo.Info(ctx, addr.Container()) - if err != nil { + if err := s.pilorama.Init(ctx); err != nil { return err } - if info.Removed { - s.log.Debug(ctx, logs.ShardSkipObjectFromResyncContainerDeleted, zap.Stringer("address", addr)) - return nil - } - isIndexedContainer = info.Indexed } - var err error - switch obj.Type() { - case objectSDK.TypeTombstone: - err = s.refillTombstoneObject(ctx, obj) - case objectSDK.TypeLock: - err = s.refillLockObject(ctx, obj) - default: - } - if err != nil { - return err - } - - var mPrm meta.PutPrm - mPrm.SetObject(obj) - mPrm.SetStorageID(descriptor) - mPrm.SetIndexAttributes(hasIndexedAttribute && isIndexedContainer) - - _, err = s.metaBase.Put(ctx, mPrm) - if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { - return err - } - return nil -} - -func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error { - var lock objectSDK.Lock - if err := lock.Unmarshal(obj.Payload()); err != nil { - return fmt.Errorf("unmarshal lock content: %w", err) - } - - locked := make([]oid.ID, lock.NumberOfMembers()) - lock.ReadMembers(locked) - - cnr, _ := obj.ContainerID() - id, _ := obj.ID() - err := s.metaBase.Lock(ctx, cnr, id, locked) - if err != nil { - return fmt.Errorf("lock objects: %w", err) - } - return nil -} - -func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error { - tombstone := objectSDK.NewTombstone() - - if err := tombstone.Unmarshal(obj.Payload()); err != nil { - return fmt.Errorf("unmarshal tombstone content: %w", err) - } - - tombAddr := object.AddressOf(obj) - memberIDs := tombstone.Members() - tombMembers := make([]oid.Address, 0, len(memberIDs)) - - for i := range memberIDs { - a := tombAddr - a.SetObject(memberIDs[i]) - - tombMembers = append(tombMembers, a) - } - - var inhumePrm meta.InhumePrm - - inhumePrm.SetTombstoneAddress(tombAddr) - inhumePrm.SetAddresses(tombMembers...) - - _, err := s.metaBase.Inhume(ctx, inhumePrm) - if err != nil { - return fmt.Errorf("inhume objects: %w", err) - } return nil } @@ -368,34 +109,28 @@ func (s *Shard) Close(ctx context.Context) error { if s.rb != nil { s.rb.Stop(ctx, s.log) } - var components []interface{ Close(context.Context) error } if s.pilorama != nil { - components = append(components, s.pilorama) - } - - if s.hasWriteCache() { - prev := s.writecacheSealCancel.Swap(notInitializedCancel) - prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex - components = append(components, s.writeCache) - } - - components = append(components, s.blobStor, s.metaBase) - - var lastErr error - for _, component := range components { - if err := component.Close(ctx); err != nil { - lastErr = err + if err := s.pilorama.Close(ctx); err != nil { s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err)) + return err } } + prev := s.writecacheSealCancel.Swap(notInitializedCancel) + prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex + + if err := s.objectstore.Close(ctx); err != nil { + s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err)) + return err + } + // If Init/Open was unsuccessful gc can be nil. if s.gc != nil { s.gc.stop(ctx) } - return lastErr + return nil } // Reload reloads configuration portions that are necessary. @@ -417,34 +152,9 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error { s.rb.Stop(ctx, s.log) if !s.info.Mode.NoMetabase() { defer func() { - s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) + s.rb.Start(ctx, s.objectstore, s.log) }() } - - ok, err := s.metaBase.Reload(ctx, c.metaOpts...) - if err != nil { - if errors.Is(err, meta.ErrDegradedMode) { - s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err)) - _ = s.setMode(ctx, mode.DegradedReadOnly) - } - return err - } - if ok { - var err error - if c.refillMetabase { - // Here we refill metabase only if a new instance was opened. This is a feature, - // we don't want to hang for some time just because we forgot to change - // config after the node was updated. - err = s.refillMetabase(ctx) - } else { - err = s.metaBase.Init(ctx) - } - if err != nil { - s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err)) - _ = s.setMode(ctx, mode.DegradedReadOnly) - return err - } - } return s.setMode(ctx, c.info.Mode) } diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index 6d2cd7137..dc00b9126 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -2,7 +2,6 @@ package shard import ( "context" - "fmt" "io/fs" "math" "os" @@ -10,7 +9,6 @@ import ( "sync/atomic" "testing" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore" @@ -19,14 +17,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" - objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" ) @@ -108,297 +100,3 @@ func TestShardOpen(t *testing.T) { require.Equal(t, mode.DegradedReadOnly, sh.GetMode()) require.NoError(t, sh.Close(context.Background())) } - -func TestRefillMetabaseCorrupted(t *testing.T) { - t.Parallel() - - dir := t.TempDir() - - fsTree := fstree.New( - fstree.WithDirNameLen(2), - fstree.WithPath(filepath.Join(dir, "blob")), - fstree.WithDepth(1)) - blobOpts := []blobstor.Option{ - blobstor.WithStorages([]blobstor.SubStorage{ - { - Storage: fsTree, - }, - }), - } - - mm := newMetricStore() - - sh := New( - WithID(NewIDFromBytes([]byte{})), - WithBlobStorOptions(blobOpts...), - WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), - WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})), - WithMetricsWriter(mm), - ) - require.NoError(t, sh.Open(context.Background())) - require.NoError(t, sh.Init(context.Background())) - - obj := objecttest.Object() - obj.SetType(objectSDK.TypeRegular) - obj.SetPayload([]byte{0, 1, 2, 3, 4, 5}) - - var putPrm PutPrm - putPrm.SetObject(obj) - _, err := sh.Put(context.Background(), putPrm) - require.NoError(t, err) - require.NoError(t, sh.Close(context.Background())) - - addr := object.AddressOf(obj) - // This is copied from `fstree.treePath()` to avoid exporting function just for tests. - { - saddr := addr.Object().EncodeToString() + "." + addr.Container().EncodeToString() - p := fmt.Sprintf("%s/%s/%s", fsTree.RootPath, saddr[:2], saddr[2:]) - require.NoError(t, os.WriteFile(p, []byte("not an object"), fsTree.Permissions)) - } - - sh = New( - WithID(NewIDFromBytes([]byte{})), - WithBlobStorOptions(blobOpts...), - WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), - WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})), - WithRefillMetabase(true), - WithMetricsWriter(mm)) - require.NoError(t, sh.Open(context.Background())) - require.NoError(t, sh.Init(context.Background())) - - var getPrm GetPrm - getPrm.SetAddress(addr) - _, err = sh.Get(context.Background(), getPrm) - require.True(t, client.IsErrObjectNotFound(err)) - require.NoError(t, sh.Close(context.Background())) -} - -func TestRefillMetabase(t *testing.T) { - t.Parallel() - - p := t.Name() - - defer os.RemoveAll(p) - - blobOpts := []blobstor.Option{ - blobstor.WithStorages([]blobstor.SubStorage{ - { - Storage: fstree.New( - fstree.WithPath(filepath.Join(p, "blob")), - fstree.WithDepth(1)), - }, - }), - } - - mm := newMetricStore() - - sh := New( - WithID(NewIDFromBytes([]byte{})), - WithBlobStorOptions(blobOpts...), - WithMetaBaseOptions( - meta.WithPath(filepath.Join(p, "meta")), - meta.WithEpochState(epochState{}), - ), - WithPiloramaOptions( - pilorama.WithPath(filepath.Join(p, "pilorama"))), - WithMetricsWriter(mm), - ) - - // open Blobstor - require.NoError(t, sh.Open(context.Background())) - - // initialize Blobstor - require.NoError(t, sh.Init(context.Background())) - - const objNum = 5 - - mObjs := make(map[string]objAddr) - locked := make([]oid.ID, 1, 2) - locked[0] = oidtest.ID() - cnrLocked := cidtest.ID() - for range objNum { - obj := objecttest.Object() - obj.SetType(objectSDK.TypeRegular) - - if len(locked) < 2 { - obj.SetContainerID(cnrLocked) - id, _ := obj.ID() - locked = append(locked, id) - } - - addr := object.AddressOf(obj) - - mObjs[addr.EncodeToString()] = objAddr{ - obj: obj, - addr: addr, - } - } - - tombObj := objecttest.Object() - tombObj.SetType(objectSDK.TypeTombstone) - - tombstone := objecttest.Tombstone() - - tombData, err := tombstone.Marshal() - require.NoError(t, err) - - tombObj.SetPayload(tombData) - - tombMembers := make([]oid.Address, 0, len(tombstone.Members())) - - members := tombstone.Members() - for i := range tombstone.Members() { - var a oid.Address - a.SetObject(members[i]) - cnr, _ := tombObj.ContainerID() - a.SetContainer(cnr) - - tombMembers = append(tombMembers, a) - } - - var putPrm PutPrm - - for _, v := range mObjs { - putPrm.SetObject(v.obj) - - _, err := sh.Put(context.Background(), putPrm) - require.NoError(t, err) - } - - putPrm.SetObject(tombObj) - - _, err = sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - // LOCK object handling - var lock objectSDK.Lock - lock.WriteMembers(locked) - - lockObj := objecttest.Object() - lockObj.SetContainerID(cnrLocked) - objectSDK.WriteLock(lockObj, lock) - - putPrm.SetObject(lockObj) - _, err = sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - lockID, _ := lockObj.ID() - require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked)) - - var inhumePrm InhumePrm - inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...) - - _, err = sh.Inhume(context.Background(), inhumePrm) - require.NoError(t, err) - - var headPrm HeadPrm - - checkObj := func(addr oid.Address, expObj *objectSDK.Object) { - headPrm.SetAddress(addr) - - res, err := sh.Head(context.Background(), headPrm) - - if expObj == nil { - require.True(t, client.IsErrObjectNotFound(err)) - return - } - - require.NoError(t, err) - require.Equal(t, expObj.CutPayload(), res.Object()) - } - - checkAllObjs := func(exists bool) { - for _, v := range mObjs { - if exists { - checkObj(v.addr, v.obj) - } else { - checkObj(v.addr, nil) - } - } - } - - checkTombMembers := func(exists bool) { - for _, member := range tombMembers { - headPrm.SetAddress(member) - - _, err := sh.Head(context.Background(), headPrm) - - if exists { - require.True(t, client.IsErrObjectAlreadyRemoved(err)) - } else { - require.True(t, client.IsErrObjectNotFound(err)) - } - } - } - - checkLocked := func(t *testing.T, cnr cid.ID, locked []oid.ID) { - var addr oid.Address - addr.SetContainer(cnr) - - for i := range locked { - addr.SetObject(locked[i]) - - var prm InhumePrm - prm.MarkAsGarbage(addr) - - var target *apistatus.ObjectLocked - _, err := sh.Inhume(context.Background(), prm) - require.ErrorAs(t, err, &target, "object %s should be locked", locked[i]) - } - } - - checkAllObjs(true) - checkObj(object.AddressOf(tombObj), tombObj) - checkTombMembers(true) - checkLocked(t, cnrLocked, locked) - - c, err := sh.metaBase.ObjectCounters() - require.NoError(t, err) - - phyBefore := c.Phy - logicalBefore := c.Logic - - err = sh.Close(context.Background()) - require.NoError(t, err) - - sh = New( - WithID(NewIDFromBytes([]byte{})), - WithBlobStorOptions(blobOpts...), - WithMetaBaseOptions( - meta.WithPath(filepath.Join(p, "meta_restored")), - meta.WithEpochState(epochState{}), - ), - WithPiloramaOptions( - pilorama.WithPath(filepath.Join(p, "pilorama_another"))), - WithMetricsWriter(mm), - ) - - // open Blobstor - require.NoError(t, sh.Open(context.Background())) - - // initialize Blobstor - require.NoError(t, sh.Init(context.Background())) - - defer sh.Close(context.Background()) - - checkAllObjs(false) - checkObj(object.AddressOf(tombObj), nil) - checkTombMembers(false) - - err = sh.refillMetabase(context.Background()) - require.NoError(t, err) - - c, err = sh.metaBase.ObjectCounters() - require.NoError(t, err) - - require.Equal(t, phyBefore, c.Phy) - require.Equal(t, logicalBefore, c.Logic) - - checkAllObjs(true) - checkObj(object.AddressOf(tombObj), tombObj) - checkTombMembers(true) - checkLocked(t, cnrLocked, locked) - require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb - require.Equal(t, "completed", mm.refillStatus) - require.Equal(t, uint32(100), mm.refillPercent) -} diff --git a/pkg/local_object_storage/shard/count.go b/pkg/local_object_storage/shard/count.go index b3bc6a30b..b6a05e545 100644 --- a/pkg/local_object_storage/shard/count.go +++ b/pkg/local_object_storage/shard/count.go @@ -23,7 +23,7 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) { return 0, ErrDegradedMode } - cc, err := s.metaBase.ObjectCounters() + cc, err := s.objectstore.ObjectCounters(ctx) if err != nil { return 0, err } diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 55231b032..9a147c593 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -2,17 +2,12 @@ package shard import ( "context" - "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" ) // DeletePrm groups the parameters of Delete operation. @@ -62,21 +57,7 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del default: } - if err := s.validateWritecacheDoesntContainObject(ctx, addr); err != nil { - if skipFailed { - continue - } - return result, err - } - - if err := s.deleteFromBlobstor(ctx, addr); err != nil { - if skipFailed { - continue - } - return result, err - } - - if err := s.deleteFromMetabase(ctx, addr); err != nil { + if err := s.deleteFromObjectStore(ctx, addr); err != nil { if skipFailed { continue } @@ -88,59 +69,11 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del return result, nil } -func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr oid.Address) error { - if !s.hasWriteCache() { - return nil - } - _, err := s.writeCache.Head(ctx, addr) - if err == nil { - s.log.Warn(ctx, logs.ObjectRemovalFailureExistsInWritecache, zap.Stringer("object_address", addr)) - return fmt.Errorf("object %s must be flushed from writecache", addr) - } - if client.IsErrObjectNotFound(err) { - return nil - } - return err -} - -func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error { - var sPrm meta.StorageIDPrm - sPrm.SetAddress(addr) - - res, err := s.metaBase.StorageID(ctx, sPrm) - if err != nil { - s.log.Debug(ctx, logs.StorageIDRetrievalFailure, - zap.Stringer("object", addr), - zap.Error(err)) - return err - } - storageID := res.StorageID() - if storageID == nil { - // if storageID is nil it means: - // 1. there is no such object - // 2. object stored by writecache: should not happen, as `validateWritecacheDoesntContainObject` called before `deleteFromBlobstor` - return nil - } - - var delPrm common.DeletePrm - delPrm.Address = addr - delPrm.StorageID = storageID - - _, err = s.blobStor.Delete(ctx, delPrm) - if err != nil && !client.IsErrObjectNotFound(err) { - s.log.Debug(ctx, logs.ObjectRemovalFailureBlobStor, - zap.Stringer("object_address", addr), - zap.Error(err)) - return err - } - return nil -} - -func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error { +func (s *Shard) deleteFromObjectStore(ctx context.Context, addr oid.Address) error { var delPrm meta.DeletePrm delPrm.SetAddresses(addr) - res, err := s.metaBase.Delete(ctx, delPrm) + res, err := s.objectstore.Delete(ctx, addr) if err != nil { return err } diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 82ce48dde..a0d16ed79 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -3,8 +3,7 @@ package shard import ( "context" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -53,10 +52,6 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { )) defer span.End() - var exists bool - var locked bool - var err error - s.m.RLock() defer s.m.RUnlock() @@ -64,26 +59,17 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { return ExistsRes{}, ErrShardDisabled } else if s.info.EvacuationInProgress { return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) - } else if s.info.Mode.NoMetabase() { - var p common.ExistsPrm - p.Address = prm.Address - - var res common.ExistsRes - res, err = s.blobStor.Exists(ctx, p) - exists = res.Exists - } else { - var existsPrm meta.ExistsPrm - existsPrm.SetAddress(prm.Address) - existsPrm.SetECParent(prm.ECParentAddress) - - var res meta.ExistsRes - res, err = s.metaBase.Exists(ctx, existsPrm) - exists = res.Exists() - locked = res.Locked() + } + res, err := s.objectstore.Exists(ctx, objectstore.ExistsPrm{ + Address: prm.Address, + ECParentAddress: prm.ECParentAddress, + }) + if err != nil { + return ExistsRes{}, err } return ExistsRes{ - ex: exists, - lc: locked, + ex: res.Exists, + lc: res.Locked, }, err } diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 4a5ec7a71..5006c054b 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -8,6 +8,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -293,8 +294,8 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) { buf := make([]oid.Address, 0, s.rmBatchSize) - var iterPrm meta.GarbageIterationPrm - iterPrm.SetHandler(func(g meta.GarbageObject) error { + var iterPrm objectstore.GarbageIterationPrm + iterPrm.SetHandler(func(g objectstore.GarbageObject) error { select { case <-ctx.Done(): return ctx.Err() @@ -312,7 +313,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) { // iterate over metabase's objects with GC mark // (no more than s.rmBatchSize objects) - err := s.metaBase.IterateOverGarbage(ctx, iterPrm) + err := s.objectstore.IterateOverGarbage(ctx, iterPrm) if err != nil { s.log.Warn(ctx, logs.ShardIteratorOverMetabaseGraveyardFailed, zap.Error(err), @@ -368,7 +369,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { errGroup.Go(func() error { batch := make([]oid.Address, 0, batchSize) - expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { + expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *objectstore.ExpiredObject) { if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock { batch = append(batch, o.Address()) @@ -422,13 +423,13 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) return } - var inhumePrm meta.InhumePrm + var inhumePrm objectstore.InhumePrm inhumePrm.SetAddresses(expired...) inhumePrm.SetGCMark() // inhume the collected objects - res, err := s.metaBase.Inhume(ctx, inhumePrm) + res, err := s.objectstore.Inhume(ctx, inhumePrm) if err != nil { s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err), @@ -452,7 +453,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) ([]oid.Address, error) { result := make([]oid.Address, 0, len(source)) - parentToChildren, err := s.metaBase.GetChildren(ctx, source) + parentToChildren, err := s.objectstore.GetChildren(ctx, source) if err != nil { return nil, err } @@ -479,11 +480,11 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { defer log.Debug(ctx, logs.ShardFinishedExpiredTombstonesHandling) const tssDeleteBatch = 50 - tss := make([]meta.TombstonedObject, 0, tssDeleteBatch) - tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch) + tss := make([]objectstore.TombstonedObject, 0, tssDeleteBatch) + tssExp := make([]objectstore.TombstonedObject, 0, tssDeleteBatch) - var iterPrm meta.GraveyardIterationPrm - iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error { + var iterPrm objectstore.GraveyardIterationPrm + iterPrm.SetHandler(func(deletedObject objectstore.TombstonedObject) error { tss = append(tss, deletedObject) if len(tss) == tssDeleteBatch { @@ -505,7 +506,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { return } - err = s.metaBase.IterateOverGraveyard(ctx, iterPrm) + err = s.objectstore.IterateOverGraveyard(ctx, iterPrm) if err != nil { log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err)) s.m.RUnlock() @@ -531,7 +532,6 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { s.expiredTombstonesCallback(ctx, tssExp) } - iterPrm.SetOffset(tss[tssLen-1].Address()) tss = tss[:0] tssExp = tssExp[:0] } @@ -556,7 +556,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { errGroup.Go(func() error { batch := make([]oid.Address, 0, batchSize) - expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { + expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *objectstore.ExpiredObject) { if o.Type() == objectSDK.TypeLock { batch = append(batch, o.Address()) @@ -590,7 +590,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { } } -func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error { +func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*objectstore.ExpiredObject)) error { s.m.RLock() defer s.m.RUnlock() @@ -598,7 +598,7 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo return ErrDegradedMode } - err := s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error { + err := s.objectstore.IterateExpired(ctx, epoch, func(expiredObject *objectstore.ExpiredObject) error { select { case <-ctx.Done(): return meta.ErrInterruptIterator @@ -621,14 +621,14 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid return nil, ErrDegradedMode } - return s.metaBase.FilterExpired(ctx, epoch, addresses) + return s.objectstore.FilterExpired(ctx, epoch, addresses) } // HandleExpiredTombstones marks tombstones themselves as garbage // and clears up corresponding graveyard records. // // Does not modify tss. -func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) { +func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []objectstore.TombstonedObject) { s.m.RLock() defer s.m.RUnlock() @@ -636,7 +636,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston return } - res, err := s.metaBase.InhumeTombstones(ctx, tss) + res, err := s.objectstore.InhumeTombstones(ctx, tss) if err != nil { s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err), @@ -664,7 +664,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers [] if s.GetMode().NoMetabase() { return } - unlocked, err := s.metaBase.FreeLockedBy(lockers) + unlocked, err := s.objectstore.FreeLockedBy(lockers) if err != nil { s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err), @@ -673,11 +673,11 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers [] return } - var pInhume meta.InhumePrm + var pInhume objectstore.InhumePrm pInhume.SetAddresses(lockers...) pInhume.SetForceGCMark() - res, err := s.metaBase.Inhume(ctx, pInhume) + res, err := s.objectstore.Inhume(ctx, pInhume) if err != nil { s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err), @@ -721,7 +721,7 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) { return } - _, err := s.metaBase.FreeLockedBy(lockers) + _, err := s.objectstore.FreeLockedBy(lockers) if err != nil { s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err), @@ -750,7 +750,7 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) { } func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) { - ids, err := s.metaBase.ZeroSizeContainers(ctx) + ids, err := s.objectstore.ZeroSizeContainers(ctx) if err != nil { s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err)) return @@ -762,7 +762,7 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui } func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) { - ids, err := s.metaBase.ZeroCountContainers(ctx) + ids, err := s.objectstore.ZeroCountContainers(ctx) if err != nil { s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err)) return diff --git a/pkg/local_object_storage/shard/gc_internal_test.go b/pkg/local_object_storage/shard/gc_internal_test.go deleted file mode 100644 index 9998bbae2..000000000 --- a/pkg/local_object_storage/shard/gc_internal_test.go +++ /dev/null @@ -1,141 +0,0 @@ -package shard - -import ( - "context" - "path/filepath" - "testing" - "time" - - objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" - cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/panjf2000/ants/v2" - "github.com/stretchr/testify/require" -) - -func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { - t.Parallel() - - rootPath := t.TempDir() - - var sh *Shard - l := test.NewLogger(t) - blobOpts := []blobstor.Option{ - blobstor.WithLogger(test.NewLogger(t)), - blobstor.WithStorages([]blobstor.SubStorage{ - { - Storage: blobovniczatree.NewBlobovniczaTree( - context.Background(), - blobovniczatree.WithLogger(test.NewLogger(t)), - blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")), - blobovniczatree.WithBlobovniczaShallowDepth(1), - blobovniczatree.WithBlobovniczaShallowWidth(1)), - Policy: func(_ *objectSDK.Object, data []byte) bool { - return len(data) <= 1<<20 - }, - }, - { - Storage: fstree.New( - fstree.WithPath(filepath.Join(rootPath, "blob"))), - }, - }), - } - - opts := []Option{ - WithID(NewIDFromBytes([]byte{})), - WithLogger(l), - WithBlobStorOptions(blobOpts...), - WithMetaBaseOptions( - meta.WithPath(filepath.Join(rootPath, "meta")), - meta.WithEpochState(epochState{}), - ), - WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))), - WithDeletedLockCallback(func(ctx context.Context, addresses []oid.Address) { - sh.HandleDeletedLocks(ctx, addresses) - }), - WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) { - sh.HandleExpiredLocks(ctx, epoch, a) - }), - WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { - pool, err := ants.NewPool(sz) - require.NoError(t, err) - return pool - }), - WithGCRemoverSleepInterval(1 * time.Second), - WithDisabledGC(), - } - - sh = New(opts...) - require.NoError(t, sh.Open(context.Background())) - require.NoError(t, sh.Init(context.Background())) - defer func() { require.NoError(t, sh.Close(context.Background())) }() - - cnr := cidtest.ID() - obj := testutil.GenerateObjectWithCID(cnr) - objID, _ := obj.ID() - var addr oid.Address - addr.SetContainer(cnr) - addr.SetObject(objID) - - var putPrm PutPrm - putPrm.SetObject(obj) - - _, err := sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - var getPrm GetPrm - getPrm.SetAddress(objectCore.AddressOf(obj)) - _, err = sh.Get(context.Background(), getPrm) - require.NoError(t, err, "failed to get") - - // inhume - var inhumePrm InhumePrm - inhumePrm.MarkAsGarbage(addr) - _, err = sh.Inhume(context.Background(), inhumePrm) - require.NoError(t, err, "failed to inhume") - _, err = sh.Get(context.Background(), getPrm) - require.Error(t, err, "get returned error") - require.True(t, client.IsErrObjectNotFound(err), "invalid error type") - - // storageID - var metaStIDPrm meta.StorageIDPrm - metaStIDPrm.SetAddress(addr) - storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm) - require.NoError(t, err, "failed to get storage ID") - - // check existence in blobstore - var bsExisted common.ExistsPrm - bsExisted.Address = addr - bsExisted.StorageID = storageID.StorageID() - exRes, err := sh.blobStor.Exists(context.Background(), bsExisted) - require.NoError(t, err, "failed to check blobstore existence") - require.True(t, exRes.Exists, "invalid blobstore existence result") - - // drop from blobstor - var bsDeletePrm common.DeletePrm - bsDeletePrm.Address = addr - bsDeletePrm.StorageID = storageID.StorageID() - _, err = sh.blobStor.Delete(context.Background(), bsDeletePrm) - require.NoError(t, err, "failed to delete from blobstore") - - // check existence in blobstore - exRes, err = sh.blobStor.Exists(context.Background(), bsExisted) - require.NoError(t, err, "failed to check blobstore existence") - require.False(t, exRes.Exists, "invalid blobstore existence result") - - // get should return object not found - _, err = sh.Get(context.Background(), getPrm) - require.Error(t, err, "get returned no error") - require.True(t, client.IsErrObjectNotFound(err), "invalid error type") -} diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go deleted file mode 100644 index e3670b441..000000000 --- a/pkg/local_object_storage/shard/gc_test.go +++ /dev/null @@ -1,295 +0,0 @@ -package shard - -import ( - "context" - "errors" - "testing" - - objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" - "github.com/stretchr/testify/require" -) - -func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) { - t.Parallel() - - epoch := &epochState{ - Value: 100, - } - - sh := newCustomShard(t, false, shardOptions{ - metaOptions: []meta.Option{meta.WithEpochState(epoch)}, - additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool { - return util.NewPseudoWorkerPool() // synchronous event processing - })}, - }) - defer func() { require.NoError(t, sh.Close(context.Background())) }() - - cnr := cidtest.ID() - - var objExpirationAttr objectSDK.Attribute - objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch) - objExpirationAttr.SetValue("101") - - obj := testutil.GenerateObjectWithCID(cnr) - obj.SetAttributes(objExpirationAttr) - objID, _ := obj.ID() - - var lockExpirationAttr objectSDK.Attribute - lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch) - lockExpirationAttr.SetValue("103") - - lock := testutil.GenerateObjectWithCID(cnr) - lock.SetType(objectSDK.TypeLock) - lock.SetAttributes(lockExpirationAttr) - lockID, _ := lock.ID() - - var putPrm PutPrm - putPrm.SetObject(obj) - - _, err := sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID}) - require.NoError(t, err) - - putPrm.SetObject(lock) - _, err = sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - epoch.Value = 105 - sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) - - var getPrm GetPrm - getPrm.SetAddress(objectCore.AddressOf(obj)) - _, err = sh.Get(context.Background(), getPrm) - require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired object must be deleted") -} - -func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { - t.Parallel() - - epoch := &epochState{ - Value: 100, - } - - cnr := cidtest.ID() - parentID := oidtest.ID() - splitID := objectSDK.NewSplitID() - - var objExpirationAttr objectSDK.Attribute - objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch) - objExpirationAttr.SetValue("101") - - var lockExpirationAttr objectSDK.Attribute - lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch) - lockExpirationAttr.SetValue("103") - - parent := testutil.GenerateObjectWithCID(cnr) - parent.SetID(parentID) - parent.SetPayload(nil) - parent.SetAttributes(objExpirationAttr) - - const childCount = 10 - children := make([]*objectSDK.Object, childCount) - childIDs := make([]oid.ID, childCount) - for i := range children { - children[i] = testutil.GenerateObjectWithCID(cnr) - if i != 0 { - children[i].SetPreviousID(childIDs[i-1]) - } - if i == len(children)-1 { - children[i].SetParent(parent) - } - children[i].SetSplitID(splitID) - children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)}) - childIDs[i], _ = children[i].ID() - } - - link := testutil.GenerateObjectWithCID(cnr) - link.SetParent(parent) - link.SetParentID(parentID) - link.SetSplitID(splitID) - link.SetChildren(childIDs...) - - linkID, _ := link.ID() - - sh := newCustomShard(t, false, shardOptions{ - metaOptions: []meta.Option{meta.WithEpochState(epoch)}, - additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool { - return util.NewPseudoWorkerPool() // synchronous event processing - })}, - }) - defer func() { require.NoError(t, sh.Close(context.Background())) }() - - lock := testutil.GenerateObjectWithCID(cnr) - lock.SetType(objectSDK.TypeLock) - lock.SetAttributes(lockExpirationAttr) - lockID, _ := lock.ID() - - var putPrm PutPrm - - for _, child := range children { - putPrm.SetObject(child) - _, err := sh.Put(context.Background(), putPrm) - require.NoError(t, err) - } - - putPrm.SetObject(link) - _, err := sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - err = sh.Lock(context.Background(), cnr, lockID, append(childIDs, parentID, linkID)) - require.NoError(t, err) - - putPrm.SetObject(lock) - _, err = sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - var getPrm GetPrm - getPrm.SetAddress(objectCore.AddressOf(parent)) - - _, err = sh.Get(context.Background(), getPrm) - var splitInfoError *objectSDK.SplitInfoError - require.True(t, errors.As(err, &splitInfoError), "split info must be provided") - - epoch.Value = 105 - sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) - - _, err = sh.Get(context.Background(), getPrm) - require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires") -} - -func TestGCDropsObjectInhumedFromWritecache(t *testing.T) { - t.Parallel() - - t.Run("flush write-cache before inhume", func(t *testing.T) { - t.Parallel() - testGCDropsObjectInhumedFromWritecache(t, true) - }) - - t.Run("don't flush write-cache before inhume", func(t *testing.T) { - t.Parallel() - testGCDropsObjectInhumedFromWritecache(t, false) - }) -} - -func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool) { - sh := newCustomShard(t, true, shardOptions{ - additionalShardOptions: []Option{WithDisabledGC()}, - wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()}, - }) - defer func() { require.NoError(t, sh.Close(context.Background())) }() - - obj := testutil.GenerateObjectWithSize(1024) - - var putPrm PutPrm - putPrm.SetObject(obj) - _, err := sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - // writecache stores object - wcObj, err := sh.writeCache.Head(context.Background(), objectCore.AddressOf(obj)) - require.NoError(t, err) - require.Equal(t, objectCore.AddressOf(obj), objectCore.AddressOf(wcObj)) - - // blobstore doesn't store object - bsRes, err := sh.blobStor.Get(context.Background(), common.GetPrm{ - Address: objectCore.AddressOf(obj), - }) - require.ErrorAs(t, err, new(*apistatus.ObjectNotFound)) - require.Nil(t, bsRes.Object) - require.Nil(t, bsRes.RawData) - - if flushbeforeInhume { - sh.writeCache.Flush(context.Background(), false, false) - } - - var inhumePrm InhumePrm - inhumePrm.MarkAsGarbage(objectCore.AddressOf(obj)) - _, err = sh.Inhume(context.Background(), inhumePrm) - require.NoError(t, err) - - // writecache doesn't store object - wcObj, err = sh.writeCache.Head(context.Background(), objectCore.AddressOf(obj)) - require.Error(t, err) - require.Nil(t, wcObj) - - if flushbeforeInhume { - // blobstore store object - bsRes, err = sh.blobStor.Get(context.Background(), common.GetPrm{ - Address: objectCore.AddressOf(obj), - }) - require.NoError(t, err) - require.Equal(t, objectCore.AddressOf(obj), objectCore.AddressOf(bsRes.Object)) - } else { - - // blobstore doesn't store object - bsRes, err = sh.blobStor.Get(context.Background(), common.GetPrm{ - Address: objectCore.AddressOf(obj), - }) - require.ErrorAs(t, err, new(*apistatus.ObjectNotFound)) - require.Nil(t, bsRes.Object) - require.Nil(t, bsRes.RawData) - } - - gcRes := sh.removeGarbage(context.Background()) - require.True(t, gcRes.success) - require.Equal(t, uint64(1), gcRes.deleted) -} - -func TestGCDontDeleteObjectFromWritecache(t *testing.T) { - sh := newCustomShard(t, true, shardOptions{ - additionalShardOptions: []Option{WithDisabledGC()}, - wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()}, - }) - defer func() { require.NoError(t, sh.Close(context.Background())) }() - - obj := testutil.GenerateObjectWithSize(1024) - - var putPrm PutPrm - putPrm.SetObject(obj) - _, err := sh.Put(context.Background(), putPrm) - require.NoError(t, err) - - // writecache stores object - wcObj, err := sh.writeCache.Head(context.Background(), objectCore.AddressOf(obj)) - require.NoError(t, err) - require.Equal(t, objectCore.AddressOf(obj), objectCore.AddressOf(wcObj)) - - // blobstore doesn't store object - bsRes, err := sh.blobStor.Get(context.Background(), common.GetPrm{ - Address: objectCore.AddressOf(obj), - }) - require.ErrorAs(t, err, new(*apistatus.ObjectNotFound)) - require.Nil(t, bsRes.Object) - require.Nil(t, bsRes.RawData) - - var metaInhumePrm meta.InhumePrm - metaInhumePrm.SetAddresses(objectCore.AddressOf(obj)) - metaInhumePrm.SetLockObjectHandling() - metaInhumePrm.SetGCMark() - _, err = sh.metaBase.Inhume(context.Background(), metaInhumePrm) - require.NoError(t, err) - - // logs: WARN shard/delete.go:98 can't remove object: object must be flushed from writecache - gcRes := sh.removeGarbage(context.Background()) - require.True(t, gcRes.success) - require.Equal(t, uint64(0), gcRes.deleted) - - // writecache stores object - wcObj, err = sh.writeCache.Head(context.Background(), objectCore.AddressOf(obj)) - require.NoError(t, err) - require.Equal(t, objectCore.AddressOf(obj), objectCore.AddressOf(wcObj)) -} diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 05823c62b..4d1f4e06a 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -2,22 +2,15 @@ package shard import ( "context" - "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" ) // storFetcher is a type to unify object fetching mechanism in `fetchObjectData` @@ -94,96 +87,12 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) { return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) } - cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) { - var getPrm common.GetPrm - getPrm.Address = prm.addr - getPrm.StorageID = id - - res, err := stor.Get(ctx, getPrm) - if err != nil { - return nil, err - } - - return res.Object, nil + obj, err := s.objectstore.Get(ctx, prm.addr) + if err != nil { + return GetRes{}, err } - - wc := func(c writecache.Cache) (*objectSDK.Object, error) { - return c.Get(ctx, prm.addr) - } - - skipMeta := prm.skipMeta || s.info.Mode.NoMetabase() - obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc) - return GetRes{ obj: obj, - hasMeta: hasMeta, - }, err -} - -// emptyStorageID is an empty storageID that indicates that -// an object is big (and is stored in an FSTree, not in a blobovnicza). -var emptyStorageID = make([]byte, 0) - -// fetchObjectData looks through writeCache and blobStor to find object. -func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) { - var ( - mErr error - mRes meta.ExistsRes - ) - - if !skipMeta { - var mPrm meta.ExistsPrm - mPrm.SetAddress(addr) - mRes, mErr = s.metaBase.Exists(ctx, mPrm) - if mErr != nil && !s.info.Mode.NoMetabase() { - return nil, false, mErr - } - - if !mRes.Exists() { - return nil, false, logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - } else { - s.log.Warn(ctx, logs.ShardFetchingObjectWithoutMeta, zap.Stringer("addr", addr)) - } - - if s.hasWriteCache() { - res, err := wc(s.writeCache) - if err == nil || IsErrOutOfRange(err) { - return res, false, err - } - if client.IsErrObjectNotFound(err) { - s.log.Debug(ctx, logs.ShardObjectIsMissingInWritecache, - zap.Stringer("addr", addr), - zap.Bool("skip_meta", skipMeta)) - } else { - s.log.Error(ctx, logs.ShardFailedToFetchObjectFromWritecache, - zap.Error(err), - zap.Stringer("addr", addr), - zap.Bool("skip_meta", skipMeta)) - } - } - if skipMeta || mErr != nil { - res, err := cb(s.blobStor, nil) - return res, false, err - } - - var mPrm meta.StorageIDPrm - mPrm.SetAddress(addr) - - mExRes, err := s.metaBase.StorageID(ctx, mPrm) - if err != nil { - return nil, true, fmt.Errorf("fetch blobovnicza id from metabase: %w", err) - } - - storageID := mExRes.StorageID() - if storageID == nil { - // `nil` storageID returned without any error - // means that object is big, `cb` expects an - // empty (but non-nil) storageID in such cases - storageID = emptyStorageID - } - - res, err := cb(s.blobStor, storageID) - - return res, true, err + hasMeta: true, + }, nil } diff --git a/pkg/local_object_storage/shard/head.go b/pkg/local_object_storage/shard/head.go index ff57e3bf9..c2ab90e6a 100644 --- a/pkg/local_object_storage/shard/head.go +++ b/pkg/local_object_storage/shard/head.go @@ -3,7 +3,7 @@ package shard import ( "context" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -62,30 +62,18 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) { var obj *objectSDK.Object var err error - mode := s.GetMode() - if mode.NoMetabase() || (mode.ReadOnly() && prm.ShardLooksBad) { - var getPrm GetPrm - getPrm.SetAddress(prm.addr) - getPrm.SetIgnoreMeta(true) - var res GetRes - res, err = s.Get(ctx, getPrm) - obj = res.Object() - } else { - s.m.RLock() - defer s.m.RUnlock() - if s.info.EvacuationInProgress { - return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - var headParams meta.GetPrm - headParams.SetAddress(prm.addr) - headParams.SetRaw(prm.raw) - - var res meta.GetRes - res, err = s.metaBase.Get(ctx, headParams) - obj = res.Header() + s.m.RLock() + defer s.m.RUnlock() + if s.info.EvacuationInProgress { + return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + headParams := objectstore.HeadPrm{ + Address: prm.addr, + Raw: prm.raw, } + obj, err = s.objectstore.Head(ctx, headParams) return HeadRes{ obj: obj, }, err diff --git a/pkg/local_object_storage/shard/id.go b/pkg/local_object_storage/shard/id.go index 26492cf01..f90ff422a 100644 --- a/pkg/local_object_storage/shard/id.go +++ b/pkg/local_object_storage/shard/id.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "github.com/mr-tron/base58" "go.uber.org/zap" ) @@ -35,7 +34,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) { var idFromMetabase []byte modeDegraded := s.GetMode().NoMetabase() if !modeDegraded { - if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil { + if idFromMetabase, err = s.objectstore.GetShardID(ctx); err != nil { err = fmt.Errorf("read shard id from metabase: %w", err) } } @@ -46,24 +45,16 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) { shardID := s.info.ID.String() s.cfg.metricsWriter.SetShardID(shardID) - if s.writeCache != nil && s.writeCache.GetMetrics() != nil { - s.writeCache.GetMetrics().SetShardID(shardID) - } s.log = s.log.With(zap.Stringer("shard_id", s.info.ID)) - s.metaBase.SetLogger(s.log) - s.blobStor.SetLogger(s.log) - if s.hasWriteCache() { - s.writeCache.SetLogger(s.log) - } - s.metaBase.SetParentID(s.info.ID.String()) - s.blobStor.SetParentID(s.info.ID.String()) + s.objectstore.SetLogger(s.log) + s.objectstore.SetParentID(s.info.ID.String()) if s.pilorama != nil { s.pilorama.SetParentID(s.info.ID.String()) } if len(idFromMetabase) == 0 && !modeDegraded { - if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil { + if setErr := s.objectstore.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil { err = errors.Join(err, fmt.Errorf("write shard id to metabase: %w", setErr)) } } diff --git a/pkg/local_object_storage/shard/info.go b/pkg/local_object_storage/shard/info.go index f01796ec7..517c714c2 100644 --- a/pkg/local_object_storage/shard/info.go +++ b/pkg/local_object_storage/shard/info.go @@ -1,11 +1,9 @@ package shard import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" ) // Info groups the information about Shard. @@ -19,20 +17,13 @@ type Info struct { // True when evacuation is in progress. EvacuationInProgress bool - // Information about the metabase. - MetaBaseInfo meta.Info - - // Information about the BLOB storage. - BlobStorInfo blobstor.Info - - // Information about the Write Cache. - WriteCacheInfo writecache.Info - // ErrorCount contains amount of errors occurred in shard operations. ErrorCount uint32 // PiloramaInfo contains information about trees stored on this shard. PiloramaInfo pilorama.Info + + ObjectStoreInfo objectstore.Info } // DumpInfo returns information about the Shard. diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 9d5f66063..948a8a2c9 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" @@ -81,13 +82,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { return InhumeRes{}, ErrDegradedMode } - if s.hasWriteCache() { - for i := range prm.target { - _ = s.writeCache.Delete(ctx, prm.target[i]) - } - } - - var metaPrm meta.InhumePrm + var metaPrm objectstore.InhumePrm metaPrm.SetAddresses(prm.target...) metaPrm.SetLockObjectHandling() @@ -101,7 +96,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { metaPrm.SetForceGCMark() } - res, err := s.metaBase.Inhume(ctx, metaPrm) + res, err := s.objectstore.Inhume(ctx, metaPrm) if err != nil { if errors.Is(err, meta.ErrLockObjectRemoval) { s.m.RUnlock() diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 7bc5ead1d..58b29bd3b 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -60,13 +61,13 @@ type CountAliveObjectsInContainerPrm struct { // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 - cursor *Cursor + cursor *objectstore.Cursor } // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { addrList []objectcore.Info - cursor *Cursor + cursor *objectstore.Cursor } // WithCount sets maximum amount of addresses that ListWithCursor should return. @@ -77,7 +78,7 @@ func (p *ListWithCursorPrm) WithCount(count uint32) { // WithCursor sets cursor for ListWithCursor operation. For initial request, // ignore this param or use nil value. For consecutive requests, use value // from ListWithCursorRes. -func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) { +func (p *ListWithCursorPrm) WithCursor(cursor *objectstore.Cursor) { p.cursor = cursor } @@ -87,7 +88,7 @@ func (r ListWithCursorRes) AddressList() []objectcore.Info { } // Cursor returns cursor for consecutive listing requests. -func (r ListWithCursorRes) Cursor() *Cursor { +func (r ListWithCursorRes) Cursor() *objectstore.Cursor { return r.cursor } @@ -106,7 +107,7 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) { return SelectRes{}, ErrDegradedMode } - lst, err := s.metaBase.Containers(ctx) + lst, err := s.objectstore.Containers(ctx) if err != nil { return res, fmt.Errorf("list stored containers: %w", err) } @@ -115,11 +116,12 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) { filters.AddPhyFilter() for i := range lst { - var sPrm meta.SelectPrm - sPrm.SetContainerID(lst[i]) - sPrm.SetFilters(filters) + sPrm := objectstore.SelectPrm{ + Container: lst[i], + Filters: filters, + } - sRes, err := s.metaBase.Select(ctx, sPrm) // consider making List in metabase + sRes, err := s.objectstore.Select(ctx, sPrm) // consider making List in metabase if err != nil { s.log.Debug(ctx, logs.ShardCantSelectAllObjects, zap.Stringer("cid", lst[i]), @@ -128,7 +130,7 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) { continue } - res.addrList = append(res.addrList, sRes.AddressList()...) + res.addrList = append(res.addrList, sRes.AddressList...) } return res, nil @@ -145,7 +147,7 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo return ListContainersRes{}, ErrDegradedMode } - containers, err := s.metaBase.Containers(ctx) + containers, err := s.objectstore.Containers(ctx) if err != nil { return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err) } @@ -173,17 +175,18 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List return ListWithCursorRes{}, ErrDegradedMode } - var metaPrm meta.ListPrm - metaPrm.SetCount(prm.count) - metaPrm.SetCursor(prm.cursor) - res, err := s.metaBase.ListWithCursor(ctx, metaPrm) + metaPrm := objectstore.ListPrm{ + Count: int(prm.count), + Cursor: prm.cursor, + } + res, err := s.objectstore.ListWithCursor(ctx, metaPrm) if err != nil { return ListWithCursorRes{}, fmt.Errorf("get list of objects: %w", err) } return ListWithCursorRes{ - addrList: res.AddressList(), - cursor: res.Cursor(), + addrList: res.AddressList, + cursor: res.Cursor, }, nil } @@ -202,9 +205,7 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai return ErrDegradedMode } - var metaPrm meta.IterateOverContainersPrm - metaPrm.Handler = prm.Handler - err := s.metaBase.IterateOverContainers(ctx, metaPrm) + err := s.objectstore.IterateOverContainers(ctx, prm.Handler) if err != nil { return fmt.Errorf("iterate over containers: %w", err) } @@ -227,11 +228,11 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv return ErrDegradedMode } - var metaPrm meta.IterateOverObjectsInContainerPrm + var metaPrm objectstore.IterateOverObjectsInContainerPrm metaPrm.ContainerID = prm.ContainerID metaPrm.ObjectType = prm.ObjectType metaPrm.Handler = prm.Handler - err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm) + err := s.objectstore.IterateOverObjectsInContainer(ctx, metaPrm) if err != nil { return fmt.Errorf("iterate over objects: %w", err) } @@ -251,10 +252,10 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive return 0, ErrDegradedMode } - var metaPrm meta.CountAliveObjectsInContainerPrm + var metaPrm objectstore.CountAliveObjectsInContainerPrm metaPrm.ObjectType = prm.ObjectType metaPrm.ContainerID = prm.ContainerID - count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm) + count, err := s.objectstore.CountAliveObjectsInContainer(ctx, metaPrm) if err != nil { return 0, fmt.Errorf("count alive objects in bucket: %w", err) } diff --git a/pkg/local_object_storage/shard/lock.go b/pkg/local_object_storage/shard/lock.go index 31ca16aa1..aab6616c6 100644 --- a/pkg/local_object_storage/shard/lock.go +++ b/pkg/local_object_storage/shard/lock.go @@ -2,9 +2,7 @@ package shard import ( "context" - "fmt" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -38,12 +36,7 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked [] return ErrDegradedMode } - err := s.metaBase.Lock(ctx, idCnr, locker, locked) - if err != nil { - return fmt.Errorf("metabase lock: %w", err) - } - - return nil + return s.objectstore.Lock(ctx, idCnr, locker, locked) } // IsLocked checks object locking relation of the provided object. Not found object is @@ -61,15 +54,7 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) { return false, ErrDegradedMode } - var prm meta.IsLockedPrm - prm.SetAddress(addr) - - res, err := s.metaBase.IsLocked(ctx, prm) - if err != nil { - return false, err - } - - return res.Locked(), nil + return s.objectstore.IsLocked(ctx, addr) } // GetLocks return lock id's of the provided object. Not found object is @@ -86,5 +71,5 @@ func (s *Shard) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error if m.NoMetabase() { return nil, ErrDegradedMode } - return s.metaBase.GetLocks(ctx, addr) + return s.objectstore.GetLocks(ctx, addr) } diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 5230dcad0..b468c26f1 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -265,7 +265,7 @@ func TestCounters(t *testing.T) { require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) - cc, err := sh.metaBase.ContainerCounters(context.Background()) + cc, err := sh.objectstore.ContainerCounters(context.Background()) require.NoError(t, err) require.Equal(t, meta.ContainerCounters{Counts: expected}, cc) @@ -300,7 +300,7 @@ func TestCounters(t *testing.T) { require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) - cc, err := sh.metaBase.ContainerCounters(context.Background()) + cc, err := sh.objectstore.ContainerCounters(context.Background()) require.NoError(t, err) require.Equal(t, meta.ContainerCounters{Counts: expected}, cc) @@ -345,7 +345,7 @@ func TestCounters(t *testing.T) { require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) - cc, err = sh.metaBase.ContainerCounters(context.Background()) + cc, err = sh.objectstore.ContainerCounters(context.Background()) require.NoError(t, err) require.Equal(t, meta.ContainerCounters{Counts: expected}, cc) @@ -386,7 +386,7 @@ func TestCounters(t *testing.T) { require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize()) - cc, err = sh.metaBase.ContainerCounters(context.Background()) + cc, err = sh.objectstore.ContainerCounters(context.Background()) require.NoError(t, err) require.Equal(t, meta.ContainerCounters{Counts: expected}, cc) }) diff --git a/pkg/local_object_storage/shard/mode.go b/pkg/local_object_storage/shard/mode.go index 901528976..f1aa5c46d 100644 --- a/pkg/local_object_storage/shard/mode.go +++ b/pkg/local_object_storage/shard/mode.go @@ -32,37 +32,12 @@ func (s *Shard) setMode(ctx context.Context, m mode.Mode) error { zap.Stringer("old_mode", s.info.Mode), zap.Stringer("new_mode", m)) - components := []interface { - SetMode(context.Context, mode.Mode) error - }{ - s.metaBase, s.blobStor, - } - - if s.hasWriteCache() { - components = append(components, s.writeCache) - } - - if s.pilorama != nil { - components = append(components, s.pilorama) - } - - // The usual flow of the requests (pilorama is independent): - // writecache -> blobstor -> metabase - // For mode.ReadOnly and mode.Degraded the order is: - // writecache -> blobstor -> metabase - // For mode.ReadWrite it is the opposite: - // metabase -> blobstor -> writecache - if m != mode.ReadWrite { - if s.hasWriteCache() { - components[0], components[2] = components[2], components[0] - } else { - components[0], components[1] = components[1], components[0] - } - } - if !m.Disabled() { - for i := range components { - if err := components[i].SetMode(ctx, m); err != nil { + if err := s.objectstore.SetMode(ctx, m); err != nil { + return err + } + if s.pilorama != nil { + if err := s.pilorama.SetMode(ctx, m); err != nil { return err } } diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 3f23111af..ed515a5e1 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -2,17 +2,12 @@ package shard import ( "context" - "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" ) // PutPrm groups the parameters of Put operation. @@ -55,54 +50,5 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) { return PutRes{}, ErrReadOnlyMode } - data, err := prm.obj.Marshal() - if err != nil { - return PutRes{}, fmt.Errorf("cannot marshal object: %w", err) - } - - var putPrm common.PutPrm // form Put parameters - putPrm.Object = prm.obj - putPrm.RawData = data - putPrm.Address = objectCore.AddressOf(prm.obj) - - var res common.PutRes - - // exist check are not performed there, these checks should be executed - // ahead of `Put` by storage engine - tryCache := s.hasWriteCache() && !m.NoMetabase() - if tryCache { - res, err = s.writeCache.Put(ctx, putPrm) - } - if err != nil || !tryCache { - if err != nil { - s.log.Debug(ctx, logs.ShardCantPutObjectToTheWritecacheTryingBlobstor, - zap.Error(err)) - } - - res, err = s.blobStor.Put(ctx, putPrm) - if err != nil { - return PutRes{}, fmt.Errorf("put object to BLOB storage: %w", err) - } - } - - if !m.NoMetabase() { - var pPrm meta.PutPrm - pPrm.SetObject(prm.obj) - pPrm.SetStorageID(res.StorageID) - pPrm.SetIndexAttributes(prm.indexAttributes) - res, err := s.metaBase.Put(ctx, pPrm) - if err != nil { - // may we need to handle this case in a special way - // since the object has been successfully written to BlobStor - return PutRes{}, fmt.Errorf("put object to metabase: %w", err) - } - - if res.Inserted { - s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj)) - s.addToPayloadSize(int64(prm.obj.PayloadSize())) - s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) - } - } - - return PutRes{}, nil + return PutRes{}, s.objectstore.Put(ctx, prm.obj) } diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index 701268820..490210445 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -4,10 +4,7 @@ import ( "context" "strconv" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -95,47 +92,12 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) { return RngRes{}, ErrShardDisabled } - cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) { - var getRngPrm common.GetRangePrm - getRngPrm.Address = prm.addr - getRngPrm.Range.SetOffset(prm.off) - getRngPrm.Range.SetLength(prm.ln) - getRngPrm.StorageID = id - - res, err := stor.GetRange(ctx, getRngPrm) - if err != nil { - return nil, err - } - - obj := objectSDK.New() - obj.SetPayload(res.Data) - - return obj, nil + obj, err := s.objectstore.GetRange(ctx, prm.addr, prm.off, prm.ln) + if err != nil { + return RngRes{}, err } - - wc := func(c writecache.Cache) (*objectSDK.Object, error) { - res, err := c.Get(ctx, prm.addr) - if err != nil { - return nil, err - } - - payload := res.Payload() - from := prm.off - to := from + prm.ln - if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { - return nil, logicerr.Wrap(new(apistatus.ObjectOutOfRange)) - } - - obj := objectSDK.New() - obj.SetPayload(payload[from:to]) - return obj, nil - } - - skipMeta := prm.skipMeta || s.info.Mode.NoMetabase() - obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc) - return RngRes{ obj: obj, - hasMeta: hasMeta, - }, err + hasMeta: true, + }, nil } diff --git a/pkg/local_object_storage/shard/rebuild.go b/pkg/local_object_storage/shard/rebuild.go index 0593f5894..04b7c6402 100644 --- a/pkg/local_object_storage/shard/rebuild.go +++ b/pkg/local_object_storage/shard/rebuild.go @@ -7,8 +7,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" @@ -69,7 +69,7 @@ func newRebuilder() *rebuilder { } } -func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) { +func (r *rebuilder) Start(ctx context.Context, os *objectstore.ObjectStore, log *logger.Logger) { r.mtx.Lock() defer r.mtx.Unlock() @@ -90,13 +90,13 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D if !ok { continue } - runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter) + runRebuild(ctx, os, log, t.fillPercent, t.limiter) } } }() } -func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger, +func runRebuild(ctx context.Context, os *objectstore.ObjectStore, log *logger.Logger, fillPercent int, limiter RebuildWorkerLimiter, ) { select { @@ -106,7 +106,7 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo } log.Info(ctx, logs.BlobstoreRebuildStarted) ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String()) - if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil { + if err := os.Rebuild(ctx); err != nil { log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err)) } else { log.Info(ctx, logs.BlobstoreRebuildCompletedSuccessfully) diff --git a/pkg/local_object_storage/shard/refill_test.go b/pkg/local_object_storage/shard/refill_test.go deleted file mode 100644 index d90343265..000000000 --- a/pkg/local_object_storage/shard/refill_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package shard - -import ( - "context" - "os" - "testing" - - shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" - oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" - "github.com/stretchr/testify/require" -) - -func BenchmarkRefillMetabase(b *testing.B) { - b.Run("100 objects", func(b *testing.B) { - benchRefillMetabase(b, 100) - }) - - b.Run("1000 objects", func(b *testing.B) { - benchRefillMetabase(b, 1000) - }) - - b.Run("2000 objects", func(b *testing.B) { - benchRefillMetabase(b, 2000) - }) - - b.Run("5000 objects", func(b *testing.B) { - benchRefillMetabase(b, 5000) - }) -} - -func benchRefillMetabase(b *testing.B, objectsCount int) { - sh := newCustomShard(b, false, shardOptions{ - additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)}, - }) - - defer func() { require.NoError(b, sh.Close(context.Background())) }() - - var putPrm PutPrm - - for range objectsCount / 2 { - obj := testutil.GenerateObject() - testutil.AddAttribute(obj, "foo", "bar") - testutil.AddPayload(obj, 1<<5) // blobvnicza tree obj - - putPrm.SetObject(obj) - - _, err := sh.Put(context.Background(), putPrm) - require.NoError(b, err) - } - - for range objectsCount / 2 { - obj := testutil.GenerateObject() - testutil.AddAttribute(obj, "foo", "bar") - obj.SetID(oidtest.ID()) - testutil.AddPayload(obj, 1<<20) // fstree obj - - putPrm.SetObject(obj) - - _, err := sh.Put(context.Background(), putPrm) - require.NoError(b, err) - } - - require.NoError(b, sh.Close(context.Background())) - require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path)) - - require.NoError(b, sh.Open(context.Background())) - sh.cfg.refillMetabase = true - - b.ReportAllocs() - b.ResetTimer() - - require.NoError(b, sh.Init(context.Background())) - - require.NoError(b, sh.Close(context.Background())) -} diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go index e563f390b..ad27c3c9c 100644 --- a/pkg/local_object_storage/shard/reload_test.go +++ b/pkg/local_object_storage/shard/reload_test.go @@ -90,7 +90,7 @@ func TestShardReload(t *testing.T) { t.Run("open meta at new path", func(t *testing.T) { newShardOpts := func(metaPath string, resync bool) []Option { metaOpts := []meta.Option{meta.WithPath(metaPath), meta.WithEpochState(epochState{})} - return append(opts, WithMetaBaseOptions(metaOpts...), WithRefillMetabase(resync)) + return append(opts, WithMetaBaseOptions(metaOpts...)) } newOpts := newShardOpts(filepath.Join(p, "meta1"), false) diff --git a/pkg/local_object_storage/shard/select.go b/pkg/local_object_storage/shard/select.go index c7c7e11c2..f864c1e5e 100644 --- a/pkg/local_object_storage/shard/select.go +++ b/pkg/local_object_storage/shard/select.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -60,17 +60,18 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) { return SelectRes{}, ErrDegradedMode } - var selectPrm meta.SelectPrm - selectPrm.SetFilters(prm.filters) - selectPrm.SetContainerID(prm.cnr) - selectPrm.SetUseAttributeIndex(prm.isIndexedContainer) + selectPrm := objectstore.SelectPrm{ + Filters: prm.filters, + Container: prm.cnr, + UseAttributeIndex: prm.isIndexedContainer, + } - mRes, err := s.metaBase.Select(ctx, selectPrm) + mRes, err := s.objectstore.Select(ctx, selectPrm) if err != nil { return SelectRes{}, fmt.Errorf("select objects from metabase: %w", err) } return SelectRes{ - addrList: mRes.AddressList(), + addrList: mRes.AddressList, }, nil } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index ca19cee3d..cf721a9df 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -27,13 +27,9 @@ type Shard struct { gc *gc - writeCache writecache.Cache - - blobStor *blobstor.BlobStor - pilorama pilorama.ForestStorage - metaBase *meta.DB + objectstore *objectstore.ObjectStore tsSource TombstoneSource @@ -48,7 +44,7 @@ type Shard struct { type Option func(*cfg) // ExpiredTombstonesCallback is a callback handling list of expired tombstones. -type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject) +type ExpiredTombstonesCallback func(context.Context, []objectstore.TombstonedObject) // ExpiredObjectsCallback is a callback handling list of expired objects. type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address) @@ -62,9 +58,6 @@ type EmptyContainersCallback func(context.Context, []cid.ID) type cfg struct { m sync.RWMutex - refillMetabase bool - refillMetabaseWorkersCount int - rmBatchSize int useWriteCache bool @@ -123,29 +116,12 @@ func New(opts ...Option) *Shard { opts[i](c) } - bs := blobstor.New(c.blobOpts...) - mb := meta.New(c.metaOpts...) + os := objectstore.New(c.objectStoreOpts...) s := &Shard{ - cfg: c, - blobStor: bs, - metaBase: mb, - tsSource: c.tsSource, - } - - reportFunc := func(ctx context.Context, msg string, err error) { - s.reportErrorFunc(ctx, s.ID().String(), msg, err) - } - - s.blobStor.SetReportErrorFunc(reportFunc) - - if c.useWriteCache { - s.writeCache = writecache.New( - append(c.writeCacheOpts, - writecache.WithReportErrorFunc(reportFunc), - writecache.WithBlobstor(bs), - writecache.WithMetabase(mb))...) - s.writeCache.GetMetrics().SetPath(s.writeCache.DumpInfo().Path) + cfg: c, + tsSource: c.tsSource, + objectstore: os, } if s.piloramaOpts != nil { @@ -153,7 +129,6 @@ func New(opts ...Option) *Shard { } s.fillInfo() - s.writecacheSealCancel.Store(notInitializedCancel) return s } @@ -226,11 +201,6 @@ func (s *Shard) hasWriteCache() bool { return s.cfg.useWriteCache } -// NeedRefillMetabase returns true if metabase is needed to be refilled. -func (s *Shard) NeedRefillMetabase() bool { - return s.cfg.refillMetabase -} - // WithRemoverBatchSize returns option to set batch size // of single removal operation. func WithRemoverBatchSize(sz int) Option { @@ -271,20 +241,6 @@ func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option { } } -// WithRefillMetabase returns option to set flag to refill the Metabase on Shard's initialization step. -func WithRefillMetabase(v bool) Option { - return func(c *cfg) { - c.refillMetabase = v - } -} - -// WithRefillMetabaseWorkersCount returns option to set count of workers to refill the Metabase on Shard's initialization step. -func WithRefillMetabaseWorkersCount(v int) Option { - return func(c *cfg) { - c.refillMetabaseWorkersCount = v - } -} - // WithMode returns option to set shard's mode. Mode must be one of the predefined: // - mode.ReadWrite; // - mode.ReadOnly. @@ -378,13 +334,8 @@ func WithContainerInfoProvider(containerInfo container.InfoProvider) Option { } func (s *Shard) fillInfo() { - s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() - s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() s.cfg.info.Mode = s.GetMode() - - if s.cfg.useWriteCache { - s.cfg.info.WriteCacheInfo = s.writeCache.DumpInfo() - } + s.cfg.info.ObjectStoreInfo = s.objectstore.Info() if s.pilorama != nil { s.cfg.info.PiloramaInfo = s.pilorama.DumpInfo() } @@ -408,7 +359,7 @@ func (s *Shard) updateMetrics(ctx context.Context) { return } - cc, err := s.metaBase.ObjectCounters() + cc, err := s.objectstore.ObjectCounters(ctx) if err != nil { s.log.Warn(ctx, logs.ShardMetaObjectCounterRead, zap.Error(err), @@ -421,7 +372,7 @@ func (s *Shard) updateMetrics(ctx context.Context) { s.setObjectCounterBy(logical, cc.Logic) s.setObjectCounterBy(user, cc.User) - cnrList, err := s.metaBase.Containers(ctx) + cnrList, err := s.objectstore.Containers(ctx) if err != nil { s.log.Warn(ctx, logs.ShardMetaCantReadContainerList, zap.Error(err)) return @@ -430,7 +381,7 @@ func (s *Shard) updateMetrics(ctx context.Context) { var totalPayload uint64 for i := range cnrList { - size, err := s.metaBase.ContainerSize(cnrList[i]) + size, err := s.objectstore.ContainerSize(ctx, cnrList[i]) if err != nil { s.log.Warn(ctx, logs.ShardMetaCantReadContainerSize, zap.String("cid", cnrList[i].EncodeToString()), @@ -443,7 +394,7 @@ func (s *Shard) updateMetrics(ctx context.Context) { s.addToPayloadSize(int64(totalPayload)) - contCount, err := s.metaBase.ContainerCounters(ctx) + contCount, err := s.objectstore.ContainerCounters(ctx) if err != nil { s.log.Warn(ctx, logs.FailedToGetContainerCounters, zap.Error(err)) return @@ -481,7 +432,7 @@ func (s *Shard) setObjectCounterBy(typ string, v uint64) { } } -func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) { +func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]objectstore.ObjectCounters) { for cnrID, count := range byCnr { if count.Phy > 0 { s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy) diff --git a/pkg/local_object_storage/shard/writecache.go b/pkg/local_object_storage/shard/writecache.go index f655e477a..cd278653b 100644 --- a/pkg/local_object_storage/shard/writecache.go +++ b/pkg/local_object_storage/shard/writecache.go @@ -5,7 +5,7 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -67,7 +67,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error return ErrDegradedMode } - return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal) + return s.objectstore.Flush(ctx, p.ignoreErrors, p.seal) } type SealWriteCachePrm struct { @@ -117,7 +117,7 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error { if !p.Async { defer cleanup() } - prm := writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink} + prm := objectstore.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink} if p.Async { started := make(chan struct{}) go func() { @@ -125,7 +125,7 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error { defer cleanup() s.log.Info(ctx, logs.StartedWritecacheSealAsync) - if err := s.writeCache.Seal(ctx, prm); err != nil { + if err := s.objectstore.Seal(ctx, prm); err != nil { s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err)) return } @@ -138,5 +138,5 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error { return nil } } - return s.writeCache.Seal(ctx, prm) + return s.objectstore.Seal(ctx, prm) } diff --git a/pkg/services/control/server/list_shards.go b/pkg/services/control/server/list_shards.go index efe2754ea..5aa901859 100644 --- a/pkg/services/control/server/list_shards.go +++ b/pkg/services/control/server/list_shards.go @@ -31,9 +31,6 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) ( si := new(control.ShardInfo) si.SetShard_ID(*sh.ID) - si.SetMetabasePath(sh.MetaBaseInfo.Path) - si.Blobstor = blobstorInfoToProto(sh.BlobStorInfo) - si.SetWritecachePath(sh.WriteCacheInfo.Path) si.SetPiloramaPath(sh.PiloramaInfo.Path) var m control.ShardMode