blobstor: Refactor storage selection #695
13 changed files with 59 additions and 76 deletions
|
@ -796,6 +796,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
})
|
})
|
||||||
case fstree.Type:
|
case fstree.Type:
|
||||||
fstreeOpts := []fstree.Option{
|
fstreeOpts := []fstree.Option{
|
||||||
|
|
|
@ -13,8 +13,9 @@ import (
|
||||||
|
|
||||||
// SubStorage represents single storage component with some storage policy.
|
// SubStorage represents single storage component with some storage policy.
|
||||||
type SubStorage struct {
|
type SubStorage struct {
|
||||||
Storage common.Storage
|
Storage common.Storage
|
||||||
Policy func(*objectSDK.Object, []byte) bool
|
Policy func(*objectSDK.Object, []byte) bool
|
||||||
|
SupportsStorageID bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlobStor represents FrostFS local BLOB storage.
|
// BlobStor represents FrostFS local BLOB storage.
|
||||||
|
|
|
@ -27,6 +27,7 @@ func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *testst
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return uint64(len(data)) <= smallSizeLimit
|
return uint64(len(data)) <= smallSizeLimit
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: largeFileStorage,
|
Storage: largeFileStorage,
|
||||||
|
@ -121,6 +122,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return uint64(len(data)) < smallSizeLimit
|
return uint64(len(data)) < smallSizeLimit
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: fstree.New(fstree.WithPath(dir)),
|
Storage: fstree.New(fstree.WithPath(dir)),
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -77,3 +78,25 @@ func (b *BlobStor) Close() error {
|
||||||
}
|
}
|
||||||
return firstErr
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *BlobStor) selectStorages(storageID []byte) []common.Storage {
|
||||||
|
var res []common.Storage
|
||||||
|
if storageID == nil {
|
||||||
|
for idx := range b.storage {
|
||||||
|
res = append(res, b.storage[idx].Storage)
|
||||||
|
}
|
||||||
|
} else if len(storageID) > 0 {
|
||||||
|
for idx := range b.storage {
|
||||||
|
if b.storage[idx].SupportsStorageID {
|
||||||
|
res = append(res, b.storage[idx].Storage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for idx := range b.storage {
|
||||||
|
if !b.storage[idx].SupportsStorageID {
|
||||||
|
res = append(res, b.storage[idx].Storage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
|
@ -6,8 +6,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
@ -31,32 +33,16 @@ func (b *BlobStor) Delete(ctx context.Context, prm common.DeletePrm) (common.Del
|
||||||
b.modeMtx.RLock()
|
b.modeMtx.RLock()
|
||||||
defer b.modeMtx.RUnlock()
|
defer b.modeMtx.RUnlock()
|
||||||
|
|
||||||
if prm.StorageID == nil {
|
for _, storage := range b.selectStorages(prm.StorageID) {
|
||||||
for i := range b.storage {
|
res, err := storage.Delete(ctx, prm)
|
||||||
res, err := b.storage[i].Storage.Delete(ctx, prm)
|
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
if err == nil {
|
||||||
if err == nil {
|
success = true
|
||||||
success = true
|
logOp(b.log, deleteOp, prm.Address, storage.Type(), prm.StorageID)
|
||||||
logOp(b.log, deleteOp, prm.Address, b.storage[i].Storage.Type(), prm.StorageID)
|
|
||||||
}
|
|
||||||
return res, err
|
|
||||||
}
|
}
|
||||||
|
return res, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var st common.Storage
|
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
|
|
||||||
if len(prm.StorageID) == 0 {
|
|
||||||
st = b.storage[len(b.storage)-1].Storage
|
|
||||||
} else {
|
|
||||||
st = b.storage[0].Storage
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := st.Delete(ctx, prm)
|
|
||||||
if err == nil {
|
|
||||||
success = true
|
|
||||||
logOp(b.log, deleteOp, prm.Address, st.Type(), prm.StorageID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, err
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,29 +36,9 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
|
||||||
b.modeMtx.RLock()
|
b.modeMtx.RLock()
|
||||||
defer b.modeMtx.RUnlock()
|
defer b.modeMtx.RUnlock()
|
||||||
|
|
||||||
if prm.StorageID != nil {
|
|
||||||
if len(prm.StorageID) == 0 {
|
|
||||||
res, err := b.storage[len(b.storage)-1].Storage.Exists(ctx, prm)
|
|
||||||
exists = err == nil && res.Exists
|
|
||||||
return res, err
|
|
||||||
}
|
|
||||||
res, err := b.storage[0].Storage.Exists(ctx, prm)
|
|
||||||
exists = err == nil && res.Exists
|
|
||||||
return res, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// If there was an error during existence check below,
|
|
||||||
// it will be returned unless object was found in blobovnicza.
|
|
||||||
// Otherwise, it is logged and the latest error is returned.
|
|
||||||
// FSTree | Blobovnicza | Behaviour
|
|
||||||
// found | (not tried) | return true, nil
|
|
||||||
// not found | any result | return the result
|
|
||||||
// error | found | log the error, return true, nil
|
|
||||||
// error | not found | return the error
|
|
||||||
// error | error | log the first error, return the second
|
|
||||||
var errors []error
|
var errors []error
|
||||||
for i := range b.storage {
|
for _, storage := range b.selectStorages(prm.StorageID) {
|
||||||
res, err := b.storage[i].Storage.Exists(ctx, prm)
|
res, err := storage.Exists(ctx, prm)
|
||||||
if err == nil && res.Exists {
|
if err == nil && res.Exists {
|
||||||
exists = true
|
exists = true
|
||||||
return res, nil
|
return res, nil
|
||||||
|
|
|
@ -36,20 +36,12 @@ func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (res common.GetRe
|
||||||
b.modeMtx.RLock()
|
b.modeMtx.RLock()
|
||||||
defer b.modeMtx.RUnlock()
|
defer b.modeMtx.RUnlock()
|
||||||
|
|
||||||
if prm.StorageID == nil {
|
for _, storage := range b.selectStorages(prm.StorageID) {
|
||||||
for i := range b.storage {
|
res, err = storage.Get(ctx, prm)
|
||||||
res, err = b.storage[i].Storage.Get(ctx, prm)
|
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
return res, err
|
||||||
return res, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
|
||||||
if len(prm.StorageID) == 0 {
|
|
||||||
res, err = b.storage[len(b.storage)-1].Storage.Get(ctx, prm)
|
|
||||||
} else {
|
|
||||||
res, err = b.storage[0].Storage.Get(ctx, prm)
|
|
||||||
}
|
|
||||||
return res, err
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,20 +38,12 @@ func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (res co
|
||||||
b.modeMtx.RLock()
|
b.modeMtx.RLock()
|
||||||
defer b.modeMtx.RUnlock()
|
defer b.modeMtx.RUnlock()
|
||||||
|
|
||||||
if prm.StorageID == nil {
|
for _, storage := range b.selectStorages(prm.StorageID) {
|
||||||
for i := range b.storage {
|
res, err = storage.GetRange(ctx, prm)
|
||||||
res, err = b.storage[i].Storage.GetRange(ctx, prm)
|
if err == nil || !client.IsErrObjectNotFound(err) {
|
||||||
if err == nil || !client.IsErrObjectNotFound(err) {
|
return res, err
|
||||||
return res, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
|
||||||
if len(prm.StorageID) == 0 {
|
|
||||||
res, err = b.storage[len(b.storage)-1].Storage.GetRange(ctx, prm)
|
|
||||||
} else {
|
|
||||||
res, err = b.storage[0].Storage.GetRange(ctx, prm)
|
|
||||||
}
|
|
||||||
return res, err
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,6 +148,7 @@ func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return uint64(len(data)) < smallSize
|
return uint64(len(data)) < smallSize
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: fstree.New(
|
Storage: fstree.New(
|
||||||
|
@ -176,6 +177,7 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return uint64(len(data)) < smallSize
|
return uint64(len(data)) < smallSize
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: largeFileStorage,
|
Storage: largeFileStorage,
|
||||||
|
|
|
@ -43,6 +43,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return len(data) <= 1<<20
|
return len(data) <= 1<<20
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: fstree.New(
|
Storage: fstree.New(
|
||||||
|
|
|
@ -41,6 +41,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return len(data) <= 1<<20
|
return len(data) <= 1<<20
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: fstree.New(
|
Storage: fstree.New(
|
||||||
|
|
|
@ -89,6 +89,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return len(data) <= smallObjectSize
|
return len(data) <= smallObjectSize
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: fstree.New(
|
Storage: fstree.New(
|
||||||
|
|
|
@ -79,6 +79,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
return len(data) <= 1<<20
|
return len(data) <= 1<<20
|
||||||
},
|
},
|
||||||
|
SupportsStorageID: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: fstree.New(
|
Storage: fstree.New(
|
||||||
|
|
Loading…
Add table
Reference in a new issue