blobstor: Refactor storage selection #695
13 changed files with 59 additions and 76 deletions
|
@ -796,6 +796,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
})
|
||||
case fstree.Type:
|
||||
fstreeOpts := []fstree.Option{
|
||||
|
|
|
@ -13,8 +13,9 @@ import (
|
|||
|
||||
// SubStorage represents single storage component with some storage policy.
|
||||
type SubStorage struct {
|
||||
Storage common.Storage
|
||||
Policy func(*objectSDK.Object, []byte) bool
|
||||
Storage common.Storage
|
||||
Policy func(*objectSDK.Object, []byte) bool
|
||||
SupportsStorageID bool
|
||||
}
|
||||
|
||||
// BlobStor represents FrostFS local BLOB storage.
|
||||
|
|
|
@ -27,6 +27,7 @@ func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *testst
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) <= smallSizeLimit
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
},
|
||||
{
|
||||
Storage: largeFileStorage,
|
||||
|
@ -121,6 +122,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < smallSizeLimit
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
},
|
||||
{
|
||||
Storage: fstree.New(fstree.WithPath(dir)),
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -77,3 +78,25 @@ func (b *BlobStor) Close() error {
|
|||
}
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (b *BlobStor) selectStorages(storageID []byte) []common.Storage {
|
||||
var res []common.Storage
|
||||
if storageID == nil {
|
||||
for idx := range b.storage {
|
||||
res = append(res, b.storage[idx].Storage)
|
||||
}
|
||||
} else if len(storageID) > 0 {
|
||||
for idx := range b.storage {
|
||||
if b.storage[idx].SupportsStorageID {
|
||||
res = append(res, b.storage[idx].Storage)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for idx := range b.storage {
|
||||
if !b.storage[idx].SupportsStorageID {
|
||||
res = append(res, b.storage[idx].Storage)
|
||||
}
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
|
|
@ -6,8 +6,10 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
@ -31,32 +33,16 @@ func (b *BlobStor) Delete(ctx context.Context, prm common.DeletePrm) (common.Del
|
|||
b.modeMtx.RLock()
|
||||
defer b.modeMtx.RUnlock()
|
||||
|
||||
if prm.StorageID == nil {
|
||||
for i := range b.storage {
|
||||
res, err := b.storage[i].Storage.Delete(ctx, prm)
|
||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||
if err == nil {
|
||||
success = true
|
||||
logOp(b.log, deleteOp, prm.Address, b.storage[i].Storage.Type(), prm.StorageID)
|
||||
}
|
||||
return res, err
|
||||
for _, storage := range b.selectStorages(prm.StorageID) {
|
||||
res, err := storage.Delete(ctx, prm)
|
||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||
if err == nil {
|
||||
success = true
|
||||
logOp(b.log, deleteOp, prm.Address, storage.Type(), prm.StorageID)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
}
|
||||
|
||||
var st common.Storage
|
||||
|
||||
if len(prm.StorageID) == 0 {
|
||||
st = b.storage[len(b.storage)-1].Storage
|
||||
} else {
|
||||
st = b.storage[0].Storage
|
||||
}
|
||||
|
||||
res, err := st.Delete(ctx, prm)
|
||||
if err == nil {
|
||||
success = true
|
||||
logOp(b.log, deleteOp, prm.Address, st.Type(), prm.StorageID)
|
||||
}
|
||||
|
||||
return res, err
|
||||
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
|
|
@ -36,29 +36,9 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
|
|||
b.modeMtx.RLock()
|
||||
defer b.modeMtx.RUnlock()
|
||||
|
||||
if prm.StorageID != nil {
|
||||
if len(prm.StorageID) == 0 {
|
||||
res, err := b.storage[len(b.storage)-1].Storage.Exists(ctx, prm)
|
||||
exists = err == nil && res.Exists
|
||||
return res, err
|
||||
}
|
||||
res, err := b.storage[0].Storage.Exists(ctx, prm)
|
||||
exists = err == nil && res.Exists
|
||||
return res, err
|
||||
}
|
||||
|
||||
// If there was an error during existence check below,
|
||||
// it will be returned unless object was found in blobovnicza.
|
||||
// Otherwise, it is logged and the latest error is returned.
|
||||
// FSTree | Blobovnicza | Behaviour
|
||||
// found | (not tried) | return true, nil
|
||||
// not found | any result | return the result
|
||||
// error | found | log the error, return true, nil
|
||||
// error | not found | return the error
|
||||
// error | error | log the first error, return the second
|
||||
var errors []error
|
||||
for i := range b.storage {
|
||||
res, err := b.storage[i].Storage.Exists(ctx, prm)
|
||||
for _, storage := range b.selectStorages(prm.StorageID) {
|
||||
res, err := storage.Exists(ctx, prm)
|
||||
if err == nil && res.Exists {
|
||||
exists = true
|
||||
return res, nil
|
||||
|
|
|
@ -36,20 +36,12 @@ func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (res common.GetRe
|
|||
b.modeMtx.RLock()
|
||||
defer b.modeMtx.RUnlock()
|
||||
|
||||
if prm.StorageID == nil {
|
||||
for i := range b.storage {
|
||||
res, err = b.storage[i].Storage.Get(ctx, prm)
|
||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||
return res, err
|
||||
}
|
||||
for _, storage := range b.selectStorages(prm.StorageID) {
|
||||
res, err = storage.Get(ctx, prm)
|
||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||
return res, err
|
||||
}
|
||||
}
|
||||
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
if len(prm.StorageID) == 0 {
|
||||
res, err = b.storage[len(b.storage)-1].Storage.Get(ctx, prm)
|
||||
} else {
|
||||
res, err = b.storage[0].Storage.Get(ctx, prm)
|
||||
}
|
||||
return res, err
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
|
|
@ -38,20 +38,12 @@ func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (res co
|
|||
b.modeMtx.RLock()
|
||||
defer b.modeMtx.RUnlock()
|
||||
|
||||
if prm.StorageID == nil {
|
||||
for i := range b.storage {
|
||||
res, err = b.storage[i].Storage.GetRange(ctx, prm)
|
||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||
return res, err
|
||||
}
|
||||
for _, storage := range b.selectStorages(prm.StorageID) {
|
||||
res, err = storage.GetRange(ctx, prm)
|
||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||
return res, err
|
||||
}
|
||||
}
|
||||
|
||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
if len(prm.StorageID) == 0 {
|
||||
res, err = b.storage[len(b.storage)-1].Storage.GetRange(ctx, prm)
|
||||
} else {
|
||||
res, err = b.storage[0].Storage.GetRange(ctx, prm)
|
||||
}
|
||||
return res, err
|
||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
|
|
@ -148,6 +148,7 @@ func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < smallSize
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
},
|
||||
{
|
||||
Storage: fstree.New(
|
||||
|
@ -176,6 +177,7 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < smallSize
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
},
|
||||
{
|
||||
Storage: largeFileStorage,
|
||||
|
|
|
@ -43,6 +43,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return len(data) <= 1<<20
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
},
|
||||
{
|
||||
Storage: fstree.New(
|
||||
|
|
|
@ -41,6 +41,7 @@ func TestShard_Lock(t *testing.T) {
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return len(data) <= 1<<20
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
},
|
||||
{
|
||||
Storage: fstree.New(
|
||||
|
|
|
@ -89,6 +89,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return len(data) <= smallObjectSize
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
},
|
||||
{
|
||||
Storage: fstree.New(
|
||||
|
|
|
@ -79,6 +79,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard
|
|||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return len(data) <= 1<<20
|
||||
},
|
||||
SupportsStorageID: true,
|
||||
},
|
||||
{
|
||||
Storage: fstree.New(
|
||||
|
|
Loading…
Add table
Reference in a new issue