Drop frostfs_node_engine_container_size_bytes
and ..._count_total
metric for removed containers #889
18 changed files with 716 additions and 109 deletions
|
@ -45,6 +45,8 @@ func initContainerService(_ context.Context, c *cfg) {
|
||||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||||
containerGRPC.RegisterContainerServiceServer(s, server)
|
containerGRPC.RegisterContainerServiceServer(s, server)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
||||||
|
|
|
@ -557,4 +557,14 @@ const (
|
||||||
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
|
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
|
||||||
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
|
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
|
||||||
FailedToReportStatusToSystemd = "failed to report status to systemd"
|
FailedToReportStatusToSystemd = "failed to report status to systemd"
|
||||||
|
ShardGCCollectingExpiredMetricsStarted = "collecting expired metrics started"
|
||||||
|
ShardGCCollectingExpiredMetricsCompleted = "collecting expired metrics completed"
|
||||||
|
ShardGCFailedToCollectZeroSizeContainers = "failed to collect zero-size containers"
|
||||||
|
ShardGCFailedToCollectZeroCountContainers = "failed to collect zero-count containers"
|
||||||
|
EngineFailedToCheckContainerAvailability = "failed to check container availability"
|
||||||
|
EngineFailedToGetContainerSize = "failed to get container size"
|
||||||
|
EngineFailedToDeleteContainerSize = "failed to delete container size"
|
||||||
|
EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers"
|
||||||
|
EngineInterruptProcessingZeroCountContainers = "interrupt processing zero-count containers"
|
||||||
|
EngineFailedToGetContainerCounters = "failed to get container counters"
|
||||||
)
|
)
|
||||||
|
|
|
@ -7,12 +7,14 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -218,14 +220,18 @@ type cfg struct {
|
||||||
lowMem bool
|
lowMem bool
|
||||||
|
|
||||||
rebuildWorkersCount uint32
|
rebuildWorkersCount uint32
|
||||||
|
|
||||||
|
containerSource atomic.Pointer[containerSource]
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
return &cfg{
|
res := &cfg{
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
shardPoolSize: 20,
|
shardPoolSize: 20,
|
||||||
rebuildWorkersCount: 100,
|
rebuildWorkersCount: 100,
|
||||||
}
|
}
|
||||||
|
res.containerSource.Store(&containerSource{})
|
||||||
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates, initializes and returns new StorageEngine instance.
|
// New creates, initializes and returns new StorageEngine instance.
|
||||||
|
@ -288,3 +294,30 @@ func WithRebuildWorkersCount(count uint32) Option {
|
||||||
c.rebuildWorkersCount = count
|
c.rebuildWorkersCount = count
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetContainerSource sets container source.
|
||||||
|
func (e *StorageEngine) SetContainerSource(cs container.Source) {
|
||||||
|
e.containerSource.Store(&containerSource{cs: cs})
|
||||||
|
}
|
||||||
|
|
||||||
|
type containerSource struct {
|
||||||
|
cs container.Source
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if s == nil || s.cs == nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
wasRemoved, err := container.WasRemoved(s.cs, id)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return !wasRemoved, nil
|
||||||
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -259,3 +260,177 @@ func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.A
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid.ID) {
|
||||||
|
if len(ids) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
idMap, err := e.selectNonExistentIDs(ctx, ids)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(idMap) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var failed bool
|
||||||
|
var prm shard.ContainerSizePrm
|
||||||
|
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err()))
|
||||||
|
failed = true
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
var drop []cid.ID
|
||||||
|
for id := range idMap {
|
||||||
|
prm.SetContainerID(id)
|
||||||
|
s, err := sh.ContainerSize(prm)
|
||||||
|
if err != nil {
|
||||||
|
e.log.Warn(logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
|
||||||
|
failed = true
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if s.Size() > 0 {
|
||||||
|
drop = append(drop, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, id := range drop {
|
||||||
|
delete(idMap, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(idMap) == 0
|
||||||
|
})
|
||||||
|
|
||||||
|
if failed || len(idMap) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err()))
|
||||||
|
failed = true
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
for id := range idMap {
|
||||||
|
if err := sh.DeleteContainerSize(ctx, id); err != nil {
|
||||||
|
e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err))
|
||||||
|
failed = true
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
|
if failed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for id := range idMap {
|
||||||
|
e.metrics.DeleteContainerSize(id.EncodeToString())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []cid.ID) {
|
||||||
|
if len(ids) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
idMap, err := e.selectNonExistentIDs(ctx, ids)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(idMap) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var failed bool
|
||||||
|
var prm shard.ContainerCountPrm
|
||||||
|
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err()))
|
||||||
|
failed = true
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
var drop []cid.ID
|
||||||
|
for id := range idMap {
|
||||||
|
prm.ContainerID = id
|
||||||
|
s, err := sh.ContainerCount(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
e.log.Warn(logs.EngineFailedToGetContainerCounters, zap.Stringer("container_id", id), zap.Error(err))
|
||||||
|
failed = true
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if s.User > 0 || s.Logic > 0 || s.Phy > 0 {
|
||||||
|
drop = append(drop, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, id := range drop {
|
||||||
|
delete(idMap, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(idMap) == 0
|
||||||
|
})
|
||||||
|
|
||||||
|
if failed || len(idMap) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err()))
|
||||||
|
failed = true
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
for id := range idMap {
|
||||||
|
if err := sh.DeleteContainerSize(ctx, id); err != nil {
|
||||||
|
e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err))
|
||||||
|
failed = true
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
|
if failed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for id := range idMap {
|
||||||
|
e.metrics.DeleteContainerCount(id.EncodeToString())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) selectNonExistentIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) {
|
||||||
|
cs := e.containerSource.Load()
|
||||||
|
|
||||||
|
idMap := make(map[cid.ID]struct{})
|
||||||
|
for _, id := range ids {
|
||||||
|
isAvailable, err := cs.IsContainerAvailable(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
e.log.Warn(logs.EngineFailedToCheckContainerAvailability, zap.Stringer("container_id", id), zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if isAvailable {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idMap[id] = struct{}{}
|
||||||
|
}
|
||||||
|
return idMap, nil
|
||||||
|
}
|
||||||
|
|
|
@ -16,6 +16,8 @@ type MetricRegister interface {
|
||||||
SetMode(shardID string, mode mode.Mode)
|
SetMode(shardID string, mode mode.Mode)
|
||||||
|
|
||||||
AddToContainerSize(cnrID string, size int64)
|
AddToContainerSize(cnrID string, size int64)
|
||||||
|
DeleteContainerSize(cnrID string)
|
||||||
|
DeleteContainerCount(cnrID string)
|
||||||
AddToPayloadCounter(shardID string, size int64)
|
AddToPayloadCounter(shardID string, size int64)
|
||||||
IncErrorCounter(shardID string)
|
IncErrorCounter(shardID string)
|
||||||
ClearErrorCounter(shardID string)
|
ClearErrorCounter(shardID string)
|
||||||
|
|
|
@ -119,6 +119,8 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
|
||||||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||||
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
|
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
|
||||||
|
shard.WithZeroSizeCallback(e.processZeroSizeContainers),
|
||||||
|
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
||||||
)...)
|
)...)
|
||||||
|
|
||||||
if err := sh.UpdateID(ctx); err != nil {
|
if err := sh.UpdateID(ctx); err != nil {
|
||||||
|
|
|
@ -14,6 +14,8 @@ import (
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -118,7 +120,9 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
completed, err := db.containerCountersNextBatch(lastKey, &cc)
|
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
|
||||||
|
cc.Counts[id] = entity
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cc, err
|
return cc, err
|
||||||
}
|
}
|
||||||
|
@ -131,7 +135,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
|
||||||
return cc, nil
|
return cc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
|
func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entity ObjectCounters)) (bool, error) {
|
||||||
db.modeMtx.RLock()
|
db.modeMtx.RLock()
|
||||||
defer db.modeMtx.RUnlock()
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
@ -163,7 +167,7 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cc.Counts[cnrID] = ent
|
f(cnrID, ent)
|
||||||
|
|
||||||
counter++
|
counter++
|
||||||
if counter == batchSize {
|
if counter == batchSize {
|
||||||
|
@ -185,6 +189,43 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("ContainerCount", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCount")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return ObjectCounters{}, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
var result ObjectCounters
|
||||||
|
|
||||||
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
|
key := make([]byte, cidSize)
|
||||||
|
id.Encode(key)
|
||||||
|
v := b.Get(key)
|
||||||
|
if v == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
result, err = parseContainerCounterValue(v)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
return result, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
||||||
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
|
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
return fmt.Errorf("could not increase phy object counter: %w", err)
|
||||||
|
@ -239,7 +280,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
|
||||||
return b.Put(counterKey, newCounter)
|
return b.Put(counterKey, newCounter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838
|
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -268,9 +309,6 @@ func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCo
|
||||||
entity.Phy = nextValue(entity.Phy, delta.Phy, inc)
|
entity.Phy = nextValue(entity.Phy, delta.Phy, inc)
|
||||||
entity.Logic = nextValue(entity.Logic, delta.Logic, inc)
|
entity.Logic = nextValue(entity.Logic, delta.Logic, inc)
|
||||||
entity.User = nextValue(entity.User, delta.User, inc)
|
entity.User = nextValue(entity.User, delta.User, inc)
|
||||||
if entity.IsZero() {
|
|
||||||
return b.Delete(key)
|
|
||||||
}
|
|
||||||
value := containerCounterValue(entity)
|
value := containerCounterValue(entity)
|
||||||
return b.Put(key, value)
|
return b.Put(key, value)
|
||||||
}
|
}
|
||||||
|
@ -480,3 +518,214 @@ func IsUserObject(obj *objectSDK.Object) bool {
|
||||||
(obj.SplitID() == nil ||
|
(obj.SplitID() == nil ||
|
||||||
(hasParentID && len(obj.Children()) == 0))
|
(hasParentID && len(obj.Children()) == 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ZeroSizeContainers returns containers with size = 0.
|
||||||
|
func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
var result []cid.ID
|
||||||
|
lastKey := make([]byte, cidSize)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
completed, err := db.containerSizesNextBatch(lastKey, func(contID cid.ID, size uint64) {
|
||||||
|
if size == 0 {
|
||||||
|
result = append(result, contID)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if completed {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
success = true
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (bool, error) {
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return false, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
counter := 0
|
||||||
|
const batchSize = 1000
|
||||||
|
|
||||||
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(containerVolumeBucketName)
|
||||||
|
c := b.Cursor()
|
||||||
|
var key, value []byte
|
||||||
|
for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
|
||||||
|
if bytes.Equal(lastKey, key) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
copy(lastKey, key)
|
||||||
|
|
||||||
|
size := parseContainerSize(value)
|
||||||
|
var id cid.ID
|
||||||
|
if err := id.Decode(key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
f(id, size)
|
||||||
|
|
||||||
|
counter++
|
||||||
|
if counter == batchSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if counter < batchSize {
|
||||||
|
return ErrInterruptIterator
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrInterruptIterator) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Stringer("container_id", id),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
if db.mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
|
||||||
|
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(containerVolumeBucketName)
|
||||||
|
|
||||||
|
key := make([]byte, cidSize)
|
||||||
|
id.Encode(key)
|
||||||
|
return b.Delete(key)
|
||||||
|
})
|
||||||
|
success = err == nil
|
||||||
|
return metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ZeroCountContainers returns containers with objects count = 0 in metabase.
|
||||||
|
func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("ZeroCountContainers", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroCountContainers")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return nil, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []cid.ID
|
||||||
|
|
||||||
|
lastKey := make([]byte, cidSize)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
|
||||||
|
if entity.IsZero() {
|
||||||
|
result = append(result, id)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
if completed {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
success = true
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("DeleteContainerCount", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerCount",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Stringer("container_id", id),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
if db.mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
|
||||||
|
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
|
|
||||||
|
key := make([]byte, cidSize)
|
||||||
|
id.Encode(key)
|
||||||
|
return b.Delete(key)
|
||||||
|
})
|
||||||
|
success = err == nil
|
||||||
|
return metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
|
@ -91,7 +91,7 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
res, err := db.Delete(context.Background(), prm)
|
res, err := db.Delete(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), res.AvailableObjectsRemoved())
|
require.Equal(t, uint64(1), res.LogicCount())
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -105,11 +105,7 @@ func TestCounters(t *testing.T) {
|
||||||
v.Phy--
|
v.Phy--
|
||||||
v.Logic--
|
v.Logic--
|
||||||
v.User--
|
v.User--
|
||||||
if v.IsZero() {
|
exp[cnrID] = v
|
||||||
delete(exp, cnrID)
|
|
||||||
} else {
|
|
||||||
exp[cnrID] = v
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err := db.ContainerCounters(context.Background())
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
@ -161,7 +157,7 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
res, err := db.Inhume(context.Background(), prm)
|
res, err := db.Inhume(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
|
require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed())
|
||||||
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
|
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
|
@ -389,7 +385,7 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
|
inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
|
require.Equal(t, uint64(1), inhumeRes.LogicInhumed())
|
||||||
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
|
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters()
|
||||||
|
@ -423,16 +419,12 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
deleteRes, err := db.Delete(context.Background(), deletePrm)
|
deleteRes, err := db.Delete(context.Background(), deletePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Zero(t, deleteRes.AvailableObjectsRemoved())
|
require.Zero(t, deleteRes.LogicCount())
|
||||||
require.Zero(t, deleteRes.UserObjectsRemoved())
|
require.Zero(t, deleteRes.UserCount())
|
||||||
|
|
||||||
if v, ok := exp[oo[0].Container()]; ok {
|
if v, ok := exp[oo[0].Container()]; ok {
|
||||||
v.Phy--
|
v.Phy--
|
||||||
if v.IsZero() {
|
exp[oo[0].Container()] = v
|
||||||
delete(exp, oo[0].Container())
|
|
||||||
} else {
|
|
||||||
exp[oo[0].Container()] = v
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
oo = oo[1:]
|
oo = oo[1:]
|
||||||
|
@ -456,18 +448,14 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
deleteRes, err = db.Delete(context.Background(), deletePrm)
|
deleteRes, err = db.Delete(context.Background(), deletePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
|
require.Equal(t, uint64(1), deleteRes.LogicCount())
|
||||||
require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved())
|
require.Equal(t, uint64(1), deleteRes.UserCount())
|
||||||
|
|
||||||
if v, ok := exp[oo[0].Container()]; ok {
|
if v, ok := exp[oo[0].Container()]; ok {
|
||||||
v.Phy--
|
v.Phy--
|
||||||
v.Logic--
|
v.Logic--
|
||||||
v.User--
|
v.User--
|
||||||
if v.IsZero() {
|
exp[oo[0].Container()] = v
|
||||||
delete(exp, oo[0].Container())
|
|
||||||
} else {
|
|
||||||
exp[oo[0].Container()] = v
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
oo = oo[1:]
|
oo = oo[1:]
|
||||||
|
|
|
@ -27,22 +27,22 @@ type DeletePrm struct {
|
||||||
|
|
||||||
// DeleteRes groups the resulting values of Delete operation.
|
// DeleteRes groups the resulting values of Delete operation.
|
||||||
type DeleteRes struct {
|
type DeleteRes struct {
|
||||||
rawRemoved uint64
|
phyCount uint64
|
||||||
availableRemoved uint64
|
logicCount uint64
|
||||||
userRemoved uint64
|
userCount uint64
|
||||||
sizes []uint64
|
phySize uint64
|
||||||
availableSizes []uint64
|
logicSize uint64
|
||||||
removedByCnrID map[cid.ID]ObjectCounters
|
removedByCnrID map[cid.ID]ObjectCounters
|
||||||
}
|
}
|
||||||
|
|
||||||
// AvailableObjectsRemoved returns the number of removed available
|
// LogicCount returns the number of removed logic
|
||||||
// objects.
|
// objects.
|
||||||
func (d DeleteRes) AvailableObjectsRemoved() uint64 {
|
func (d DeleteRes) LogicCount() uint64 {
|
||||||
return d.availableRemoved
|
return d.logicCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d DeleteRes) UserObjectsRemoved() uint64 {
|
func (d DeleteRes) UserCount() uint64 {
|
||||||
return d.userRemoved
|
return d.userCount
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemovedByCnrID returns the number of removed objects by container ID.
|
// RemovedByCnrID returns the number of removed objects by container ID.
|
||||||
|
@ -50,19 +50,19 @@ func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
|
||||||
return d.removedByCnrID
|
return d.removedByCnrID
|
||||||
}
|
}
|
||||||
|
|
||||||
// RawObjectsRemoved returns the number of removed raw objects.
|
// PhyCount returns the number of removed physical objects.
|
||||||
func (d DeleteRes) RawObjectsRemoved() uint64 {
|
func (d DeleteRes) PhyCount() uint64 {
|
||||||
return d.rawRemoved
|
return d.phyCount
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemovedPhysicalObjectSizes returns the sizes of removed physical objects.
|
// PhySize returns the size of removed physical objects.
|
||||||
func (d DeleteRes) RemovedPhysicalObjectSizes() []uint64 {
|
func (d DeleteRes) PhySize() uint64 {
|
||||||
return d.sizes
|
return d.phySize
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemovedLogicalObjectSizes returns the sizes of removed logical objects.
|
// LogicSize returns the size of removed logical objects.
|
||||||
func (d DeleteRes) RemovedLogicalObjectSizes() []uint64 {
|
func (d DeleteRes) LogicSize() uint64 {
|
||||||
return d.availableSizes
|
return d.logicSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAddresses is a Delete option to set the addresses of the objects to delete.
|
// SetAddresses is a Delete option to set the addresses of the objects to delete.
|
||||||
|
@ -129,8 +129,6 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
// references of the split objects.
|
// references of the split objects.
|
||||||
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
|
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
|
||||||
res := DeleteRes{
|
res := DeleteRes{
|
||||||
sizes: make([]uint64, len(addrs)),
|
|
||||||
availableSizes: make([]uint64, len(addrs)),
|
|
||||||
removedByCnrID: make(map[cid.ID]ObjectCounters),
|
removedByCnrID: make(map[cid.ID]ObjectCounters),
|
||||||
}
|
}
|
||||||
refCounter := make(referenceCounter, len(addrs))
|
refCounter := make(referenceCounter, len(addrs))
|
||||||
|
@ -162,22 +160,22 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
|
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
|
||||||
if res.rawRemoved > 0 {
|
if res.phyCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
|
err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not decrease phy object counter: %w", err)
|
return fmt.Errorf("could not decrease phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.availableRemoved > 0 {
|
if res.logicCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
|
err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not decrease logical object counter: %w", err)
|
return fmt.Errorf("could not decrease logical object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.userRemoved > 0 {
|
if res.userCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, user, res.userRemoved, false)
|
err := db.updateShardObjectCounter(tx, user, res.userCount, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not decrease user object counter: %w", err)
|
return fmt.Errorf("could not decrease user object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -190,7 +188,7 @@ func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) {
|
func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) {
|
||||||
if r.Removed {
|
if r.Phy {
|
||||||
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||||
v.Phy++
|
v.Phy++
|
||||||
res.removedByCnrID[addrs[i].Container()] = v
|
res.removedByCnrID[addrs[i].Container()] = v
|
||||||
|
@ -200,11 +198,11 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res.rawRemoved++
|
res.phyCount++
|
||||||
res.sizes[i] = r.Size
|
res.phySize += r.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Available {
|
if r.Logic {
|
||||||
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||||
v.Logic++
|
v.Logic++
|
||||||
res.removedByCnrID[addrs[i].Container()] = v
|
res.removedByCnrID[addrs[i].Container()] = v
|
||||||
|
@ -214,8 +212,8 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res.availableRemoved++
|
res.logicCount++
|
||||||
res.availableSizes[i] = r.Size
|
res.logicSize += r.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.User {
|
if r.User {
|
||||||
|
@ -228,15 +226,15 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res.userRemoved++
|
res.userCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type deleteSingleResult struct {
|
type deleteSingleResult struct {
|
||||||
Removed bool
|
Phy bool
|
||||||
Available bool
|
Logic bool
|
||||||
User bool
|
User bool
|
||||||
Size uint64
|
Size uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete removes object indexes from the metabase. Counts the references
|
// delete removes object indexes from the metabase. Counts the references
|
||||||
|
@ -302,10 +300,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
}
|
}
|
||||||
|
|
||||||
return deleteSingleResult{
|
return deleteSingleResult{
|
||||||
Removed: true,
|
Phy: true,
|
||||||
Available: removeAvailableObject,
|
Logic: removeAvailableObject,
|
||||||
User: isUserObject && removeAvailableObject,
|
User: isUserObject && removeAvailableObject,
|
||||||
Size: obj.PayloadSize(),
|
Size: obj.PayloadSize(),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,17 +37,17 @@ type DeletionInfo struct {
|
||||||
|
|
||||||
// InhumeRes encapsulates results of Inhume operation.
|
// InhumeRes encapsulates results of Inhume operation.
|
||||||
type InhumeRes struct {
|
type InhumeRes struct {
|
||||||
deletedLockObj []oid.Address
|
deletedLockObj []oid.Address
|
||||||
availableInhumed uint64
|
logicInhumed uint64
|
||||||
userInhumed uint64
|
userInhumed uint64
|
||||||
inhumedByCnrID map[cid.ID]ObjectCounters
|
inhumedByCnrID map[cid.ID]ObjectCounters
|
||||||
deletionDetails []DeletionInfo
|
deletionDetails []DeletionInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
// AvailableInhumed return number of available object
|
// LogicInhumed return number of logic object
|
||||||
// that have been inhumed.
|
// that have been inhumed.
|
||||||
func (i InhumeRes) AvailableInhumed() uint64 {
|
func (i InhumeRes) LogicInhumed() uint64 {
|
||||||
return i.availableInhumed
|
return i.logicInhumed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i InhumeRes) UserInhumed() uint64 {
|
func (i InhumeRes) UserInhumed() uint64 {
|
||||||
|
@ -87,7 +87,7 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, is
|
||||||
CID: containerID,
|
CID: containerID,
|
||||||
IsUser: isUser,
|
IsUser: isUser,
|
||||||
})
|
})
|
||||||
i.availableInhumed++
|
i.logicInhumed++
|
||||||
if isUser {
|
if isUser {
|
||||||
i.userInhumed++
|
i.userInhumed++
|
||||||
}
|
}
|
||||||
|
@ -265,7 +265,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||||
if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil {
|
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
|
if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
|
||||||
|
|
|
@ -1,9 +1,13 @@
|
||||||
package shard
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ContainerSizePrm struct {
|
type ContainerSizePrm struct {
|
||||||
|
@ -39,3 +43,84 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
|
||||||
size: size,
|
size: size,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ContainerCountPrm struct {
|
||||||
|
ContainerID cid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
type ContainerCountRes struct {
|
||||||
|
Phy uint64
|
||||||
|
Logic uint64
|
||||||
|
User uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (ContainerCountRes, error) {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerCount",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
attribute.Stringer("container_id", prm.ContainerID),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ContainerCountRes{}, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
|
||||||
|
if err != nil {
|
||||||
|
return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ContainerCountRes{
|
||||||
|
Phy: counters.Phy,
|
||||||
|
Logic: counters.Logic,
|
||||||
|
User: counters.User,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerSize",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
attribute.Stringer("container_id", id),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.metaBase.DeleteContainerSize(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerCount",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
attribute.Stringer("container_id", id),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.metaBase.DeleteContainerCount(ctx, id)
|
||||||
|
}
|
||||||
|
|
|
@ -155,6 +155,7 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
s.collectExpiredLocks,
|
s.collectExpiredLocks,
|
||||||
s.collectExpiredObjects,
|
s.collectExpiredObjects,
|
||||||
s.collectExpiredTombstones,
|
s.collectExpiredTombstones,
|
||||||
|
s.collectExpiredMetrics,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
@ -141,16 +141,12 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
s.decObjectCounterBy(physical, res.PhyCount())
|
||||||
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
s.decObjectCounterBy(logical, res.LogicCount())
|
||||||
s.decObjectCounterBy(user, res.UserObjectsRemoved())
|
s.decObjectCounterBy(user, res.UserCount())
|
||||||
s.decContainerObjectCounter(res.RemovedByCnrID())
|
s.decContainerObjectCounter(res.RemovedByCnrID())
|
||||||
removedPayload := res.RemovedPhysicalObjectSizes()[0]
|
s.addToContainerSize(addr.Container().EncodeToString(), -int64(res.LogicSize()))
|
||||||
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
|
s.addToPayloadSize(-int64(res.PhySize()))
|
||||||
if logicalRemovedPayload > 0 {
|
|
||||||
s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload))
|
|
||||||
}
|
|
||||||
s.addToPayloadSize(-int64(removedPayload))
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -414,8 +415,8 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
|
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeRegular)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.LogicInhumed())
|
||||||
s.decObjectCounterBy(user, res.UserInhumed())
|
s.decObjectCounterBy(user, res.UserInhumed())
|
||||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
|
@ -629,8 +630,8 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
|
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.LogicInhumed())
|
||||||
s.decObjectCounterBy(user, res.UserInhumed())
|
s.decObjectCounterBy(user, res.UserInhumed())
|
||||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
|
@ -677,8 +678,8 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
|
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeLock)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.LogicInhumed())
|
||||||
s.decObjectCounterBy(user, res.UserInhumed())
|
s.decObjectCounterBy(user, res.UserInhumed())
|
||||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
|
@ -726,3 +727,40 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
|
||||||
func (s *Shard) NotificationChannel() chan<- Event {
|
func (s *Shard) NotificationChannel() chan<- Event {
|
||||||
return s.gc.eventChan
|
return s.gc.eventChan
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
epoch := e.(newEpoch).epoch
|
||||||
|
|
||||||
|
s.log.Debug(logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch))
|
||||||
|
defer s.log.Debug(logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch))
|
||||||
|
|
||||||
|
s.collectExpiredContainerSizeMetrics(ctx, epoch)
|
||||||
|
s.collectExpiredContainerCountMetrics(ctx, epoch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
|
||||||
|
ids, err := s.metaBase.ZeroSizeContainers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn(logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(ids) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.zeroSizeContainersCallback(ctx, ids)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
|
||||||
|
ids, err := s.metaBase.ZeroCountContainers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn(logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(ids) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.zeroCountContainersCallback(ctx, ids)
|
||||||
|
}
|
||||||
|
|
|
@ -121,7 +121,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
|
|
||||||
s.m.RUnlock()
|
s.m.RUnlock()
|
||||||
|
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.LogicInhumed())
|
||||||
s.decObjectCounterBy(user, res.UserInhumed())
|
s.decObjectCounterBy(user, res.UserInhumed())
|
||||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
|
|
|
@ -336,11 +336,7 @@ func TestCounters(t *testing.T) {
|
||||||
v.Logic--
|
v.Logic--
|
||||||
v.Phy--
|
v.Phy--
|
||||||
v.User--
|
v.User--
|
||||||
if v.IsZero() {
|
expected[cnr] = v
|
||||||
delete(expected, cnr)
|
|
||||||
} else {
|
|
||||||
expected[cnr] = v
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
|
|
|
@ -53,6 +53,9 @@ type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address)
|
||||||
// DeletedLockCallback is a callback handling list of deleted LOCK objects.
|
// DeletedLockCallback is a callback handling list of deleted LOCK objects.
|
||||||
type DeletedLockCallback func(context.Context, []oid.Address)
|
type DeletedLockCallback func(context.Context, []oid.Address)
|
||||||
|
|
||||||
|
// EmptyContainersCallback is a callback hanfling list of zero-size and zero-count containers.
|
||||||
|
type EmptyContainersCallback func(context.Context, []cid.ID)
|
||||||
|
|
||||||
// MetricsWriter is an interface that must store shard's metrics.
|
// MetricsWriter is an interface that must store shard's metrics.
|
||||||
type MetricsWriter interface {
|
type MetricsWriter interface {
|
||||||
// SetObjectCounter must set object counter taking into account object type.
|
// SetObjectCounter must set object counter taking into account object type.
|
||||||
|
@ -118,6 +121,9 @@ type cfg struct {
|
||||||
|
|
||||||
deletedLockCallBack DeletedLockCallback
|
deletedLockCallBack DeletedLockCallback
|
||||||
|
|
||||||
|
zeroSizeContainersCallback EmptyContainersCallback
|
||||||
|
zeroCountContainersCallback EmptyContainersCallback
|
||||||
|
|
||||||
tsSource TombstoneSource
|
tsSource TombstoneSource
|
||||||
|
|
||||||
metricsWriter MetricsWriter
|
metricsWriter MetricsWriter
|
||||||
|
@ -129,11 +135,13 @@ type cfg struct {
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
return &cfg{
|
return &cfg{
|
||||||
rmBatchSize: 100,
|
rmBatchSize: 100,
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
gcCfg: defaultGCCfg(),
|
gcCfg: defaultGCCfg(),
|
||||||
reportErrorFunc: func(string, string, error) {},
|
reportErrorFunc: func(string, string, error) {},
|
||||||
rebuildLimiter: &noopRebuildLimiter{},
|
rebuildLimiter: &noopRebuildLimiter{},
|
||||||
|
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
|
||||||
|
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,6 +371,20 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithZeroSizeCallback returns option to set zero-size containers callback.
|
||||||
|
func WithZeroSizeCallback(cb EmptyContainersCallback) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.zeroSizeContainersCallback = cb
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithZeroCountCallback returns option to set zero-count containers callback.
|
||||||
|
func WithZeroCountCallback(cb EmptyContainersCallback) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.zeroCountContainersCallback = cb
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Shard) fillInfo() {
|
func (s *Shard) fillInfo() {
|
||||||
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
||||||
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
||||||
|
|
|
@ -11,6 +11,8 @@ import (
|
||||||
type EngineMetrics interface {
|
type EngineMetrics interface {
|
||||||
AddMethodDuration(method string, d time.Duration)
|
AddMethodDuration(method string, d time.Duration)
|
||||||
AddToContainerSize(cnrID string, size int64)
|
AddToContainerSize(cnrID string, size int64)
|
||||||
|
DeleteContainerSize(cnrID string)
|
||||||
|
DeleteContainerCount(cnrID string)
|
||||||
IncErrorCounter(shardID string)
|
IncErrorCounter(shardID string)
|
||||||
ClearErrorCounter(shardID string)
|
ClearErrorCounter(shardID string)
|
||||||
DeleteShardMetrics(shardID string)
|
DeleteShardMetrics(shardID string)
|
||||||
|
@ -79,6 +81,14 @@ func (m *engineMetrics) AddToContainerSize(cnrID string, size int64) {
|
||||||
m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size))
|
m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *engineMetrics) DeleteContainerSize(cnrID string) {
|
||||||
|
m.containerSize.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *engineMetrics) DeleteContainerCount(cnrID string) {
|
||||||
|
m.contObjCounter.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID})
|
||||||
|
}
|
||||||
|
|
||||||
func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) {
|
func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) {
|
||||||
m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size))
|
m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue