[#1273] writecache: Flush writecache concurrently
Some checks failed
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m1s
Build / Build Components (1.21) (pull_request) Successful in 3m0s
Tests and linters / gopls check (pull_request) Successful in 4m17s
Tests and linters / Tests (1.22) (pull_request) Successful in 7m4s
Tests and linters / Tests with -race (pull_request) Successful in 7m26s
Tests and linters / Lint (pull_request) Failing after 12m19s
Tests and linters / Tests (1.21) (pull_request) Failing after 12m15s
Tests and linters / Staticcheck (pull_request) Failing after 12m9s
Vulncheck / Vulncheck (pull_request) Failing after 11m52s
Build / Build Components (1.22) (pull_request) Failing after 11m39s
DCO action / DCO (pull_request) Failing after 11m35s
Some checks failed
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m1s
Build / Build Components (1.21) (pull_request) Successful in 3m0s
Tests and linters / gopls check (pull_request) Successful in 4m17s
Tests and linters / Tests (1.22) (pull_request) Successful in 7m4s
Tests and linters / Tests with -race (pull_request) Successful in 7m26s
Tests and linters / Lint (pull_request) Failing after 12m19s
Tests and linters / Tests (1.21) (pull_request) Failing after 12m15s
Tests and linters / Staticcheck (pull_request) Failing after 12m9s
Vulncheck / Vulncheck (pull_request) Failing after 11m52s
Build / Build Components (1.22) (pull_request) Failing after 11m39s
DCO action / DCO (pull_request) Failing after 11m35s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
e61f9ac796
commit
81df490709
13 changed files with 138 additions and 87 deletions
|
@ -33,13 +33,11 @@ func inspectFunc(cmd *cobra.Command, _ []string) {
|
||||||
db := openMeta(cmd)
|
db := openMeta(cmd)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
storageID := meta.StorageIDPrm{}
|
storageID := meta.StorageIDPrm{Address: addr}
|
||||||
storageID.SetAddress(addr)
|
|
||||||
|
|
||||||
resStorageID, err := db.StorageID(cmd.Context(), storageID)
|
resStorageID, err := db.StorageID(cmd.Context(), storageID)
|
||||||
common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err))
|
common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err))
|
||||||
|
|
||||||
if id := resStorageID.StorageID(); id != nil {
|
if id := resStorageID.StorageID; id != nil {
|
||||||
cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path())
|
cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path())
|
||||||
} else {
|
} else {
|
||||||
cmd.Printf("Object does not contain storageID\n\n")
|
cmd.Printf("Object does not contain storageID\n\n")
|
||||||
|
|
|
@ -468,6 +468,7 @@ const (
|
||||||
FSTreeCantUnmarshalObject = "can't unmarshal an object"
|
FSTreeCantUnmarshalObject = "can't unmarshal an object"
|
||||||
FSTreeCantFushObjectBlobstor = "can't flush an object to blobstor"
|
FSTreeCantFushObjectBlobstor = "can't flush an object to blobstor"
|
||||||
FSTreeCantUpdateID = "can't update object storage ID"
|
FSTreeCantUpdateID = "can't update object storage ID"
|
||||||
|
FSTreeCantGetID = "can't get object storage ID"
|
||||||
FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB"
|
FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB"
|
||||||
PutSingleRedirectFailure = "failed to redirect PutSingle request"
|
PutSingleRedirectFailure = "failed to redirect PutSingle request"
|
||||||
StorageIDRetrievalFailure = "can't get storage ID from metabase"
|
StorageIDRetrievalFailure = "can't get storage ID from metabase"
|
||||||
|
|
|
@ -15,22 +15,12 @@ import (
|
||||||
|
|
||||||
// StorageIDPrm groups the parameters of StorageID operation.
|
// StorageIDPrm groups the parameters of StorageID operation.
|
||||||
type StorageIDPrm struct {
|
type StorageIDPrm struct {
|
||||||
addr oid.Address
|
Address oid.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// StorageIDRes groups the resulting values of StorageID operation.
|
// StorageIDRes groups the resulting values of StorageID operation.
|
||||||
type StorageIDRes struct {
|
type StorageIDRes struct {
|
||||||
id []byte
|
StorageID []byte
|
||||||
}
|
|
||||||
|
|
||||||
// SetAddress is a StorageID option to set the object address to check.
|
|
||||||
func (p *StorageIDPrm) SetAddress(addr oid.Address) {
|
|
||||||
p.addr = addr
|
|
||||||
}
|
|
||||||
|
|
||||||
// StorageID returns storage ID.
|
|
||||||
func (r StorageIDRes) StorageID() []byte {
|
|
||||||
return r.id
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// StorageID returns storage descriptor for objects from the blobstor.
|
// StorageID returns storage descriptor for objects from the blobstor.
|
||||||
|
@ -46,7 +36,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", prm.addr.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -58,7 +48,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.id, err = db.storageID(tx, prm.addr)
|
res.StorageID, err = db.storageID(tx, prm.Address)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
@ -83,23 +73,13 @@ func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
|
||||||
|
|
||||||
// UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
|
// UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
|
||||||
type UpdateStorageIDPrm struct {
|
type UpdateStorageIDPrm struct {
|
||||||
addr oid.Address
|
Address oid.Address
|
||||||
id []byte
|
StorageID []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateStorageIDRes groups the resulting values of UpdateStorageID operation.
|
// UpdateStorageIDRes groups the resulting values of UpdateStorageID operation.
|
||||||
type UpdateStorageIDRes struct{}
|
type UpdateStorageIDRes struct{}
|
||||||
|
|
||||||
// SetAddress is an UpdateStorageID option to set the object address to check.
|
|
||||||
func (p *UpdateStorageIDPrm) SetAddress(addr oid.Address) {
|
|
||||||
p.addr = addr
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetStorageID is an UpdateStorageID option to set the storage ID.
|
|
||||||
func (p *UpdateStorageIDPrm) SetStorageID(id []byte) {
|
|
||||||
p.id = id
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateStorageID updates storage descriptor for objects from the blobstor.
|
// UpdateStorageID updates storage descriptor for objects from the blobstor.
|
||||||
func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) {
|
func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) {
|
||||||
var (
|
var (
|
||||||
|
@ -112,8 +92,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.UpdateStorageID",
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.UpdateStorageID",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", prm.addr.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
attribute.String("storage_id", string(prm.id)),
|
attribute.String("storage_id", string(prm.StorageID)),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -127,7 +107,7 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||||
return setStorageID(tx, prm.addr, prm.id, true)
|
return setStorageID(tx, prm.Address, prm.StorageID, true)
|
||||||
})
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
|
|
|
@ -102,18 +102,13 @@ func TestPutWritecacheDataRace(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
|
func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
|
||||||
var sidPrm meta.UpdateStorageIDPrm
|
sidPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: id}
|
||||||
sidPrm.SetAddress(addr)
|
|
||||||
sidPrm.SetStorageID(id)
|
|
||||||
|
|
||||||
_, err := db.UpdateStorageID(context.Background(), sidPrm)
|
_, err := db.UpdateStorageID(context.Background(), sidPrm)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) {
|
func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) {
|
||||||
var sidPrm meta.StorageIDPrm
|
sidPrm := meta.StorageIDPrm{Address: addr}
|
||||||
sidPrm.SetAddress(addr)
|
|
||||||
|
|
||||||
r, err := db.StorageID(context.Background(), sidPrm)
|
r, err := db.StorageID(context.Background(), sidPrm)
|
||||||
return r.StorageID(), err
|
return r.StorageID, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,9 +105,7 @@ func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
|
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
|
||||||
var sPrm meta.StorageIDPrm
|
sPrm := meta.StorageIDPrm{Address: addr}
|
||||||
sPrm.SetAddress(addr)
|
|
||||||
|
|
||||||
res, err := s.metaBase.StorageID(ctx, sPrm)
|
res, err := s.metaBase.StorageID(ctx, sPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Debug(logs.StorageIDRetrievalFailure,
|
s.log.Debug(logs.StorageIDRetrievalFailure,
|
||||||
|
@ -116,7 +114,7 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
storageID := res.StorageID()
|
storageID := res.StorageID
|
||||||
if storageID == nil {
|
if storageID == nil {
|
||||||
// if storageID is nil it means:
|
// if storageID is nil it means:
|
||||||
// 1. there is no such object
|
// 1. there is no such object
|
||||||
|
|
|
@ -109,15 +109,14 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
||||||
require.True(t, client.IsErrObjectNotFound(err), "invalid error type")
|
require.True(t, client.IsErrObjectNotFound(err), "invalid error type")
|
||||||
|
|
||||||
// storageID
|
// storageID
|
||||||
var metaStIDPrm meta.StorageIDPrm
|
metaStIDPrm := meta.StorageIDPrm{Address: addr}
|
||||||
metaStIDPrm.SetAddress(addr)
|
|
||||||
storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm)
|
storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm)
|
||||||
require.NoError(t, err, "failed to get storage ID")
|
require.NoError(t, err, "failed to get storage ID")
|
||||||
|
|
||||||
// check existence in blobstore
|
// check existence in blobstore
|
||||||
var bsExisted common.ExistsPrm
|
var bsExisted common.ExistsPrm
|
||||||
bsExisted.Address = addr
|
bsExisted.Address = addr
|
||||||
bsExisted.StorageID = storageID.StorageID()
|
bsExisted.StorageID = storageID.StorageID
|
||||||
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
|
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
|
||||||
require.NoError(t, err, "failed to check blobstore existence")
|
require.NoError(t, err, "failed to check blobstore existence")
|
||||||
require.True(t, exRes.Exists, "invalid blobstore existence result")
|
require.True(t, exRes.Exists, "invalid blobstore existence result")
|
||||||
|
@ -125,7 +124,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
||||||
// drop from blobstor
|
// drop from blobstor
|
||||||
var bsDeletePrm common.DeletePrm
|
var bsDeletePrm common.DeletePrm
|
||||||
bsDeletePrm.Address = addr
|
bsDeletePrm.Address = addr
|
||||||
bsDeletePrm.StorageID = storageID.StorageID()
|
bsDeletePrm.StorageID = storageID.StorageID
|
||||||
_, err = sh.blobStor.Delete(context.Background(), bsDeletePrm)
|
_, err = sh.blobStor.Delete(context.Background(), bsDeletePrm)
|
||||||
require.NoError(t, err, "failed to delete from blobstore")
|
require.NoError(t, err, "failed to delete from blobstore")
|
||||||
|
|
||||||
|
|
|
@ -160,15 +160,14 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
|
||||||
return res, false, err
|
return res, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var mPrm meta.StorageIDPrm
|
mPrm := meta.StorageIDPrm{Address: addr}
|
||||||
mPrm.SetAddress(addr)
|
|
||||||
|
|
||||||
mExRes, err := s.metaBase.StorageID(ctx, mPrm)
|
mExRes, err := s.metaBase.StorageID(ctx, mPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
storageID := mExRes.StorageID()
|
storageID := mExRes.StorageID
|
||||||
if storageID == nil {
|
if storageID == nil {
|
||||||
// `nil` storageID returned without any error
|
// `nil` storageID returned without any error
|
||||||
// means that object is big, `cb` expects an
|
// means that object is big, `cb` expects an
|
||||||
|
|
|
@ -90,9 +90,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
|
||||||
return errMBIsNotAvailable
|
return errMBIsNotAvailable
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm meta.UpdateStorageIDPrm
|
prm := meta.UpdateStorageIDPrm{Address: addr, StorageID: storageID}
|
||||||
prm.SetAddress(addr)
|
|
||||||
prm.SetStorageID(storageID)
|
|
||||||
_, err := u.mb.UpdateStorageID(ctx, prm)
|
_, err := u.mb.UpdateStorageID(ctx, prm)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package benchmark
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -10,6 +11,7 @@ import (
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -80,12 +82,30 @@ func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
|
||||||
require.NoError(b, cache.Init(), "initializing")
|
require.NoError(b, cache.Init(), "initializing")
|
||||||
}
|
}
|
||||||
|
|
||||||
type testMetabase struct{}
|
type testMetabase struct {
|
||||||
|
storageIDs map[oid.Address][]byte
|
||||||
|
guard *sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
func (testMetabase) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
|
func (t *testMetabase) UpdateStorageID(_ context.Context, prm meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
|
||||||
|
t.guard.Lock()
|
||||||
|
defer t.guard.Unlock()
|
||||||
|
t.storageIDs[prm.Address] = prm.StorageID
|
||||||
return meta.UpdateStorageIDRes{}, nil
|
return meta.UpdateStorageIDRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *testMetabase) StorageID(_ context.Context, prm meta.StorageIDPrm) (meta.StorageIDRes, error) {
|
||||||
|
t.guard.RLock()
|
||||||
|
defer t.guard.RUnlock()
|
||||||
|
|
||||||
|
if id, found := t.storageIDs[prm.Address]; found {
|
||||||
|
return meta.StorageIDRes{
|
||||||
|
StorageID: id,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
return meta.StorageIDRes{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func newCache(b *testing.B) writecache.Cache {
|
func newCache(b *testing.B) writecache.Cache {
|
||||||
bs := teststore.New(
|
bs := teststore.New(
|
||||||
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
|
teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }),
|
||||||
|
@ -93,7 +113,7 @@ func newCache(b *testing.B) writecache.Cache {
|
||||||
return writecache.New(
|
return writecache.New(
|
||||||
writecache.WithPath(b.TempDir()),
|
writecache.WithPath(b.TempDir()),
|
||||||
writecache.WithBlobstor(bs),
|
writecache.WithBlobstor(bs),
|
||||||
writecache.WithMetabase(testMetabase{}),
|
writecache.WithMetabase(&testMetabase{storageIDs: make(map[oid.Address][]byte), guard: &sync.RWMutex{}}),
|
||||||
writecache.WithMaxCacheSize(256<<30),
|
writecache.WithMaxCacheSize(256<<30),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,8 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,6 +28,9 @@ type cache struct {
|
||||||
// whether object should be compressed.
|
// whether object should be compressed.
|
||||||
compressFlags map[string]struct{}
|
compressFlags map[string]struct{}
|
||||||
|
|
||||||
|
flushCh chan objectInfo
|
||||||
|
flushingGuard *utilSync.KeyLocker[oid.Address]
|
||||||
|
|
||||||
// cancel is cancel function, protected by modeMtx in Close.
|
// cancel is cancel function, protected by modeMtx in Close.
|
||||||
cancel atomic.Value
|
cancel atomic.Value
|
||||||
// wg is a wait group for flush workers.
|
// wg is a wait group for flush workers.
|
||||||
|
@ -48,7 +53,8 @@ var dummyCanceler context.CancelFunc = func() {}
|
||||||
func New(opts ...Option) Cache {
|
func New(opts ...Option) Cache {
|
||||||
c := &cache{
|
c := &cache{
|
||||||
mode: mode.Disabled,
|
mode: mode.Disabled,
|
||||||
|
flushCh: make(chan objectInfo),
|
||||||
|
flushingGuard: utilSync.NewKeyLocker[oid.Address](),
|
||||||
compressFlags: make(map[string]struct{}),
|
compressFlags: make(map[string]struct{}),
|
||||||
options: options{
|
options: options{
|
||||||
log: &logger.Logger{Logger: zap.NewNop()},
|
log: &logger.Logger{Logger: zap.NewNop()},
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
@ -14,6 +13,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -32,33 +32,78 @@ func (c *cache) runFlushLoop(ctx context.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i := 0; i < c.workersCount; i++ {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go func() {
|
go c.workerFlush(ctx)
|
||||||
c.workerFlushBig(ctx)
|
|
||||||
c.wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) workerFlushBig(ctx context.Context) {
|
c.wg.Add(1)
|
||||||
|
go c.workerSelect(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) workerSelect(ctx context.Context) {
|
||||||
|
defer c.wg.Done()
|
||||||
tick := time.NewTicker(defaultFlushInterval)
|
tick := time.NewTicker(defaultFlushInterval)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-tick.C:
|
case <-tick.C:
|
||||||
|
var prm common.IteratePrm
|
||||||
|
prm.IgnoreErrors = true
|
||||||
|
prm.Handler = func(ie common.IterationElement) error {
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
if c.readOnly() || c.noMetabase() {
|
defer c.modeMtx.RUnlock()
|
||||||
c.modeMtx.RUnlock()
|
if c.readOnly() {
|
||||||
break
|
return ErrReadOnly
|
||||||
|
}
|
||||||
|
if c.noMetabase() {
|
||||||
|
return ErrDegraded
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = c.flushFSTree(ctx, true)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
c.modeMtx.RUnlock()
|
return ctx.Err()
|
||||||
|
case c.flushCh <- objectInfo{
|
||||||
|
data: ie.ObjectData,
|
||||||
|
address: ie.Address,
|
||||||
|
}:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, _ = c.fsTree.Iterate(ctx, prm)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cache) workerFlush(ctx context.Context) {
|
||||||
|
defer c.wg.Done()
|
||||||
|
|
||||||
|
var objInfo objectInfo
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case objInfo = <-c.flushCh:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var obj objectSDK.Object
|
||||||
|
err := obj.Unmarshal(objInfo.data)
|
||||||
|
if err != nil {
|
||||||
|
c.reportFlushError(logs.FSTreeCantUnmarshalObject, objInfo.address.EncodeToString(), metaerr.Wrap(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.flushObject(ctx, objInfo.address, &obj, objInfo.data)
|
||||||
|
if err != nil {
|
||||||
|
// Error is handled in flushObject.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.deleteFromDisk(ctx, objInfo.address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cache) reportFlushError(msg string, addr string, err error) {
|
func (c *cache) reportFlushError(msg string, addr string, err error) {
|
||||||
if c.reportError != nil {
|
if c.reportError != nil {
|
||||||
c.reportError(msg, err)
|
c.reportError(msg, err)
|
||||||
|
@ -85,7 +130,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree)
|
err = c.flushObject(ctx, e.Address, &obj, e.ObjectData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
return nil
|
return nil
|
||||||
|
@ -102,15 +147,25 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// flushObject is used to write object directly to the main storage.
|
// flushObject is used to write object directly to the main storage.
|
||||||
func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error {
|
func (c *cache) flushObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object, data []byte) error {
|
||||||
var err error
|
c.flushingGuard.Lock(addr)
|
||||||
|
defer c.flushingGuard.Unlock(addr)
|
||||||
|
|
||||||
|
stPrm := meta.StorageIDPrm{Address: addr}
|
||||||
|
stRes, err := c.metabase.StorageID(ctx, stPrm)
|
||||||
|
if err != nil {
|
||||||
|
c.reportFlushError(logs.FSTreeCantGetID, addr.EncodeToString(), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if stRes.StorageID != nil {
|
||||||
|
// already flushed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
c.metrics.Flush(err == nil, st)
|
c.metrics.Flush(err == nil, StorageTypeFSTree)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
addr := objectCore.AddressOf(obj)
|
|
||||||
|
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
prm.Object = obj
|
prm.Object = obj
|
||||||
prm.RawData = data
|
prm.RawData = data
|
||||||
|
@ -125,9 +180,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var updPrm meta.UpdateStorageIDPrm
|
updPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: res.StorageID}
|
||||||
updPrm.SetAddress(addr)
|
|
||||||
updPrm.SetStorageID(res.StorageID)
|
|
||||||
|
|
||||||
_, err = c.metabase.UpdateStorageID(ctx, updPrm)
|
_, err = c.metabase.UpdateStorageID(ctx, updPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -169,3 +222,8 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
|
||||||
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
return c.flushFSTree(ctx, ignoreErrors)
|
return c.flushFSTree(ctx, ignoreErrors)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type objectInfo struct {
|
||||||
|
data []byte
|
||||||
|
address oid.Address
|
||||||
|
}
|
||||||
|
|
|
@ -226,15 +226,13 @@ func putObjects(t *testing.T, c Cache) []objectPair {
|
||||||
|
|
||||||
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
|
func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
var mPrm meta.StorageIDPrm
|
mPrm := meta.StorageIDPrm{Address: objects[i].addr}
|
||||||
mPrm.SetAddress(objects[i].addr)
|
|
||||||
|
|
||||||
mRes, err := mb.StorageID(context.Background(), mPrm)
|
mRes, err := mb.StorageID(context.Background(), mPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var prm common.GetPrm
|
var prm common.GetPrm
|
||||||
prm.Address = objects[i].addr
|
prm.Address = objects[i].addr
|
||||||
prm.StorageID = mRes.StorageID()
|
prm.StorageID = mRes.StorageID
|
||||||
|
|
||||||
res, err := bs.Get(context.Background(), prm)
|
res, err := bs.Get(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -55,6 +55,7 @@ type MainStorage interface {
|
||||||
// Metabase is the interface of the metabase used by Cache implementations.
|
// Metabase is the interface of the metabase used by Cache implementations.
|
||||||
type Metabase interface {
|
type Metabase interface {
|
||||||
UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
|
UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
|
||||||
|
StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
Loading…
Reference in a new issue