[#9999] shard: Fix metabase usage after db engine change
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
1b0f10d912
commit
c8ff0e840a
7 changed files with 27 additions and 28 deletions
|
@ -26,7 +26,7 @@ func (r ContainerSizeRes) Size() uint64 {
|
|||
return r.size
|
||||
}
|
||||
|
||||
func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
|
||||
func (s *Shard) ContainerSize(ctx context.Context, prm ContainerSizePrm) (ContainerSizeRes, error) {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
|
@ -34,7 +34,7 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
|
|||
return ContainerSizeRes{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
size, err := s.metaBase.ContainerSize(prm.cnr)
|
||||
size, err := s.metaBase.ContainerSize(ctx, prm.cnr)
|
||||
if err != nil {
|
||||
return ContainerSizeRes{}, fmt.Errorf("could not get container size: %w", err)
|
||||
}
|
||||
|
|
|
@ -194,7 +194,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
|||
}
|
||||
}()
|
||||
|
||||
err := s.metaBase.Reset()
|
||||
err := s.metaBase.Reset(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not reset metabase: %w", err)
|
||||
}
|
||||
|
@ -248,7 +248,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
|||
return fmt.Errorf("could not put objects to the meta: %w", err)
|
||||
}
|
||||
|
||||
err = s.metaBase.SyncCounters()
|
||||
err = s.metaBase.SyncCounters(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not sync object counters: %w", err)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package shard
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -28,7 +27,6 @@ import (
|
|||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type objAddr struct {
|
||||
|
@ -37,6 +35,7 @@ type objAddr struct {
|
|||
}
|
||||
|
||||
func TestShardOpen(t *testing.T) {
|
||||
t.Skip("not implemented for pebble")
|
||||
t.Parallel()
|
||||
|
||||
dir := t.TempDir()
|
||||
|
@ -49,13 +48,6 @@ func TestShardOpen(t *testing.T) {
|
|||
))
|
||||
|
||||
var allowedMode atomic.Int64
|
||||
openFileMetabase := func(p string, f int, perm fs.FileMode) (*os.File, error) {
|
||||
const modeMask = os.O_RDONLY | os.O_RDWR | os.O_WRONLY
|
||||
if int64(f&modeMask) == allowedMode.Load() {
|
||||
return os.OpenFile(p, f, perm)
|
||||
}
|
||||
return nil, fs.ErrPermission
|
||||
}
|
||||
|
||||
wcOpts := []writecache.Option{
|
||||
writecache.WithPath(filepath.Join(dir, "wc")),
|
||||
|
@ -72,7 +64,6 @@ func TestShardOpen(t *testing.T) {
|
|||
WithMetaBaseOptions(
|
||||
meta.WithPath(metaPath),
|
||||
meta.WithEpochState(epochState{}),
|
||||
meta.WithBoltDBOptions(&bbolt.Options{OpenFile: openFileMetabase}),
|
||||
),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
|
@ -352,7 +343,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
checkTombMembers(true)
|
||||
checkLocked(t, cnrLocked, locked)
|
||||
|
||||
c, err := sh.metaBase.ObjectCounters()
|
||||
c, err := sh.metaBase.ObjectCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
phyBefore := c.Phy
|
||||
|
@ -388,7 +379,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
err = sh.refillMetabase(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
c, err = sh.metaBase.ObjectCounters()
|
||||
c, err = sh.metaBase.ObjectCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, phyBefore, c.Phy)
|
||||
|
|
|
@ -23,7 +23,7 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
|
|||
return 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
cc, err := s.metaBase.ObjectCounters()
|
||||
cc, err := s.metaBase.ObjectCounters(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -135,18 +136,25 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error
|
|||
|
||||
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {
|
||||
var delPrm meta.DeletePrm
|
||||
delPrm.SetAddresses(addr)
|
||||
delPrm.Address = addr
|
||||
|
||||
res, err := s.metaBase.Delete(ctx, delPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.decObjectCounterBy(physical, res.PhyCount())
|
||||
s.decObjectCounterBy(logical, res.LogicCount())
|
||||
s.decObjectCounterBy(user, res.UserCount())
|
||||
s.decContainerObjectCounter(res.RemovedByCnrID())
|
||||
s.addToContainerSize(addr.Container().EncodeToString(), -int64(res.LogicSize()))
|
||||
s.addToPayloadSize(-int64(res.PhySize()))
|
||||
s.decObjectCounterBy(physical, res.PhyCount)
|
||||
s.decObjectCounterBy(logical, res.LogicCount)
|
||||
s.decObjectCounterBy(user, res.UserCount)
|
||||
containerCounter := map[cid.ID]meta.ObjectCounters{
|
||||
addr.Container(): {
|
||||
Logic: res.LogicCount,
|
||||
Phy: res.PhyCount,
|
||||
User: res.UserCount,
|
||||
},
|
||||
}
|
||||
s.decContainerObjectCounter(containerCounter)
|
||||
s.addToContainerSize(addr.Container().EncodeToString(), -int64(res.LogicSize))
|
||||
s.addToPayloadSize(-int64(res.PhySize))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -678,7 +678,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
|
|||
if s.GetMode().NoMetabase() {
|
||||
return
|
||||
}
|
||||
unlocked, err := s.metaBase.FreeLockedBy(lockers)
|
||||
unlocked, err := s.metaBase.FreeLockedBy(ctx, lockers)
|
||||
if err != nil {
|
||||
s.log.Warn(logs.ShardFailureToUnlockObjects,
|
||||
zap.String("error", err.Error()),
|
||||
|
@ -735,7 +735,7 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
|
|||
return
|
||||
}
|
||||
|
||||
_, err := s.metaBase.FreeLockedBy(lockers)
|
||||
_, err := s.metaBase.FreeLockedBy(context.TODO(), lockers)
|
||||
if err != nil {
|
||||
s.log.Warn(logs.ShardFailureToUnlockObjects,
|
||||
zap.String("error", err.Error()),
|
||||
|
|
|
@ -439,7 +439,7 @@ func (s *Shard) updateMetrics(ctx context.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
cc, err := s.metaBase.ObjectCounters()
|
||||
cc, err := s.metaBase.ObjectCounters(ctx)
|
||||
if err != nil {
|
||||
s.log.Warn(logs.ShardMetaObjectCounterRead,
|
||||
zap.Error(err),
|
||||
|
@ -461,7 +461,7 @@ func (s *Shard) updateMetrics(ctx context.Context) {
|
|||
var totalPayload uint64
|
||||
|
||||
for i := range cnrList {
|
||||
size, err := s.metaBase.ContainerSize(cnrList[i])
|
||||
size, err := s.metaBase.ContainerSize(ctx, cnrList[i])
|
||||
if err != nil {
|
||||
s.log.Warn(logs.ShardMetaCantReadContainerSize,
|
||||
zap.String("cid", cnrList[i].EncodeToString()),
|
||||
|
|
Loading…
Reference in a new issue