[#9999] objectstore: Add stub

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-02-25 15:46:24 +03:00
parent f57f4b87f7
commit 2c4da8222d
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
57 changed files with 860 additions and 1686 deletions

View file

@ -74,7 +74,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm)
var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(prm.cnr)
csRes, err := sh.Shard.ContainerSize(csPrm)
csRes, err := sh.Shard.ContainerSize(ctx, csPrm)
if err != nil {
e.reportShardError(ctx, sh, "can't get container size", err,
zap.Stringer("container_id", prm.cnr))

View file

@ -77,7 +77,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
errCh := make(chan shardInitError, len(e.shards))
var eg errgroup.Group
if e.cfg.lowMem && e.anyShardRequiresRefill() {
if e.cfg.lowMem {
eg.SetLimit(1)
}
@ -131,15 +131,6 @@ func (e *StorageEngine) Init(ctx context.Context) error {
return nil
}
func (e *StorageEngine) anyShardRequiresRefill() bool {
for _, sh := range e.shards {
if sh.NeedRefillMetabase() {
return true
}
}
return false
}
var errClosed = errors.New("storage engine is closed")
// Close releases all StorageEngine's components. Waits for all data-related operations to complete.
@ -350,8 +341,8 @@ func calculateShardID(info shard.Info) string {
// This calculation should be kept in sync with node
// configuration parsing during SIGHUP.
var sb strings.Builder
for _, sub := range info.BlobStorInfo.SubStorages {
sb.WriteString(filepath.Clean(sub.Path))
for _, path := range []string{info.ObjectStoreInfo.BlobPath, info.ObjectStoreInfo.MetaPath} {
sb.WriteString(filepath.Clean(path))
}
return sb.String()
}

View file

@ -213,8 +213,8 @@ func TestPersistentShardID(t *testing.T) {
}
require.NoError(t, newTe.ng.Close(context.Background()))
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().ObjectStoreInfo.MetaPath
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().ObjectStoreInfo.MetaPath
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))

View file

@ -195,8 +195,8 @@ func TestBlobstorFailback(t *testing.T) {
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close(context.Background()))
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().ObjectStoreInfo.MetaPath
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().ObjectStoreInfo.MetaPath
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))

View file

@ -5,7 +5,7 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
@ -270,7 +270,7 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I
return allLocks, outErr
}
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []objectstore.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(ctx, addrs)
@ -339,7 +339,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
var drop []cid.ID
for id := range idMap {
prm.SetContainerID(id)
s, err := sh.ContainerSize(prm)
s, err := sh.ContainerSize(ctx, prm)
if err != nil {
e.log.Warn(ctx, logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true

View file

@ -6,6 +6,7 @@ import (
"sort"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
)
@ -20,14 +21,14 @@ var ErrEndOfListing = shard.ErrEndOfListing
type Cursor struct {
current string
shardIDs map[string]bool
shardIDToCursor map[string]*shard.Cursor
shardIDToCursor map[string]*objectstore.Cursor
}
func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
func (c *Cursor) getCurrentShardCursor() *objectstore.Cursor {
return c.shardIDToCursor[c.current]
}
func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) {
func (c *Cursor) setCurrentShardCursor(sc *objectstore.Cursor) {
c.shardIDToCursor[c.current] = sc
}
@ -177,7 +178,7 @@ func getSortedShardIDs(e *StorageEngine) []string {
func newCursor(shardIDs []string) *Cursor {
shardIDsMap := make(map[string]bool)
shardIDToCursor := make(map[string]*shard.Cursor)
shardIDToCursor := make(map[string]*objectstore.Cursor)
for _, shardID := range shardIDs {
shardIDsMap[shardID] = false
}

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
@ -64,7 +65,7 @@ func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicat
errG.Go(func() error {
defer close(ch)
var cursor *meta.Cursor
var cursor *objectstore.Cursor
for {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(uint32(prm.Concurrency))

View file

@ -140,7 +140,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
)...)
if err := sh.UpdateID(ctx); err != nil {
e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.Error(err))
}
return sh, nil

View file

@ -0,0 +1,11 @@
package objectstore
import (
"context"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
func (s *ObjectStore) GetChildren(ctx context.Context, addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,39 @@
package objectstore
import (
"context"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
func (s *ObjectStore) Containers(ctx context.Context) (list []cid.ID, err error) {
panic("unimplemented")
}
func (s *ObjectStore) ContainerSize(ctx context.Context, id cid.ID) (uint64, error) {
panic("unimplemented")
}
type ContainerCounters struct {
Counts map[cid.ID]ObjectCounters
}
func (s *ObjectStore) ContainerCounters(ctx context.Context) (ContainerCounters, error) {
panic("unimplemented")
}
func (s *ObjectStore) DeleteContainerSize(ctx context.Context, id cid.ID) error {
panic("unimplemented")
}
func (s *ObjectStore) DeleteContainerCount(ctx context.Context, id cid.ID) error {
panic("unimplemented")
}
func (s *ObjectStore) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) {
panic("unimplemented")
}
func (s *ObjectStore) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,19 @@
package objectstore
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
)
func (s *ObjectStore) Open(ctx context.Context, m mode.Mode) error {
panic("unimplmented")
}
func (s *ObjectStore) Init(ctx context.Context) error {
panic("unimplmented")
}
func (s *ObjectStore) Close(ctx context.Context) error {
panic("unimplemented")
}

View file

@ -0,0 +1,21 @@
package objectstore
import (
"context"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
type ObjectCounters struct {
Logic uint64
Phy uint64
User uint64
}
func (s *ObjectStore) ObjectCounters(ctx context.Context) (ObjectCounters, error) {
panic("unimplemented")
}
func (s *ObjectStore) ContainerCount(ctx context.Context, containerID cid.ID) (ObjectCounters, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,51 @@
package objectstore
import (
"context"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type DeleteRes struct {
phyCount uint64
logicCount uint64
userCount uint64
phySize uint64
logicSize uint64
removedByCnrID map[cid.ID]ObjectCounters
}
// LogicCount returns the number of removed logic
// objects.
func (d DeleteRes) LogicCount() uint64 {
return d.logicCount
}
func (d DeleteRes) UserCount() uint64 {
return d.userCount
}
// RemovedByCnrID returns the number of removed objects by container ID.
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
return d.removedByCnrID
}
// PhyCount returns the number of removed physical objects.
func (d DeleteRes) PhyCount() uint64 {
return d.phyCount
}
// PhySize returns the size of removed physical objects.
func (d DeleteRes) PhySize() uint64 {
return d.phySize
}
// LogicSize returns the size of removed logical objects.
func (d DeleteRes) LogicSize() uint64 {
return d.logicSize
}
func (s *ObjectStore) Delete(ctx context.Context, address oid.Address) (DeleteRes, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,22 @@
package objectstore
import (
"context"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type ExistsPrm struct {
// Exists option to set object checked for existence.
Address oid.Address
// Exists option to set parent object checked for existence.
ECParentAddress oid.Address
}
type ExistsRes struct {
Exists, Locked bool
}
func (s *ObjectStore) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,17 @@
package objectstore
import "context"
func (s *ObjectStore) Flush(ctx context.Context, ignoreErrors, seal bool) error {
panic("unimplemented")
}
type SealPrm struct {
IgnoreErrors bool
RestoreMode bool
Shrink bool
}
func (s *ObjectStore) Seal(context.Context, SealPrm) error {
panic("unimplemented")
}

View file

@ -0,0 +1,119 @@
package objectstore
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type GarbageObject struct {
addr oid.Address
}
// Address returns garbage object address.
func (g GarbageObject) Address() oid.Address {
return g.addr
}
// GarbageHandler is a GarbageObject handling function.
type GarbageHandler func(GarbageObject) error
type GarbageIterationPrm struct {
h GarbageHandler
}
// SetHandler sets a handler that will be called on every
// GarbageObject.
func (g *GarbageIterationPrm) SetHandler(h GarbageHandler) {
g.h = h
}
func (s *ObjectStore) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) error {
panic("unimplemented")
}
type TombstonedObject struct {
addr oid.Address
tomb oid.Address
}
// Address returns tombstoned object address.
func (g TombstonedObject) Address() oid.Address {
return g.addr
}
// Tombstone returns address of a tombstone that
// covers object.
func (g TombstonedObject) Tombstone() oid.Address {
return g.tomb
}
// TombstonedHandler is a TombstonedObject handling function.
type TombstonedHandler func(object TombstonedObject) error
// GraveyardIterationPrm groups parameters of the graveyard
// iteration process.
type GraveyardIterationPrm struct {
h TombstonedHandler
}
// SetHandler sets a handler that will be called on every
// TombstonedObject.
func (g *GraveyardIterationPrm) SetHandler(h TombstonedHandler) {
g.h = h
}
// IterateOverGraveyard iterates over all graves in DB.
//
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
func (s *ObjectStore) IterateOverGraveyard(ctx context.Context, p GraveyardIterationPrm) error {
panic("unimplemented")
}
type ExpiredObject struct {
typ objectSDK.Type
addr oid.Address
}
// Type returns type of the expired object.
func (e *ExpiredObject) Type() objectSDK.Type {
return e.typ
}
// Address returns address of the expired object.
func (e *ExpiredObject) Address() oid.Address {
return e.addr
}
// ExpiredObjectHandler is an ExpiredObject handling function.
type ExpiredObjectHandler func(*ExpiredObject) error
// ErrInterruptIterator is returned by iteration handlers
// as a "break" keyword.
var ErrInterruptIterator = logicerr.New("iterator is interrupted")
// IterateExpired iterates over all objects in DB which are out of date
// relative to epoch. Locked objects are not included (do not confuse
// with objects of type LOCK).
//
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
func (s *ObjectStore) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error {
panic("unimplemented")
}
func (s *ObjectStore) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) {
panic("unimplemented")
}
// InhumeTombstones deletes tombstoned objects from the
// graveyard bucket.
//
// Returns any error appeared during deletion process.
func (s *ObjectStore) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,12 @@
package objectstore
import (
"context"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
func (s *ObjectStore) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,12 @@
package objectstore
import (
"context"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
func (s *ObjectStore) GetRange(ctx context.Context, address oid.Address, offset, limit uint64) (*objectSDK.Object, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,17 @@
package objectstore
import (
"context"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type HeadPrm struct {
Address oid.Address
Raw bool
}
func (s *ObjectStore) Head(ctx context.Context, prm HeadPrm) (*objectSDK.Object, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,15 @@
package objectstore
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
)
func (s *ObjectStore) GetShardID(ctx context.Context) ([]byte, error) {
panic("unimplemented")
}
func (s *ObjectStore) SetShardID(ctx context.Context, id []byte, mode mode.Mode) error {
panic("unimplemented")
}

View file

@ -0,0 +1,7 @@
package objectstore
type Info struct {
WALPath string
BlobPath string
MetaPath string
}

View file

@ -0,0 +1,152 @@
package objectstore
import (
"context"
"fmt"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// InhumePrm encapsulates parameters for Inhume operation.
type InhumePrm struct {
tomb *oid.Address
target []oid.Address
lockObjectHandling bool
forceRemoval bool
}
// DeletionInfo contains details on deleted object.
type DeletionInfo struct {
Size uint64
CID cid.ID
IsUser bool
}
// InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct {
deletedLockObj []oid.Address
logicInhumed uint64
userInhumed uint64
inhumedByCnrID map[cid.ID]ObjectCounters
deletionDetails []DeletionInfo
}
// LogicInhumed return number of logic object
// that have been inhumed.
func (i InhumeRes) LogicInhumed() uint64 {
return i.logicInhumed
}
func (i InhumeRes) UserInhumed() uint64 {
return i.userInhumed
}
// InhumedByCnrID return number of object
// that have been inhumed by container ID.
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
return i.inhumedByCnrID
}
// DeletedLockObjects returns deleted object of LOCK
// type. Returns always nil if WithoutLockObjectHandling
// was provided to the InhumePrm.
func (i InhumeRes) DeletedLockObjects() []oid.Address {
return i.deletedLockObj
}
// GetDeletionInfoLength returns amount of stored elements
// in deleted sizes array.
func (i InhumeRes) GetDeletionInfoLength() int {
return len(i.deletionDetails)
}
// GetDeletionInfoByIndex returns both deleted object sizes and
// associated container ID by index.
func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo {
return i.deletionDetails[target]
}
// StoreDeletionInfo stores size of deleted object and associated container ID
// in corresponding arrays.
func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) {
i.deletionDetails = append(i.deletionDetails, DeletionInfo{
Size: deletedSize,
CID: containerID,
IsUser: isUser,
})
i.logicInhumed++
if isUser {
i.userInhumed++
}
if v, ok := i.inhumedByCnrID[containerID]; ok {
v.Logic++
if isUser {
v.User++
}
i.inhumedByCnrID[containerID] = v
} else {
v = ObjectCounters{
Logic: 1,
}
if isUser {
v.User = 1
}
i.inhumedByCnrID[containerID] = v
}
}
// SetAddresses sets a list of object addresses that should be inhumed.
func (p *InhumePrm) SetAddresses(addrs ...oid.Address) {
p.target = addrs
}
// SetTombstoneAddress sets tombstone address as the reason for inhume operation.
//
// addr should not be nil.
// Should not be called along with SetGCMark.
func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) {
p.tomb = &addr
}
// SetGCMark marks the object to be physically removed.
//
// Should not be called along with SetTombstoneAddress.
func (p *InhumePrm) SetGCMark() {
p.tomb = nil
}
// SetLockObjectHandling checks if there were
// any LOCK object among the targets set via WithAddresses.
func (p *InhumePrm) SetLockObjectHandling() {
p.lockObjectHandling = true
}
// SetForceGCMark allows removal any object. Expected to be
// called only in control service.
func (p *InhumePrm) SetForceGCMark() {
p.tomb = nil
p.forceRemoval = true
}
func (p *InhumePrm) validate() error {
if p == nil {
return nil
}
if p.tomb != nil {
for _, addr := range p.target {
if addr.Container() != p.tomb.Container() {
return fmt.Errorf("object %s and tombstone %s have different container ID", addr, p.tomb)
}
}
}
return nil
}
func (s *ObjectStore) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,37 @@
package objectstore
import (
"context"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
func (s *ObjectStore) IterateOverContainers(ctx context.Context, handler func(context.Context, objectSDK.Type, cid.ID) error) error {
panic("unimplemented")
}
type IterateOverObjectsInContainerPrm struct {
// ObjectType type of objects to iterate over.
ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
func (s *ObjectStore) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
panic("unimplemented")
}
type CountAliveObjectsInContainerPrm struct {
// ObjectType type of objects to iterate over.
ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
}
func (s *ObjectStore) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,26 @@
package objectstore
import (
"context"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
)
type Cursor struct {
bucketName []byte
inBucketOffset []byte
}
type ListPrm struct {
Count int
Cursor *Cursor
}
type ListRes struct {
AddressList []objectcore.Info
Cursor *Cursor
}
func (s *ObjectStore) ListWithCursor(context.Context, ListPrm) (ListRes, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,24 @@
package objectstore
import (
"context"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
func (s *ObjectStore) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
panic("unimplemented")
}
func (s *ObjectStore) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
panic("unimplemented")
}
func (s *ObjectStore) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
panic("unimplemented")
}
func (s *ObjectStore) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
panic("unimplemented")
}

View file

@ -0,0 +1,11 @@
package objectstore
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
)
func (s *ObjectStore) SetMode(ctx context.Context, m mode.Mode) error {
panic("unimplemented")
}

View file

@ -0,0 +1,11 @@
package objectstore
import (
"context"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
func (s *ObjectStore) Put(ctx context.Context, object *objectSDK.Object) error {
panic("unimplemented")
}

View file

@ -0,0 +1,7 @@
package objectstore
import "context"
func (s *ObjectStore) Rebuild(ctx context.Context) error {
panic("unimplemented")
}

View file

@ -0,0 +1,23 @@
package objectstore
import (
"context"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type SelectPrm struct {
Container cid.ID
Filters objectSDK.SearchFilters
UseAttributeIndex bool
}
type SelectRes struct {
AddressList []oid.Address
}
func (s *ObjectStore) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err error) {
panic("unimplemented")
}

View file

@ -1,11 +1,32 @@
package objectstore
type ObjectStore struct{}
import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
func New(opts ...Option) (*ObjectStore, error) {
type ObjectStore struct {
cfg *config
}
func New(opts ...Option) *ObjectStore {
cfg := defaultCfg()
for _, opt := range opts {
opt(cfg)
}
return &ObjectStore{}, nil
return &ObjectStore{
cfg: cfg,
}
}
func (s *ObjectStore) Info() Info {
return Info{
WALPath: s.cfg.walPath,
BlobPath: s.cfg.blobPath,
MetaPath: s.cfg.metaPath,
}
}
func (s *ObjectStore) SetLogger(l *logger.Logger) {
s.cfg.logger = l
}
func (s *ObjectStore) SetParentID(parentID string) {
}

View file

@ -26,7 +26,7 @@ func (r ContainerSizeRes) Size() uint64 {
return r.size
}
func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
func (s *Shard) ContainerSize(ctx context.Context, prm ContainerSizePrm) (ContainerSizeRes, error) {
s.m.RLock()
defer s.m.RUnlock()
@ -34,7 +34,7 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
return ContainerSizeRes{}, ErrDegradedMode
}
size, err := s.metaBase.ContainerSize(prm.cnr)
size, err := s.objectstore.ContainerSize(ctx, prm.cnr)
if err != nil {
return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)
}
@ -69,7 +69,7 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
return ContainerCountRes{}, ErrDegradedMode
}
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
counters, err := s.objectstore.ContainerCount(ctx, prm.ContainerID)
if err != nil {
return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)
}
@ -100,7 +100,7 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}
return s.metaBase.DeleteContainerSize(ctx, id)
return s.objectstore.DeleteContainerSize(ctx, id)
}
func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
@ -122,5 +122,5 @@ func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}
return s.metaBase.DeleteContainerCount(ctx, id)
return s.objectstore.DeleteContainerCount(ctx, id)
}

View file

@ -2,22 +2,12 @@ package shard
import (
"context"
"errors"
"fmt"
"slices"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err error) error {
@ -45,59 +35,21 @@ func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err err
// Open opens all Shard's components.
func (s *Shard) Open(ctx context.Context) error {
components := []interface {
Open(context.Context, mode.Mode) error
}{
s.blobStor,
}
m := s.GetMode()
if !m.NoMetabase() {
components = append(components, s.metaBase)
}
if s.hasWriteCache() && !m.NoMetabase() {
components = append(components, s.writeCache)
if err := s.objectstore.Open(ctx, m); err != nil {
return err
}
if s.pilorama != nil {
components = append(components, s.pilorama)
}
for i, component := range components {
if err := component.Open(ctx, m); err != nil {
if component == s.metaBase {
// We must first open all other components to avoid
// opening non-existent DB in read-only mode.
for j := i + 1; j < len(components); j++ {
if err := components[j].Open(ctx, m); err != nil {
// Other components must be opened, fail.
return fmt.Errorf("open %T: %w", components[j], err)
}
}
err = s.handleMetabaseFailure(ctx, "open", err)
if err != nil {
return err
}
break
}
return fmt.Errorf("open %T: %w", component, err)
if err := s.pilorama.Open(ctx, m); err != nil {
return err
}
}
return nil
}
type metabaseSynchronizer Shard
func (x *metabaseSynchronizer) Init(ctx context.Context) error {
ctx, span := tracing.StartSpanFromContext(ctx, "metabaseSynchronizer.Init")
defer span.End()
return (*Shard)(x).refillMetabase(ctx)
}
// Init initializes all Shard's components.
func (s *Shard) Init(ctx context.Context) error {
m := s.GetMode()
@ -132,234 +84,23 @@ func (s *Shard) Init(ctx context.Context) error {
s.rb = newRebuilder()
if !m.NoMetabase() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
s.rb.Start(ctx, s.objectstore, s.log)
}
s.writecacheSealCancel.Store(dummyCancel)
return nil
}
func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
type initializer interface {
Init(context.Context) error
}
var components []initializer
if !m.NoMetabase() {
var initMetabase initializer
if s.NeedRefillMetabase() {
initMetabase = (*metabaseSynchronizer)(s)
} else {
initMetabase = s.metaBase
}
components = []initializer{
s.blobStor, initMetabase,
}
} else {
components = []initializer{s.blobStor}
}
if s.hasWriteCache() && !m.NoMetabase() {
components = append(components, s.writeCache)
if err := s.objectstore.Init(ctx); err != nil {
return err
}
if s.pilorama != nil {
components = append(components, s.pilorama)
}
for _, component := range components {
if err := component.Init(ctx); err != nil {
if component == s.metaBase {
if errors.Is(err, meta.ErrOutdatedVersion) || errors.Is(err, meta.ErrIncompletedUpgrade) {
return fmt.Errorf("metabase initialization: %w", err)
}
err = s.handleMetabaseFailure(ctx, "init", err)
if err != nil {
return err
}
break
}
return fmt.Errorf("initialize %T: %w", component, err)
}
}
return nil
}
func (s *Shard) refillMetabase(ctx context.Context) error {
path := s.metaBase.DumpInfo().Path
s.metricsWriter.SetRefillStatus(path, "running")
s.metricsWriter.SetRefillPercent(path, 0)
var success bool
defer func() {
if success {
s.metricsWriter.SetRefillStatus(path, "completed")
} else {
s.metricsWriter.SetRefillStatus(path, "failed")
}
}()
err := s.metaBase.Reset()
if err != nil {
return fmt.Errorf("reset metabase: %w", err)
}
withCount := true
totalObjects, err := s.blobStor.ObjectsCount(ctx)
if err != nil {
s.log.Warn(ctx, logs.EngineRefillFailedToGetObjectsCount, zap.Error(err))
withCount = false
}
eg, egCtx := errgroup.WithContext(ctx)
if s.cfg.refillMetabaseWorkersCount > 0 {
eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
}
var completedCount uint64
var metricGuard sync.Mutex
itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
eg.Go(func() error {
var success bool
defer func() {
s.metricsWriter.IncRefillObjectsCount(path, len(data), success)
if withCount {
metricGuard.Lock()
completedCount++
s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects))
metricGuard.Unlock()
}
}()
if err := s.refillObject(egCtx, data, addr, descriptor); err != nil {
return err
}
success = true
return nil
})
select {
case <-egCtx.Done():
return egCtx.Err()
default:
return nil
}
})
egErr := eg.Wait()
err = errors.Join(egErr, itErr)
if err != nil {
return fmt.Errorf("put objects to the meta: %w", err)
}
err = s.metaBase.SyncCounters()
if err != nil {
return fmt.Errorf("sync object counters: %w", err)
}
success = true
s.metricsWriter.SetRefillPercent(path, 100)
return nil
}
func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error {
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
s.log.Warn(ctx, logs.ShardCouldNotUnmarshalObject,
zap.Stringer("address", addr),
zap.Error(err))
return nil
}
hasIndexedAttribute := slices.IndexFunc(obj.Attributes(), func(attr objectSDK.Attribute) bool { return meta.IsAtrributeIndexed(attr.Key()) }) > 0
var isIndexedContainer bool
if hasIndexedAttribute {
info, err := s.containerInfo.Info(ctx, addr.Container())
if err != nil {
if err := s.pilorama.Init(ctx); err != nil {
return err
}
if info.Removed {
s.log.Debug(ctx, logs.ShardSkipObjectFromResyncContainerDeleted, zap.Stringer("address", addr))
return nil
}
isIndexedContainer = info.Indexed
}
var err error
switch obj.Type() {
case objectSDK.TypeTombstone:
err = s.refillTombstoneObject(ctx, obj)
case objectSDK.TypeLock:
err = s.refillLockObject(ctx, obj)
default:
}
if err != nil {
return err
}
var mPrm meta.PutPrm
mPrm.SetObject(obj)
mPrm.SetStorageID(descriptor)
mPrm.SetIndexAttributes(hasIndexedAttribute && isIndexedContainer)
_, err = s.metaBase.Put(ctx, mPrm)
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
return err
}
return nil
}
func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
var lock objectSDK.Lock
if err := lock.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("unmarshal lock content: %w", err)
}
locked := make([]oid.ID, lock.NumberOfMembers())
lock.ReadMembers(locked)
cnr, _ := obj.ContainerID()
id, _ := obj.ID()
err := s.metaBase.Lock(ctx, cnr, id, locked)
if err != nil {
return fmt.Errorf("lock objects: %w", err)
}
return nil
}
func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error {
tombstone := objectSDK.NewTombstone()
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("unmarshal tombstone content: %w", err)
}
tombAddr := object.AddressOf(obj)
memberIDs := tombstone.Members()
tombMembers := make([]oid.Address, 0, len(memberIDs))
for i := range memberIDs {
a := tombAddr
a.SetObject(memberIDs[i])
tombMembers = append(tombMembers, a)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetTombstoneAddress(tombAddr)
inhumePrm.SetAddresses(tombMembers...)
_, err := s.metaBase.Inhume(ctx, inhumePrm)
if err != nil {
return fmt.Errorf("inhume objects: %w", err)
}
return nil
}
@ -368,34 +109,28 @@ func (s *Shard) Close(ctx context.Context) error {
if s.rb != nil {
s.rb.Stop(ctx, s.log)
}
var components []interface{ Close(context.Context) error }
if s.pilorama != nil {
components = append(components, s.pilorama)
}
if s.hasWriteCache() {
prev := s.writecacheSealCancel.Swap(notInitializedCancel)
prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex
components = append(components, s.writeCache)
}
components = append(components, s.blobStor, s.metaBase)
var lastErr error
for _, component := range components {
if err := component.Close(ctx); err != nil {
lastErr = err
if err := s.pilorama.Close(ctx); err != nil {
s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err))
return err
}
}
prev := s.writecacheSealCancel.Swap(notInitializedCancel)
prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex
if err := s.objectstore.Close(ctx); err != nil {
s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err))
return err
}
// If Init/Open was unsuccessful gc can be nil.
if s.gc != nil {
s.gc.stop(ctx)
}
return lastErr
return nil
}
// Reload reloads configuration portions that are necessary.
@ -417,34 +152,9 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
s.rb.Stop(ctx, s.log)
if !s.info.Mode.NoMetabase() {
defer func() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
s.rb.Start(ctx, s.objectstore, s.log)
}()
}
ok, err := s.metaBase.Reload(ctx, c.metaOpts...)
if err != nil {
if errors.Is(err, meta.ErrDegradedMode) {
s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err))
_ = s.setMode(ctx, mode.DegradedReadOnly)
}
return err
}
if ok {
var err error
if c.refillMetabase {
// Here we refill metabase only if a new instance was opened. This is a feature,
// we don't want to hang for some time just because we forgot to change
// config after the node was updated.
err = s.refillMetabase(ctx)
} else {
err = s.metaBase.Init(ctx)
}
if err != nil {
s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err))
_ = s.setMode(ctx, mode.DegradedReadOnly)
return err
}
}
return s.setMode(ctx, c.info.Mode)
}

View file

@ -2,7 +2,6 @@ package shard
import (
"context"
"fmt"
"io/fs"
"math"
"os"
@ -10,7 +9,6 @@ import (
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
@ -19,14 +17,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
@ -108,297 +100,3 @@ func TestShardOpen(t *testing.T) {
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
require.NoError(t, sh.Close(context.Background()))
}
func TestRefillMetabaseCorrupted(t *testing.T) {
t.Parallel()
dir := t.TempDir()
fsTree := fstree.New(
fstree.WithDirNameLen(2),
fstree.WithPath(filepath.Join(dir, "blob")),
fstree.WithDepth(1))
blobOpts := []blobstor.Option{
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: fsTree,
},
}),
}
mm := newMetricStore()
sh := New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})),
WithMetricsWriter(mm),
)
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
obj := objecttest.Object()
obj.SetType(objectSDK.TypeRegular)
obj.SetPayload([]byte{0, 1, 2, 3, 4, 5})
var putPrm PutPrm
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
require.NoError(t, sh.Close(context.Background()))
addr := object.AddressOf(obj)
// This is copied from `fstree.treePath()` to avoid exporting function just for tests.
{
saddr := addr.Object().EncodeToString() + "." + addr.Container().EncodeToString()
p := fmt.Sprintf("%s/%s/%s", fsTree.RootPath, saddr[:2], saddr[2:])
require.NoError(t, os.WriteFile(p, []byte("not an object"), fsTree.Permissions))
}
sh = New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
WithRefillMetabase(true),
WithMetricsWriter(mm))
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
var getPrm GetPrm
getPrm.SetAddress(addr)
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
require.NoError(t, sh.Close(context.Background()))
}
func TestRefillMetabase(t *testing.T) {
t.Parallel()
p := t.Name()
defer os.RemoveAll(p)
blobOpts := []blobstor.Option{
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(p, "blob")),
fstree.WithDepth(1)),
},
}),
}
mm := newMetricStore()
sh := New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(p, "meta")),
meta.WithEpochState(epochState{}),
),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama"))),
WithMetricsWriter(mm),
)
// open Blobstor
require.NoError(t, sh.Open(context.Background()))
// initialize Blobstor
require.NoError(t, sh.Init(context.Background()))
const objNum = 5
mObjs := make(map[string]objAddr)
locked := make([]oid.ID, 1, 2)
locked[0] = oidtest.ID()
cnrLocked := cidtest.ID()
for range objNum {
obj := objecttest.Object()
obj.SetType(objectSDK.TypeRegular)
if len(locked) < 2 {
obj.SetContainerID(cnrLocked)
id, _ := obj.ID()
locked = append(locked, id)
}
addr := object.AddressOf(obj)
mObjs[addr.EncodeToString()] = objAddr{
obj: obj,
addr: addr,
}
}
tombObj := objecttest.Object()
tombObj.SetType(objectSDK.TypeTombstone)
tombstone := objecttest.Tombstone()
tombData, err := tombstone.Marshal()
require.NoError(t, err)
tombObj.SetPayload(tombData)
tombMembers := make([]oid.Address, 0, len(tombstone.Members()))
members := tombstone.Members()
for i := range tombstone.Members() {
var a oid.Address
a.SetObject(members[i])
cnr, _ := tombObj.ContainerID()
a.SetContainer(cnr)
tombMembers = append(tombMembers, a)
}
var putPrm PutPrm
for _, v := range mObjs {
putPrm.SetObject(v.obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
}
putPrm.SetObject(tombObj)
_, err = sh.Put(context.Background(), putPrm)
require.NoError(t, err)
// LOCK object handling
var lock objectSDK.Lock
lock.WriteMembers(locked)
lockObj := objecttest.Object()
lockObj.SetContainerID(cnrLocked)
objectSDK.WriteLock(lockObj, lock)
putPrm.SetObject(lockObj)
_, err = sh.Put(context.Background(), putPrm)
require.NoError(t, err)
lockID, _ := lockObj.ID()
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
var inhumePrm InhumePrm
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
_, err = sh.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
var headPrm HeadPrm
checkObj := func(addr oid.Address, expObj *objectSDK.Object) {
headPrm.SetAddress(addr)
res, err := sh.Head(context.Background(), headPrm)
if expObj == nil {
require.True(t, client.IsErrObjectNotFound(err))
return
}
require.NoError(t, err)
require.Equal(t, expObj.CutPayload(), res.Object())
}
checkAllObjs := func(exists bool) {
for _, v := range mObjs {
if exists {
checkObj(v.addr, v.obj)
} else {
checkObj(v.addr, nil)
}
}
}
checkTombMembers := func(exists bool) {
for _, member := range tombMembers {
headPrm.SetAddress(member)
_, err := sh.Head(context.Background(), headPrm)
if exists {
require.True(t, client.IsErrObjectAlreadyRemoved(err))
} else {
require.True(t, client.IsErrObjectNotFound(err))
}
}
}
checkLocked := func(t *testing.T, cnr cid.ID, locked []oid.ID) {
var addr oid.Address
addr.SetContainer(cnr)
for i := range locked {
addr.SetObject(locked[i])
var prm InhumePrm
prm.MarkAsGarbage(addr)
var target *apistatus.ObjectLocked
_, err := sh.Inhume(context.Background(), prm)
require.ErrorAs(t, err, &target, "object %s should be locked", locked[i])
}
}
checkAllObjs(true)
checkObj(object.AddressOf(tombObj), tombObj)
checkTombMembers(true)
checkLocked(t, cnrLocked, locked)
c, err := sh.metaBase.ObjectCounters()
require.NoError(t, err)
phyBefore := c.Phy
logicalBefore := c.Logic
err = sh.Close(context.Background())
require.NoError(t, err)
sh = New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(p, "meta_restored")),
meta.WithEpochState(epochState{}),
),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama_another"))),
WithMetricsWriter(mm),
)
// open Blobstor
require.NoError(t, sh.Open(context.Background()))
// initialize Blobstor
require.NoError(t, sh.Init(context.Background()))
defer sh.Close(context.Background())
checkAllObjs(false)
checkObj(object.AddressOf(tombObj), nil)
checkTombMembers(false)
err = sh.refillMetabase(context.Background())
require.NoError(t, err)
c, err = sh.metaBase.ObjectCounters()
require.NoError(t, err)
require.Equal(t, phyBefore, c.Phy)
require.Equal(t, logicalBefore, c.Logic)
checkAllObjs(true)
checkObj(object.AddressOf(tombObj), tombObj)
checkTombMembers(true)
checkLocked(t, cnrLocked, locked)
require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb
require.Equal(t, "completed", mm.refillStatus)
require.Equal(t, uint32(100), mm.refillPercent)
}

View file

@ -23,7 +23,7 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
return 0, ErrDegradedMode
}
cc, err := s.metaBase.ObjectCounters()
cc, err := s.objectstore.ObjectCounters(ctx)
if err != nil {
return 0, err
}

View file

@ -2,17 +2,12 @@ package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// DeletePrm groups the parameters of Delete operation.
@ -62,21 +57,7 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
default:
}
if err := s.validateWritecacheDoesntContainObject(ctx, addr); err != nil {
if skipFailed {
continue
}
return result, err
}
if err := s.deleteFromBlobstor(ctx, addr); err != nil {
if skipFailed {
continue
}
return result, err
}
if err := s.deleteFromMetabase(ctx, addr); err != nil {
if err := s.deleteFromObjectStore(ctx, addr); err != nil {
if skipFailed {
continue
}
@ -88,59 +69,11 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
return result, nil
}
func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr oid.Address) error {
if !s.hasWriteCache() {
return nil
}
_, err := s.writeCache.Head(ctx, addr)
if err == nil {
s.log.Warn(ctx, logs.ObjectRemovalFailureExistsInWritecache, zap.Stringer("object_address", addr))
return fmt.Errorf("object %s must be flushed from writecache", addr)
}
if client.IsErrObjectNotFound(err) {
return nil
}
return err
}
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
var sPrm meta.StorageIDPrm
sPrm.SetAddress(addr)
res, err := s.metaBase.StorageID(ctx, sPrm)
if err != nil {
s.log.Debug(ctx, logs.StorageIDRetrievalFailure,
zap.Stringer("object", addr),
zap.Error(err))
return err
}
storageID := res.StorageID()
if storageID == nil {
// if storageID is nil it means:
// 1. there is no such object
// 2. object stored by writecache: should not happen, as `validateWritecacheDoesntContainObject` called before `deleteFromBlobstor`
return nil
}
var delPrm common.DeletePrm
delPrm.Address = addr
delPrm.StorageID = storageID
_, err = s.blobStor.Delete(ctx, delPrm)
if err != nil && !client.IsErrObjectNotFound(err) {
s.log.Debug(ctx, logs.ObjectRemovalFailureBlobStor,
zap.Stringer("object_address", addr),
zap.Error(err))
return err
}
return nil
}
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {
func (s *Shard) deleteFromObjectStore(ctx context.Context, addr oid.Address) error {
var delPrm meta.DeletePrm
delPrm.SetAddresses(addr)
res, err := s.metaBase.Delete(ctx, delPrm)
res, err := s.objectstore.Delete(ctx, addr)
if err != nil {
return err
}

View file

@ -3,8 +3,7 @@ package shard
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -53,10 +52,6 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
))
defer span.End()
var exists bool
var locked bool
var err error
s.m.RLock()
defer s.m.RUnlock()
@ -64,26 +59,17 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacuationInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
} else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.Address
var res common.ExistsRes
res, err = s.blobStor.Exists(ctx, p)
exists = res.Exists
} else {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(prm.Address)
existsPrm.SetECParent(prm.ECParentAddress)
var res meta.ExistsRes
res, err = s.metaBase.Exists(ctx, existsPrm)
exists = res.Exists()
locked = res.Locked()
}
res, err := s.objectstore.Exists(ctx, objectstore.ExistsPrm{
Address: prm.Address,
ECParentAddress: prm.ECParentAddress,
})
if err != nil {
return ExistsRes{}, err
}
return ExistsRes{
ex: exists,
lc: locked,
ex: res.Exists,
lc: res.Locked,
}, err
}

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -293,8 +294,8 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
buf := make([]oid.Address, 0, s.rmBatchSize)
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
var iterPrm objectstore.GarbageIterationPrm
iterPrm.SetHandler(func(g objectstore.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -312,7 +313,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
// iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(ctx, iterPrm)
err := s.objectstore.IterateOverGarbage(ctx, iterPrm)
if err != nil {
s.log.Warn(ctx, logs.ShardIteratorOverMetabaseGraveyardFailed,
zap.Error(err),
@ -368,7 +369,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize)
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *objectstore.ExpiredObject) {
if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock {
batch = append(batch, o.Address())
@ -422,13 +423,13 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
var inhumePrm meta.InhumePrm
var inhumePrm objectstore.InhumePrm
inhumePrm.SetAddresses(expired...)
inhumePrm.SetGCMark()
// inhume the collected objects
res, err := s.metaBase.Inhume(ctx, inhumePrm)
res, err := s.objectstore.Inhume(ctx, inhumePrm)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects,
zap.Error(err),
@ -452,7 +453,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) ([]oid.Address, error) {
result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(ctx, source)
parentToChildren, err := s.objectstore.GetChildren(ctx, source)
if err != nil {
return nil, err
}
@ -479,11 +480,11 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
defer log.Debug(ctx, logs.ShardFinishedExpiredTombstonesHandling)
const tssDeleteBatch = 50
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch)
tss := make([]objectstore.TombstonedObject, 0, tssDeleteBatch)
tssExp := make([]objectstore.TombstonedObject, 0, tssDeleteBatch)
var iterPrm meta.GraveyardIterationPrm
iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error {
var iterPrm objectstore.GraveyardIterationPrm
iterPrm.SetHandler(func(deletedObject objectstore.TombstonedObject) error {
tss = append(tss, deletedObject)
if len(tss) == tssDeleteBatch {
@ -505,7 +506,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
return
}
err = s.metaBase.IterateOverGraveyard(ctx, iterPrm)
err = s.objectstore.IterateOverGraveyard(ctx, iterPrm)
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
@ -531,7 +532,6 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
s.expiredTombstonesCallback(ctx, tssExp)
}
iterPrm.SetOffset(tss[tssLen-1].Address())
tss = tss[:0]
tssExp = tssExp[:0]
}
@ -556,7 +556,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize)
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *objectstore.ExpiredObject) {
if o.Type() == objectSDK.TypeLock {
batch = append(batch, o.Address())
@ -590,7 +590,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
}
}
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error {
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*objectstore.ExpiredObject)) error {
s.m.RLock()
defer s.m.RUnlock()
@ -598,7 +598,7 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo
return ErrDegradedMode
}
err := s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
err := s.objectstore.IterateExpired(ctx, epoch, func(expiredObject *objectstore.ExpiredObject) error {
select {
case <-ctx.Done():
return meta.ErrInterruptIterator
@ -621,14 +621,14 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
return nil, ErrDegradedMode
}
return s.metaBase.FilterExpired(ctx, epoch, addresses)
return s.objectstore.FilterExpired(ctx, epoch, addresses)
}
// HandleExpiredTombstones marks tombstones themselves as garbage
// and clears up corresponding graveyard records.
//
// Does not modify tss.
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []objectstore.TombstonedObject) {
s.m.RLock()
defer s.m.RUnlock()
@ -636,7 +636,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return
}
res, err := s.metaBase.InhumeTombstones(ctx, tss)
res, err := s.objectstore.InhumeTombstones(ctx, tss)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.Error(err),
@ -664,7 +664,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
if s.GetMode().NoMetabase() {
return
}
unlocked, err := s.metaBase.FreeLockedBy(lockers)
unlocked, err := s.objectstore.FreeLockedBy(lockers)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
@ -673,11 +673,11 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
return
}
var pInhume meta.InhumePrm
var pInhume objectstore.InhumePrm
pInhume.SetAddresses(lockers...)
pInhume.SetForceGCMark()
res, err := s.metaBase.Inhume(ctx, pInhume)
res, err := s.objectstore.Inhume(ctx, pInhume)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage,
zap.Error(err),
@ -721,7 +721,7 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
return
}
_, err := s.metaBase.FreeLockedBy(lockers)
_, err := s.objectstore.FreeLockedBy(lockers)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
@ -750,7 +750,7 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
}
func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
ids, err := s.metaBase.ZeroSizeContainers(ctx)
ids, err := s.objectstore.ZeroSizeContainers(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
@ -762,7 +762,7 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui
}
func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
ids, err := s.metaBase.ZeroCountContainers(ctx)
ids, err := s.objectstore.ZeroCountContainers(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return

View file

@ -1,141 +0,0 @@
package shard
import (
"context"
"path/filepath"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
t.Parallel()
rootPath := t.TempDir()
var sh *Shard
l := test.NewLogger(t)
blobOpts := []blobstor.Option{
blobstor.WithLogger(test.NewLogger(t)),
blobstor.WithStorages([]blobstor.SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
context.Background(),
blobovniczatree.WithLogger(test.NewLogger(t)),
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(1),
blobovniczatree.WithBlobovniczaShallowWidth(1)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= 1<<20
},
},
{
Storage: fstree.New(
fstree.WithPath(filepath.Join(rootPath, "blob"))),
},
}),
}
opts := []Option{
WithID(NewIDFromBytes([]byte{})),
WithLogger(l),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epochState{}),
),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
WithDeletedLockCallback(func(ctx context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(ctx, addresses)
}),
WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
WithGCRemoverSleepInterval(1 * time.Second),
WithDisabledGC(),
}
sh = New(opts...)
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr)
objID, _ := obj.ID()
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(objID)
var putPrm PutPrm
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj))
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err, "failed to get")
// inhume
var inhumePrm InhumePrm
inhumePrm.MarkAsGarbage(addr)
_, err = sh.Inhume(context.Background(), inhumePrm)
require.NoError(t, err, "failed to inhume")
_, err = sh.Get(context.Background(), getPrm)
require.Error(t, err, "get returned error")
require.True(t, client.IsErrObjectNotFound(err), "invalid error type")
// storageID
var metaStIDPrm meta.StorageIDPrm
metaStIDPrm.SetAddress(addr)
storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm)
require.NoError(t, err, "failed to get storage ID")
// check existence in blobstore
var bsExisted common.ExistsPrm
bsExisted.Address = addr
bsExisted.StorageID = storageID.StorageID()
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existence")
require.True(t, exRes.Exists, "invalid blobstore existence result")
// drop from blobstor
var bsDeletePrm common.DeletePrm
bsDeletePrm.Address = addr
bsDeletePrm.StorageID = storageID.StorageID()
_, err = sh.blobStor.Delete(context.Background(), bsDeletePrm)
require.NoError(t, err, "failed to delete from blobstore")
// check existence in blobstore
exRes, err = sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existence")
require.False(t, exRes.Exists, "invalid blobstore existence result")
// get should return object not found
_, err = sh.Get(context.Background(), getPrm)
require.Error(t, err, "get returned no error")
require.True(t, client.IsErrObjectNotFound(err), "invalid error type")
}

View file

@ -1,295 +0,0 @@
package shard
import (
"context"
"errors"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
t.Parallel()
epoch := &epochState{
Value: 100,
}
sh := newCustomShard(t, false, shardOptions{
metaOptions: []meta.Option{meta.WithEpochState(epoch)},
additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool {
return util.NewPseudoWorkerPool() // synchronous event processing
})},
})
defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID()
var objExpirationAttr objectSDK.Attribute
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
objExpirationAttr.SetValue("101")
obj := testutil.GenerateObjectWithCID(cnr)
obj.SetAttributes(objExpirationAttr)
objID, _ := obj.ID()
var lockExpirationAttr objectSDK.Attribute
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
lockExpirationAttr.SetValue("103")
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
lock.SetAttributes(lockExpirationAttr)
lockID, _ := lock.ID()
var putPrm PutPrm
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
require.NoError(t, err)
putPrm.SetObject(lock)
_, err = sh.Put(context.Background(), putPrm)
require.NoError(t, err)
epoch.Value = 105
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj))
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired object must be deleted")
}
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
t.Parallel()
epoch := &epochState{
Value: 100,
}
cnr := cidtest.ID()
parentID := oidtest.ID()
splitID := objectSDK.NewSplitID()
var objExpirationAttr objectSDK.Attribute
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
objExpirationAttr.SetValue("101")
var lockExpirationAttr objectSDK.Attribute
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
lockExpirationAttr.SetValue("103")
parent := testutil.GenerateObjectWithCID(cnr)
parent.SetID(parentID)
parent.SetPayload(nil)
parent.SetAttributes(objExpirationAttr)
const childCount = 10
children := make([]*objectSDK.Object, childCount)
childIDs := make([]oid.ID, childCount)
for i := range children {
children[i] = testutil.GenerateObjectWithCID(cnr)
if i != 0 {
children[i].SetPreviousID(childIDs[i-1])
}
if i == len(children)-1 {
children[i].SetParent(parent)
}
children[i].SetSplitID(splitID)
children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})
childIDs[i], _ = children[i].ID()
}
link := testutil.GenerateObjectWithCID(cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
linkID, _ := link.ID()
sh := newCustomShard(t, false, shardOptions{
metaOptions: []meta.Option{meta.WithEpochState(epoch)},
additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool {
return util.NewPseudoWorkerPool() // synchronous event processing
})},
})
defer func() { require.NoError(t, sh.Close(context.Background())) }()
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
lock.SetAttributes(lockExpirationAttr)
lockID, _ := lock.ID()
var putPrm PutPrm
for _, child := range children {
putPrm.SetObject(child)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
}
putPrm.SetObject(link)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
err = sh.Lock(context.Background(), cnr, lockID, append(childIDs, parentID, linkID))
require.NoError(t, err)
putPrm.SetObject(lock)
_, err = sh.Put(context.Background(), putPrm)
require.NoError(t, err)
var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(parent))
_, err = sh.Get(context.Background(), getPrm)
var splitInfoError *objectSDK.SplitInfoError
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
epoch.Value = 105
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires")
}
func TestGCDropsObjectInhumedFromWritecache(t *testing.T) {
t.Parallel()
t.Run("flush write-cache before inhume", func(t *testing.T) {
t.Parallel()
testGCDropsObjectInhumedFromWritecache(t, true)
})
t.Run("don't flush write-cache before inhume", func(t *testing.T) {
t.Parallel()
testGCDropsObjectInhumedFromWritecache(t, false)
})
}
func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool) {
sh := newCustomShard(t, true, shardOptions{
additionalShardOptions: []Option{WithDisabledGC()},
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
})
defer func() { require.NoError(t, sh.Close(context.Background())) }()
obj := testutil.GenerateObjectWithSize(1024)
var putPrm PutPrm
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
// writecache stores object
wcObj, err := sh.writeCache.Head(context.Background(), objectCore.AddressOf(obj))
require.NoError(t, err)
require.Equal(t, objectCore.AddressOf(obj), objectCore.AddressOf(wcObj))
// blobstore doesn't store object
bsRes, err := sh.blobStor.Get(context.Background(), common.GetPrm{
Address: objectCore.AddressOf(obj),
})
require.ErrorAs(t, err, new(*apistatus.ObjectNotFound))
require.Nil(t, bsRes.Object)
require.Nil(t, bsRes.RawData)
if flushbeforeInhume {
sh.writeCache.Flush(context.Background(), false, false)
}
var inhumePrm InhumePrm
inhumePrm.MarkAsGarbage(objectCore.AddressOf(obj))
_, err = sh.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
// writecache doesn't store object
wcObj, err = sh.writeCache.Head(context.Background(), objectCore.AddressOf(obj))
require.Error(t, err)
require.Nil(t, wcObj)
if flushbeforeInhume {
// blobstore store object
bsRes, err = sh.blobStor.Get(context.Background(), common.GetPrm{
Address: objectCore.AddressOf(obj),
})
require.NoError(t, err)
require.Equal(t, objectCore.AddressOf(obj), objectCore.AddressOf(bsRes.Object))
} else {
// blobstore doesn't store object
bsRes, err = sh.blobStor.Get(context.Background(), common.GetPrm{
Address: objectCore.AddressOf(obj),
})
require.ErrorAs(t, err, new(*apistatus.ObjectNotFound))
require.Nil(t, bsRes.Object)
require.Nil(t, bsRes.RawData)
}
gcRes := sh.removeGarbage(context.Background())
require.True(t, gcRes.success)
require.Equal(t, uint64(1), gcRes.deleted)
}
func TestGCDontDeleteObjectFromWritecache(t *testing.T) {
sh := newCustomShard(t, true, shardOptions{
additionalShardOptions: []Option{WithDisabledGC()},
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
})
defer func() { require.NoError(t, sh.Close(context.Background())) }()
obj := testutil.GenerateObjectWithSize(1024)
var putPrm PutPrm
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
// writecache stores object
wcObj, err := sh.writeCache.Head(context.Background(), objectCore.AddressOf(obj))
require.NoError(t, err)
require.Equal(t, objectCore.AddressOf(obj), objectCore.AddressOf(wcObj))
// blobstore doesn't store object
bsRes, err := sh.blobStor.Get(context.Background(), common.GetPrm{
Address: objectCore.AddressOf(obj),
})
require.ErrorAs(t, err, new(*apistatus.ObjectNotFound))
require.Nil(t, bsRes.Object)
require.Nil(t, bsRes.RawData)
var metaInhumePrm meta.InhumePrm
metaInhumePrm.SetAddresses(objectCore.AddressOf(obj))
metaInhumePrm.SetLockObjectHandling()
metaInhumePrm.SetGCMark()
_, err = sh.metaBase.Inhume(context.Background(), metaInhumePrm)
require.NoError(t, err)
// logs: WARN shard/delete.go:98 can't remove object: object must be flushed from writecache
gcRes := sh.removeGarbage(context.Background())
require.True(t, gcRes.success)
require.Equal(t, uint64(0), gcRes.deleted)
// writecache stores object
wcObj, err = sh.writeCache.Head(context.Background(), objectCore.AddressOf(obj))
require.NoError(t, err)
require.Equal(t, objectCore.AddressOf(obj), objectCore.AddressOf(wcObj))
}

View file

@ -2,22 +2,15 @@ package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// storFetcher is a type to unify object fetching mechanism in `fetchObjectData`
@ -94,96 +87,12 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
var getPrm common.GetPrm
getPrm.Address = prm.addr
getPrm.StorageID = id
res, err := stor.Get(ctx, getPrm)
if err != nil {
return nil, err
}
return res.Object, nil
obj, err := s.objectstore.Get(ctx, prm.addr)
if err != nil {
return GetRes{}, err
}
wc := func(c writecache.Cache) (*objectSDK.Object, error) {
return c.Get(ctx, prm.addr)
}
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)
return GetRes{
obj: obj,
hasMeta: hasMeta,
}, err
}
// emptyStorageID is an empty storageID that indicates that
// an object is big (and is stored in an FSTree, not in a blobovnicza).
var emptyStorageID = make([]byte, 0)
// fetchObjectData looks through writeCache and blobStor to find object.
func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
var (
mErr error
mRes meta.ExistsRes
)
if !skipMeta {
var mPrm meta.ExistsPrm
mPrm.SetAddress(addr)
mRes, mErr = s.metaBase.Exists(ctx, mPrm)
if mErr != nil && !s.info.Mode.NoMetabase() {
return nil, false, mErr
}
if !mRes.Exists() {
return nil, false, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
} else {
s.log.Warn(ctx, logs.ShardFetchingObjectWithoutMeta, zap.Stringer("addr", addr))
}
if s.hasWriteCache() {
res, err := wc(s.writeCache)
if err == nil || IsErrOutOfRange(err) {
return res, false, err
}
if client.IsErrObjectNotFound(err) {
s.log.Debug(ctx, logs.ShardObjectIsMissingInWritecache,
zap.Stringer("addr", addr),
zap.Bool("skip_meta", skipMeta))
} else {
s.log.Error(ctx, logs.ShardFailedToFetchObjectFromWritecache,
zap.Error(err),
zap.Stringer("addr", addr),
zap.Bool("skip_meta", skipMeta))
}
}
if skipMeta || mErr != nil {
res, err := cb(s.blobStor, nil)
return res, false, err
}
var mPrm meta.StorageIDPrm
mPrm.SetAddress(addr)
mExRes, err := s.metaBase.StorageID(ctx, mPrm)
if err != nil {
return nil, true, fmt.Errorf("fetch blobovnicza id from metabase: %w", err)
}
storageID := mExRes.StorageID()
if storageID == nil {
// `nil` storageID returned without any error
// means that object is big, `cb` expects an
// empty (but non-nil) storageID in such cases
storageID = emptyStorageID
}
res, err := cb(s.blobStor, storageID)
return res, true, err
hasMeta: true,
}, nil
}

View file

@ -3,7 +3,7 @@ package shard
import (
"context"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -62,30 +62,18 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
var obj *objectSDK.Object
var err error
mode := s.GetMode()
if mode.NoMetabase() || (mode.ReadOnly() && prm.ShardLooksBad) {
var getPrm GetPrm
getPrm.SetAddress(prm.addr)
getPrm.SetIgnoreMeta(true)
var res GetRes
res, err = s.Get(ctx, getPrm)
obj = res.Object()
} else {
s.m.RLock()
defer s.m.RUnlock()
if s.info.EvacuationInProgress {
return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
var headParams meta.GetPrm
headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw)
var res meta.GetRes
res, err = s.metaBase.Get(ctx, headParams)
obj = res.Header()
s.m.RLock()
defer s.m.RUnlock()
if s.info.EvacuationInProgress {
return HeadRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
headParams := objectstore.HeadPrm{
Address: prm.addr,
Raw: prm.raw,
}
obj, err = s.objectstore.Head(ctx, headParams)
return HeadRes{
obj: obj,
}, err

View file

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"github.com/mr-tron/base58"
"go.uber.org/zap"
)
@ -35,7 +34,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
var idFromMetabase []byte
modeDegraded := s.GetMode().NoMetabase()
if !modeDegraded {
if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
if idFromMetabase, err = s.objectstore.GetShardID(ctx); err != nil {
err = fmt.Errorf("read shard id from metabase: %w", err)
}
}
@ -46,24 +45,16 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
shardID := s.info.ID.String()
s.cfg.metricsWriter.SetShardID(shardID)
if s.writeCache != nil && s.writeCache.GetMetrics() != nil {
s.writeCache.GetMetrics().SetShardID(shardID)
}
s.log = s.log.With(zap.Stringer("shard_id", s.info.ID))
s.metaBase.SetLogger(s.log)
s.blobStor.SetLogger(s.log)
if s.hasWriteCache() {
s.writeCache.SetLogger(s.log)
}
s.metaBase.SetParentID(s.info.ID.String())
s.blobStor.SetParentID(s.info.ID.String())
s.objectstore.SetLogger(s.log)
s.objectstore.SetParentID(s.info.ID.String())
if s.pilorama != nil {
s.pilorama.SetParentID(s.info.ID.String())
}
if len(idFromMetabase) == 0 && !modeDegraded {
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
if setErr := s.objectstore.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
err = errors.Join(err, fmt.Errorf("write shard id to metabase: %w", setErr))
}
}

View file

@ -1,11 +1,9 @@
package shard
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
)
// Info groups the information about Shard.
@ -19,20 +17,13 @@ type Info struct {
// True when evacuation is in progress.
EvacuationInProgress bool
// Information about the metabase.
MetaBaseInfo meta.Info
// Information about the BLOB storage.
BlobStorInfo blobstor.Info
// Information about the Write Cache.
WriteCacheInfo writecache.Info
// ErrorCount contains amount of errors occurred in shard operations.
ErrorCount uint32
// PiloramaInfo contains information about trees stored on this shard.
PiloramaInfo pilorama.Info
ObjectStoreInfo objectstore.Info
}
// DumpInfo returns information about the Shard.

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@ -81,13 +82,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
return InhumeRes{}, ErrDegradedMode
}
if s.hasWriteCache() {
for i := range prm.target {
_ = s.writeCache.Delete(ctx, prm.target[i])
}
}
var metaPrm meta.InhumePrm
var metaPrm objectstore.InhumePrm
metaPrm.SetAddresses(prm.target...)
metaPrm.SetLockObjectHandling()
@ -101,7 +96,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
metaPrm.SetForceGCMark()
}
res, err := s.metaBase.Inhume(ctx, metaPrm)
res, err := s.objectstore.Inhume(ctx, metaPrm)
if err != nil {
if errors.Is(err, meta.ErrLockObjectRemoval) {
s.m.RUnlock()

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -60,13 +61,13 @@ type CountAliveObjectsInContainerPrm struct {
// ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct {
count uint32
cursor *Cursor
cursor *objectstore.Cursor
}
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
addrList []objectcore.Info
cursor *Cursor
cursor *objectstore.Cursor
}
// WithCount sets maximum amount of addresses that ListWithCursor should return.
@ -77,7 +78,7 @@ func (p *ListWithCursorPrm) WithCount(count uint32) {
// WithCursor sets cursor for ListWithCursor operation. For initial request,
// ignore this param or use nil value. For consecutive requests, use value
// from ListWithCursorRes.
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
func (p *ListWithCursorPrm) WithCursor(cursor *objectstore.Cursor) {
p.cursor = cursor
}
@ -87,7 +88,7 @@ func (r ListWithCursorRes) AddressList() []objectcore.Info {
}
// Cursor returns cursor for consecutive listing requests.
func (r ListWithCursorRes) Cursor() *Cursor {
func (r ListWithCursorRes) Cursor() *objectstore.Cursor {
return r.cursor
}
@ -106,7 +107,7 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
return SelectRes{}, ErrDegradedMode
}
lst, err := s.metaBase.Containers(ctx)
lst, err := s.objectstore.Containers(ctx)
if err != nil {
return res, fmt.Errorf("list stored containers: %w", err)
}
@ -115,11 +116,12 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
filters.AddPhyFilter()
for i := range lst {
var sPrm meta.SelectPrm
sPrm.SetContainerID(lst[i])
sPrm.SetFilters(filters)
sPrm := objectstore.SelectPrm{
Container: lst[i],
Filters: filters,
}
sRes, err := s.metaBase.Select(ctx, sPrm) // consider making List in metabase
sRes, err := s.objectstore.Select(ctx, sPrm) // consider making List in metabase
if err != nil {
s.log.Debug(ctx, logs.ShardCantSelectAllObjects,
zap.Stringer("cid", lst[i]),
@ -128,7 +130,7 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
continue
}
res.addrList = append(res.addrList, sRes.AddressList()...)
res.addrList = append(res.addrList, sRes.AddressList...)
}
return res, nil
@ -145,7 +147,7 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
return ListContainersRes{}, ErrDegradedMode
}
containers, err := s.metaBase.Containers(ctx)
containers, err := s.objectstore.Containers(ctx)
if err != nil {
return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
}
@ -173,17 +175,18 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
return ListWithCursorRes{}, ErrDegradedMode
}
var metaPrm meta.ListPrm
metaPrm.SetCount(prm.count)
metaPrm.SetCursor(prm.cursor)
res, err := s.metaBase.ListWithCursor(ctx, metaPrm)
metaPrm := objectstore.ListPrm{
Count: int(prm.count),
Cursor: prm.cursor,
}
res, err := s.objectstore.ListWithCursor(ctx, metaPrm)
if err != nil {
return ListWithCursorRes{}, fmt.Errorf("get list of objects: %w", err)
}
return ListWithCursorRes{
addrList: res.AddressList(),
cursor: res.Cursor(),
addrList: res.AddressList,
cursor: res.Cursor,
}, nil
}
@ -202,9 +205,7 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
return ErrDegradedMode
}
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
err := s.objectstore.IterateOverContainers(ctx, prm.Handler)
if err != nil {
return fmt.Errorf("iterate over containers: %w", err)
}
@ -227,11 +228,11 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return ErrDegradedMode
}
var metaPrm meta.IterateOverObjectsInContainerPrm
var metaPrm objectstore.IterateOverObjectsInContainerPrm
metaPrm.ContainerID = prm.ContainerID
metaPrm.ObjectType = prm.ObjectType
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
err := s.objectstore.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over objects: %w", err)
}
@ -251,10 +252,10 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
return 0, ErrDegradedMode
}
var metaPrm meta.CountAliveObjectsInContainerPrm
var metaPrm objectstore.CountAliveObjectsInContainerPrm
metaPrm.ObjectType = prm.ObjectType
metaPrm.ContainerID = prm.ContainerID
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
count, err := s.objectstore.CountAliveObjectsInContainer(ctx, metaPrm)
if err != nil {
return 0, fmt.Errorf("count alive objects in bucket: %w", err)
}

View file

@ -2,9 +2,7 @@ package shard
import (
"context"
"fmt"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -38,12 +36,7 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
return ErrDegradedMode
}
err := s.metaBase.Lock(ctx, idCnr, locker, locked)
if err != nil {
return fmt.Errorf("metabase lock: %w", err)
}
return nil
return s.objectstore.Lock(ctx, idCnr, locker, locked)
}
// IsLocked checks object locking relation of the provided object. Not found object is
@ -61,15 +54,7 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
return false, ErrDegradedMode
}
var prm meta.IsLockedPrm
prm.SetAddress(addr)
res, err := s.metaBase.IsLocked(ctx, prm)
if err != nil {
return false, err
}
return res.Locked(), nil
return s.objectstore.IsLocked(ctx, addr)
}
// GetLocks return lock id's of the provided object. Not found object is
@ -86,5 +71,5 @@ func (s *Shard) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error
if m.NoMetabase() {
return nil, ErrDegradedMode
}
return s.metaBase.GetLocks(ctx, addr)
return s.objectstore.GetLocks(ctx, addr)
}

View file

@ -265,7 +265,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize())
cc, err := sh.metaBase.ContainerCounters(context.Background())
cc, err := sh.objectstore.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
@ -300,7 +300,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize())
cc, err := sh.metaBase.ContainerCounters(context.Background())
cc, err := sh.objectstore.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
@ -345,7 +345,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize())
cc, err = sh.metaBase.ContainerCounters(context.Background())
cc, err = sh.objectstore.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
@ -386,7 +386,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize())
cc, err = sh.metaBase.ContainerCounters(context.Background())
cc, err = sh.objectstore.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
})

View file

@ -32,37 +32,12 @@ func (s *Shard) setMode(ctx context.Context, m mode.Mode) error {
zap.Stringer("old_mode", s.info.Mode),
zap.Stringer("new_mode", m))
components := []interface {
SetMode(context.Context, mode.Mode) error
}{
s.metaBase, s.blobStor,
}
if s.hasWriteCache() {
components = append(components, s.writeCache)
}
if s.pilorama != nil {
components = append(components, s.pilorama)
}
// The usual flow of the requests (pilorama is independent):
// writecache -> blobstor -> metabase
// For mode.ReadOnly and mode.Degraded the order is:
// writecache -> blobstor -> metabase
// For mode.ReadWrite it is the opposite:
// metabase -> blobstor -> writecache
if m != mode.ReadWrite {
if s.hasWriteCache() {
components[0], components[2] = components[2], components[0]
} else {
components[0], components[1] = components[1], components[0]
}
}
if !m.Disabled() {
for i := range components {
if err := components[i].SetMode(ctx, m); err != nil {
if err := s.objectstore.SetMode(ctx, m); err != nil {
return err
}
if s.pilorama != nil {
if err := s.pilorama.SetMode(ctx, m); err != nil {
return err
}
}

View file

@ -2,17 +2,12 @@ package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// PutPrm groups the parameters of Put operation.
@ -55,54 +50,5 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return PutRes{}, ErrReadOnlyMode
}
data, err := prm.obj.Marshal()
if err != nil {
return PutRes{}, fmt.Errorf("cannot marshal object: %w", err)
}
var putPrm common.PutPrm // form Put parameters
putPrm.Object = prm.obj
putPrm.RawData = data
putPrm.Address = objectCore.AddressOf(prm.obj)
var res common.PutRes
// exist check are not performed there, these checks should be executed
// ahead of `Put` by storage engine
tryCache := s.hasWriteCache() && !m.NoMetabase()
if tryCache {
res, err = s.writeCache.Put(ctx, putPrm)
}
if err != nil || !tryCache {
if err != nil {
s.log.Debug(ctx, logs.ShardCantPutObjectToTheWritecacheTryingBlobstor,
zap.Error(err))
}
res, err = s.blobStor.Put(ctx, putPrm)
if err != nil {
return PutRes{}, fmt.Errorf("put object to BLOB storage: %w", err)
}
}
if !m.NoMetabase() {
var pPrm meta.PutPrm
pPrm.SetObject(prm.obj)
pPrm.SetStorageID(res.StorageID)
pPrm.SetIndexAttributes(prm.indexAttributes)
res, err := s.metaBase.Put(ctx, pPrm)
if err != nil {
// may we need to handle this case in a special way
// since the object has been successfully written to BlobStor
return PutRes{}, fmt.Errorf("put object to metabase: %w", err)
}
if res.Inserted {
s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
}
}
return PutRes{}, nil
return PutRes{}, s.objectstore.Put(ctx, prm.obj)
}

View file

@ -4,10 +4,7 @@ import (
"context"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -95,47 +92,12 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
return RngRes{}, ErrShardDisabled
}
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
var getRngPrm common.GetRangePrm
getRngPrm.Address = prm.addr
getRngPrm.Range.SetOffset(prm.off)
getRngPrm.Range.SetLength(prm.ln)
getRngPrm.StorageID = id
res, err := stor.GetRange(ctx, getRngPrm)
if err != nil {
return nil, err
}
obj := objectSDK.New()
obj.SetPayload(res.Data)
return obj, nil
obj, err := s.objectstore.GetRange(ctx, prm.addr, prm.off, prm.ln)
if err != nil {
return RngRes{}, err
}
wc := func(c writecache.Cache) (*objectSDK.Object, error) {
res, err := c.Get(ctx, prm.addr)
if err != nil {
return nil, err
}
payload := res.Payload()
from := prm.off
to := from + prm.ln
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
}
obj := objectSDK.New()
obj.SetPayload(payload[from:to])
return obj, nil
}
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)
return RngRes{
obj: obj,
hasMeta: hasMeta,
}, err
hasMeta: true,
}, nil
}

View file

@ -7,8 +7,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
@ -69,7 +69,7 @@ func newRebuilder() *rebuilder {
}
}
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
func (r *rebuilder) Start(ctx context.Context, os *objectstore.ObjectStore, log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
@ -90,13 +90,13 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
if !ok {
continue
}
runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter)
runRebuild(ctx, os, log, t.fillPercent, t.limiter)
}
}
}()
}
func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
func runRebuild(ctx context.Context, os *objectstore.ObjectStore, log *logger.Logger,
fillPercent int, limiter RebuildWorkerLimiter,
) {
select {
@ -106,7 +106,7 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
}
log.Info(ctx, logs.BlobstoreRebuildStarted)
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
if err := os.Rebuild(ctx); err != nil {
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(ctx, logs.BlobstoreRebuildCompletedSuccessfully)

View file

@ -1,76 +0,0 @@
package shard
import (
"context"
"os"
"testing"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func BenchmarkRefillMetabase(b *testing.B) {
b.Run("100 objects", func(b *testing.B) {
benchRefillMetabase(b, 100)
})
b.Run("1000 objects", func(b *testing.B) {
benchRefillMetabase(b, 1000)
})
b.Run("2000 objects", func(b *testing.B) {
benchRefillMetabase(b, 2000)
})
b.Run("5000 objects", func(b *testing.B) {
benchRefillMetabase(b, 5000)
})
}
func benchRefillMetabase(b *testing.B, objectsCount int) {
sh := newCustomShard(b, false, shardOptions{
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
})
defer func() { require.NoError(b, sh.Close(context.Background())) }()
var putPrm PutPrm
for range objectsCount / 2 {
obj := testutil.GenerateObject()
testutil.AddAttribute(obj, "foo", "bar")
testutil.AddPayload(obj, 1<<5) // blobvnicza tree obj
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(b, err)
}
for range objectsCount / 2 {
obj := testutil.GenerateObject()
testutil.AddAttribute(obj, "foo", "bar")
obj.SetID(oidtest.ID())
testutil.AddPayload(obj, 1<<20) // fstree obj
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(b, err)
}
require.NoError(b, sh.Close(context.Background()))
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
require.NoError(b, sh.Open(context.Background()))
sh.cfg.refillMetabase = true
b.ReportAllocs()
b.ResetTimer()
require.NoError(b, sh.Init(context.Background()))
require.NoError(b, sh.Close(context.Background()))
}

View file

@ -90,7 +90,7 @@ func TestShardReload(t *testing.T) {
t.Run("open meta at new path", func(t *testing.T) {
newShardOpts := func(metaPath string, resync bool) []Option {
metaOpts := []meta.Option{meta.WithPath(metaPath), meta.WithEpochState(epochState{})}
return append(opts, WithMetaBaseOptions(metaOpts...), WithRefillMetabase(resync))
return append(opts, WithMetaBaseOptions(metaOpts...))
}
newOpts := newShardOpts(filepath.Join(p, "meta1"), false)

View file

@ -4,7 +4,7 @@ import (
"context"
"fmt"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -60,17 +60,18 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
return SelectRes{}, ErrDegradedMode
}
var selectPrm meta.SelectPrm
selectPrm.SetFilters(prm.filters)
selectPrm.SetContainerID(prm.cnr)
selectPrm.SetUseAttributeIndex(prm.isIndexedContainer)
selectPrm := objectstore.SelectPrm{
Filters: prm.filters,
Container: prm.cnr,
UseAttributeIndex: prm.isIndexedContainer,
}
mRes, err := s.metaBase.Select(ctx, selectPrm)
mRes, err := s.objectstore.Select(ctx, selectPrm)
if err != nil {
return SelectRes{}, fmt.Errorf("select objects from metabase: %w", err)
}
return SelectRes{
addrList: mRes.AddressList(),
addrList: mRes.AddressList,
}, nil
}

View file

@ -27,13 +27,9 @@ type Shard struct {
gc *gc
writeCache writecache.Cache
blobStor *blobstor.BlobStor
pilorama pilorama.ForestStorage
metaBase *meta.DB
objectstore *objectstore.ObjectStore
tsSource TombstoneSource
@ -48,7 +44,7 @@ type Shard struct {
type Option func(*cfg)
// ExpiredTombstonesCallback is a callback handling list of expired tombstones.
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
type ExpiredTombstonesCallback func(context.Context, []objectstore.TombstonedObject)
// ExpiredObjectsCallback is a callback handling list of expired objects.
type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address)
@ -62,9 +58,6 @@ type EmptyContainersCallback func(context.Context, []cid.ID)
type cfg struct {
m sync.RWMutex
refillMetabase bool
refillMetabaseWorkersCount int
rmBatchSize int
useWriteCache bool
@ -123,29 +116,12 @@ func New(opts ...Option) *Shard {
opts[i](c)
}
bs := blobstor.New(c.blobOpts...)
mb := meta.New(c.metaOpts...)
os := objectstore.New(c.objectStoreOpts...)
s := &Shard{
cfg: c,
blobStor: bs,
metaBase: mb,
tsSource: c.tsSource,
}
reportFunc := func(ctx context.Context, msg string, err error) {
s.reportErrorFunc(ctx, s.ID().String(), msg, err)
}
s.blobStor.SetReportErrorFunc(reportFunc)
if c.useWriteCache {
s.writeCache = writecache.New(
append(c.writeCacheOpts,
writecache.WithReportErrorFunc(reportFunc),
writecache.WithBlobstor(bs),
writecache.WithMetabase(mb))...)
s.writeCache.GetMetrics().SetPath(s.writeCache.DumpInfo().Path)
cfg: c,
tsSource: c.tsSource,
objectstore: os,
}
if s.piloramaOpts != nil {
@ -153,7 +129,6 @@ func New(opts ...Option) *Shard {
}
s.fillInfo()
s.writecacheSealCancel.Store(notInitializedCancel)
return s
}
@ -226,11 +201,6 @@ func (s *Shard) hasWriteCache() bool {
return s.cfg.useWriteCache
}
// NeedRefillMetabase returns true if metabase is needed to be refilled.
func (s *Shard) NeedRefillMetabase() bool {
return s.cfg.refillMetabase
}
// WithRemoverBatchSize returns option to set batch size
// of single removal operation.
func WithRemoverBatchSize(sz int) Option {
@ -271,20 +241,6 @@ func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option {
}
}
// WithRefillMetabase returns option to set flag to refill the Metabase on Shard's initialization step.
func WithRefillMetabase(v bool) Option {
return func(c *cfg) {
c.refillMetabase = v
}
}
// WithRefillMetabaseWorkersCount returns option to set count of workers to refill the Metabase on Shard's initialization step.
func WithRefillMetabaseWorkersCount(v int) Option {
return func(c *cfg) {
c.refillMetabaseWorkersCount = v
}
}
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
// - mode.ReadWrite;
// - mode.ReadOnly.
@ -378,13 +334,8 @@ func WithContainerInfoProvider(containerInfo container.InfoProvider) Option {
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
s.cfg.info.Mode = s.GetMode()
if s.cfg.useWriteCache {
s.cfg.info.WriteCacheInfo = s.writeCache.DumpInfo()
}
s.cfg.info.ObjectStoreInfo = s.objectstore.Info()
if s.pilorama != nil {
s.cfg.info.PiloramaInfo = s.pilorama.DumpInfo()
}
@ -408,7 +359,7 @@ func (s *Shard) updateMetrics(ctx context.Context) {
return
}
cc, err := s.metaBase.ObjectCounters()
cc, err := s.objectstore.ObjectCounters(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardMetaObjectCounterRead,
zap.Error(err),
@ -421,7 +372,7 @@ func (s *Shard) updateMetrics(ctx context.Context) {
s.setObjectCounterBy(logical, cc.Logic)
s.setObjectCounterBy(user, cc.User)
cnrList, err := s.metaBase.Containers(ctx)
cnrList, err := s.objectstore.Containers(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardMetaCantReadContainerList, zap.Error(err))
return
@ -430,7 +381,7 @@ func (s *Shard) updateMetrics(ctx context.Context) {
var totalPayload uint64
for i := range cnrList {
size, err := s.metaBase.ContainerSize(cnrList[i])
size, err := s.objectstore.ContainerSize(ctx, cnrList[i])
if err != nil {
s.log.Warn(ctx, logs.ShardMetaCantReadContainerSize,
zap.String("cid", cnrList[i].EncodeToString()),
@ -443,7 +394,7 @@ func (s *Shard) updateMetrics(ctx context.Context) {
s.addToPayloadSize(int64(totalPayload))
contCount, err := s.metaBase.ContainerCounters(ctx)
contCount, err := s.objectstore.ContainerCounters(ctx)
if err != nil {
s.log.Warn(ctx, logs.FailedToGetContainerCounters, zap.Error(err))
return
@ -481,7 +432,7 @@ func (s *Shard) setObjectCounterBy(typ string, v uint64) {
}
}
func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) {
func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]objectstore.ObjectCounters) {
for cnrID, count := range byCnr {
if count.Phy > 0 {
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy)

View file

@ -5,7 +5,7 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/objectstore"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -67,7 +67,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return ErrDegradedMode
}
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
return s.objectstore.Flush(ctx, p.ignoreErrors, p.seal)
}
type SealWriteCachePrm struct {
@ -117,7 +117,7 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
if !p.Async {
defer cleanup()
}
prm := writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}
prm := objectstore.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}
if p.Async {
started := make(chan struct{})
go func() {
@ -125,7 +125,7 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
defer cleanup()
s.log.Info(ctx, logs.StartedWritecacheSealAsync)
if err := s.writeCache.Seal(ctx, prm); err != nil {
if err := s.objectstore.Seal(ctx, prm); err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))
return
}
@ -138,5 +138,5 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
return nil
}
}
return s.writeCache.Seal(ctx, prm)
return s.objectstore.Seal(ctx, prm)
}

View file

@ -31,9 +31,6 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
si := new(control.ShardInfo)
si.SetShard_ID(*sh.ID)
si.SetMetabasePath(sh.MetaBaseInfo.Path)
si.Blobstor = blobstorInfoToProto(sh.BlobStorInfo)
si.SetWritecachePath(sh.WriteCacheInfo.Path)
si.SetPiloramaPath(sh.PiloramaInfo.Path)
var m control.ShardMode