blobstor: Refactor storage selection #695

Closed
dstepanov-yadro wants to merge 1 commit from dstepanov-yadro/frostfs-node:feat/blobovnicza_selector_refactor into master
13 changed files with 59 additions and 76 deletions

View file

@ -796,6 +796,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
SupportsStorageID: true,
})
case fstree.Type:
fstreeOpts := []fstree.Option{

View file

@ -13,8 +13,9 @@ import (
// SubStorage represents single storage component with some storage policy.
type SubStorage struct {
Storage common.Storage
Policy func(*objectSDK.Object, []byte) bool
Storage common.Storage
Policy func(*objectSDK.Object, []byte) bool
SupportsStorageID bool
}
// BlobStor represents FrostFS local BLOB storage.

View file

@ -27,6 +27,7 @@ func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *testst
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) <= smallSizeLimit
},
SupportsStorageID: true,
},
{
Storage: largeFileStorage,
@ -121,6 +122,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < smallSizeLimit
},
SupportsStorageID: true,
},
{
Storage: fstree.New(fstree.WithPath(dir)),

View file

@ -6,6 +6,7 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"go.uber.org/zap"
)
@ -77,3 +78,25 @@ func (b *BlobStor) Close() error {
}
return firstErr
}
func (b *BlobStor) selectStorages(storageID []byte) []common.Storage {
var res []common.Storage
if storageID == nil {
for idx := range b.storage {
res = append(res, b.storage[idx].Storage)
}
} else if len(storageID) > 0 {
for idx := range b.storage {
if b.storage[idx].SupportsStorageID {
res = append(res, b.storage[idx].Storage)
}
}
} else {
for idx := range b.storage {
if !b.storage[idx].SupportsStorageID {
res = append(res, b.storage[idx].Storage)
}
}
}
return res
}

View file

@ -6,8 +6,10 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -31,32 +33,16 @@ func (b *BlobStor) Delete(ctx context.Context, prm common.DeletePrm) (common.Del
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
if prm.StorageID == nil {
for i := range b.storage {
res, err := b.storage[i].Storage.Delete(ctx, prm)
if err == nil || !client.IsErrObjectNotFound(err) {
if err == nil {
success = true
logOp(b.log, deleteOp, prm.Address, b.storage[i].Storage.Type(), prm.StorageID)
}
return res, err
for _, storage := range b.selectStorages(prm.StorageID) {
res, err := storage.Delete(ctx, prm)
if err == nil || !client.IsErrObjectNotFound(err) {
if err == nil {
success = true
logOp(b.log, deleteOp, prm.Address, storage.Type(), prm.StorageID)
}
return res, err
}
}
var st common.Storage
if len(prm.StorageID) == 0 {
st = b.storage[len(b.storage)-1].Storage
} else {
st = b.storage[0].Storage
}
res, err := st.Delete(ctx, prm)
if err == nil {
success = true
logOp(b.log, deleteOp, prm.Address, st.Type(), prm.StorageID)
}
return res, err
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}

View file

@ -36,29 +36,9 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
if prm.StorageID != nil {
if len(prm.StorageID) == 0 {
res, err := b.storage[len(b.storage)-1].Storage.Exists(ctx, prm)
exists = err == nil && res.Exists
return res, err
}
res, err := b.storage[0].Storage.Exists(ctx, prm)
exists = err == nil && res.Exists
return res, err
}
// If there was an error during existence check below,
// it will be returned unless object was found in blobovnicza.
// Otherwise, it is logged and the latest error is returned.
// FSTree | Blobovnicza | Behaviour
// found | (not tried) | return true, nil
// not found | any result | return the result
// error | found | log the error, return true, nil
// error | not found | return the error
// error | error | log the first error, return the second
var errors []error
for i := range b.storage {
res, err := b.storage[i].Storage.Exists(ctx, prm)
for _, storage := range b.selectStorages(prm.StorageID) {
res, err := storage.Exists(ctx, prm)
if err == nil && res.Exists {
exists = true
return res, nil

View file

@ -36,20 +36,12 @@ func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (res common.GetRe
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
if prm.StorageID == nil {
for i := range b.storage {
res, err = b.storage[i].Storage.Get(ctx, prm)
if err == nil || !client.IsErrObjectNotFound(err) {
return res, err
}
for _, storage := range b.selectStorages(prm.StorageID) {
res, err = storage.Get(ctx, prm)
if err == nil || !client.IsErrObjectNotFound(err) {
return res, err
}
}
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if len(prm.StorageID) == 0 {
res, err = b.storage[len(b.storage)-1].Storage.Get(ctx, prm)
} else {
res, err = b.storage[0].Storage.Get(ctx, prm)
}
return res, err
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}

View file

@ -38,20 +38,12 @@ func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (res co
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
if prm.StorageID == nil {
for i := range b.storage {
res, err = b.storage[i].Storage.GetRange(ctx, prm)
if err == nil || !client.IsErrObjectNotFound(err) {
return res, err
}
for _, storage := range b.selectStorages(prm.StorageID) {
res, err = storage.GetRange(ctx, prm)
if err == nil || !client.IsErrObjectNotFound(err) {
return res, err
}
}
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if len(prm.StorageID) == 0 {
res, err = b.storage[len(b.storage)-1].Storage.GetRange(ctx, prm)
} else {
res, err = b.storage[0].Storage.GetRange(ctx, prm)
}
return res, err
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}

View file

@ -148,6 +148,7 @@ func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < smallSize
},
SupportsStorageID: true,
},
{
Storage: fstree.New(
@ -176,6 +177,7 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < smallSize
},
SupportsStorageID: true,
},
{
Storage: largeFileStorage,

View file

@ -43,6 +43,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= 1<<20
},
SupportsStorageID: true,
},
{
Storage: fstree.New(

View file

@ -41,6 +41,7 @@ func TestShard_Lock(t *testing.T) {
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= 1<<20
},
SupportsStorageID: true,
},
{
Storage: fstree.New(

View file

@ -89,6 +89,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= smallObjectSize
},
SupportsStorageID: true,
},
{
Storage: fstree.New(

View file

@ -79,6 +79,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= 1<<20
},
SupportsStorageID: true,
},
{
Storage: fstree.New(