forked from TrueCloudLab/frostfs-node
[#1523] local_object_storage: Unify parameters for the Get
operation
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
b621f5983a
commit
d9a2f280c9
11 changed files with 73 additions and 103 deletions
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/hashicorp/golang-lru/simplelru"
|
||||
"github.com/nspcc-dev/hrw"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -206,16 +207,16 @@ func (b *Blobovniczas) Put(addr oid.Address, data []byte) (*blobovnicza.ID, erro
|
|||
return id, nil
|
||||
}
|
||||
|
||||
// reads object from blobovnicza tree.
|
||||
// Get reads object from blobovnicza tree.
|
||||
//
|
||||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||
// Otherwise, all Blobovniczas are processed descending weight.
|
||||
func (b *Blobovniczas) Get(prm GetSmallPrm) (res GetSmallRes, err error) {
|
||||
func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
|
||||
var bPrm blobovnicza.GetPrm
|
||||
bPrm.SetAddress(prm.addr)
|
||||
bPrm.SetAddress(prm.Address)
|
||||
|
||||
if prm.blobovniczaID != nil {
|
||||
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
|
||||
if prm.BlobovniczaID != nil {
|
||||
blz, err := b.openBlobovnicza(prm.BlobovniczaID.String())
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
@ -225,7 +226,7 @@ func (b *Blobovniczas) Get(prm GetSmallPrm) (res GetSmallRes, err error) {
|
|||
|
||||
activeCache := make(map[string]struct{})
|
||||
|
||||
err = b.iterateSortedLeaves(&prm.addr, func(p string) (bool, error) {
|
||||
err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) {
|
||||
dirPath := filepath.Dir(p)
|
||||
|
||||
_, ok := activeCache[dirPath]
|
||||
|
@ -246,7 +247,7 @@ func (b *Blobovniczas) Get(prm GetSmallPrm) (res GetSmallRes, err error) {
|
|||
return err == nil, nil
|
||||
})
|
||||
|
||||
if err == nil && res.Object() == nil {
|
||||
if err == nil && res.Object == nil {
|
||||
// not found in any blobovnicza
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
|
@ -430,7 +431,7 @@ func (b *Blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath
|
|||
// tries to read object from particular blobovnicza.
|
||||
//
|
||||
// returns error if object could not be read from any blobovnicza of the same level.
|
||||
func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string, tryActive bool) (GetSmallRes, error) {
|
||||
func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) {
|
||||
lvlPath := filepath.Dir(blzPath)
|
||||
|
||||
// try to read from blobovnicza if it is opened
|
||||
|
@ -477,13 +478,13 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
|
|||
b.log.Debug("index is too big", zap.String("path", blzPath))
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
return GetSmallRes{}, errNotFound
|
||||
return common.GetRes{}, errNotFound
|
||||
}
|
||||
|
||||
// open blobovnicza (cached inside)
|
||||
blz, err := b.openBlobovnicza(blzPath)
|
||||
if err != nil {
|
||||
return GetSmallRes{}, err
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
return b.getObject(blz, prm)
|
||||
|
@ -579,27 +580,25 @@ func (b *Blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicz
|
|||
}
|
||||
|
||||
// reads object from blobovnicza and returns GetSmallRes.
|
||||
func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (GetSmallRes, error) {
|
||||
func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (common.GetRes, error) {
|
||||
res, err := blz.Get(prm)
|
||||
if err != nil {
|
||||
return GetSmallRes{}, err
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
// decompress the data
|
||||
data, err := b.Decompress(res.Object())
|
||||
if err != nil {
|
||||
return GetSmallRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
}
|
||||
|
||||
// unmarshal the object
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return GetSmallRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
return GetSmallRes{
|
||||
obj: obj,
|
||||
}, nil
|
||||
return common.GetRes{Object: obj}, nil
|
||||
}
|
||||
|
||||
// reads range of object payload data from blobovnicza and returns GetRangeSmallRes.
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
|
@ -76,20 +77,20 @@ func TestBlobovniczas(t *testing.T) {
|
|||
require.NoError(t, err, i)
|
||||
|
||||
// get w/ blobovnicza ID
|
||||
var prm GetSmallPrm
|
||||
prm.SetBlobovniczaID(id)
|
||||
prm.SetAddress(addr)
|
||||
var prm common.GetPrm
|
||||
prm.BlobovniczaID = id
|
||||
prm.Address = addr
|
||||
|
||||
res, err := b.Get(prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, obj, res.Object())
|
||||
require.Equal(t, obj, res.Object)
|
||||
|
||||
// get w/o blobovnicza ID
|
||||
prm.SetBlobovniczaID(nil)
|
||||
prm.BlobovniczaID = nil
|
||||
|
||||
res, err = b.Get(prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, obj, res.Object())
|
||||
require.Equal(t, obj, res.Object)
|
||||
|
||||
// get range w/ blobovnicza ID
|
||||
var rngPrm GetRangeSmallPrm
|
||||
|
@ -119,7 +120,7 @@ func TestBlobovniczas(t *testing.T) {
|
|||
}
|
||||
|
||||
var dPrm DeleteSmallPrm
|
||||
var gPrm GetSmallPrm
|
||||
var gPrm common.GetPrm
|
||||
|
||||
for i := range addrList {
|
||||
dPrm.SetAddress(addrList[i])
|
||||
|
@ -127,7 +128,7 @@ func TestBlobovniczas(t *testing.T) {
|
|||
_, err := b.Delete(dPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
gPrm.SetAddress(addrList[i])
|
||||
gPrm.Address = addrList[i]
|
||||
|
||||
_, err = b.Get(gPrm)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
// GetSmallPrm groups the parameters of GetSmallPrm operation.
|
||||
type GetSmallPrm struct {
|
||||
addr oid.Address
|
||||
blobovniczaID *blobovnicza.ID
|
||||
}
|
||||
|
||||
// SetAddress sets object address.
|
||||
func (p *GetSmallPrm) SetAddress(addr oid.Address) {
|
||||
p.addr = addr
|
||||
}
|
||||
|
||||
// SetBlobovniczaID sets blobovnicza id.
|
||||
func (p *GetSmallPrm) SetBlobovniczaID(id *blobovnicza.ID) {
|
||||
p.blobovniczaID = id
|
||||
}
|
||||
|
||||
// GetSmallRes groups the resulting values of GetSmall operation.
|
||||
type GetSmallRes struct {
|
||||
obj *object.Object
|
||||
}
|
||||
|
||||
// Object returns read object.
|
||||
func (r GetSmallRes) Object() *object.Object {
|
||||
return r.obj
|
||||
}
|
|
@ -5,7 +5,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -38,16 +38,13 @@ func TestCompression(t *testing.T) {
|
|||
}
|
||||
|
||||
testGet := func(t *testing.T, b *BlobStor, i int) {
|
||||
var prm blobovniczatree.GetSmallPrm
|
||||
prm.SetAddress(object.AddressOf(smallObj[i]))
|
||||
|
||||
res1, err := b.GetSmall(prm)
|
||||
res1, err := b.GetSmall(common.GetPrm{Address: object.AddressOf(smallObj[i])})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, smallObj[i], res1.Object())
|
||||
require.Equal(t, smallObj[i], res1.Object)
|
||||
|
||||
res2, err := b.GetBig(GetBigPrm{address: address{object.AddressOf(bigObj[i])}})
|
||||
res2, err := b.GetBig(common.GetPrm{Address: object.AddressOf(bigObj[i])})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, bigObj[i], res2.Object())
|
||||
require.Equal(t, bigObj[i], res2.Object)
|
||||
}
|
||||
|
||||
testPut := func(t *testing.T, b *BlobStor, i int) {
|
||||
|
|
16
pkg/local_object_storage/blobstor/common/get.go
Normal file
16
pkg/local_object_storage/blobstor/common/get.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type GetPrm struct {
|
||||
Address oid.Address
|
||||
BlobovniczaID *blobovnicza.ID
|
||||
}
|
||||
|
||||
type GetRes struct {
|
||||
Object *objectSDK.Object
|
||||
}
|
|
@ -9,6 +9,7 @@ import (
|
|||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
|
@ -233,8 +234,8 @@ func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error
|
|||
}
|
||||
|
||||
// Get returns an object from the storage by address.
|
||||
func (t *FSTree) Get(addr oid.Address) ([]byte, error) {
|
||||
p := t.treePath(addr)
|
||||
func (t *FSTree) Get(prm common.GetPrm) ([]byte, error) {
|
||||
p := t.treePath(prm.Address)
|
||||
|
||||
if _, err := os.Stat(p); os.IsNotExist(err) {
|
||||
return nil, ErrFileNotFound
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||
|
@ -52,12 +53,12 @@ func TestFSTree(t *testing.T) {
|
|||
|
||||
t.Run("get", func(t *testing.T) {
|
||||
for _, a := range addrs {
|
||||
actual, err := fs.Get(a)
|
||||
actual, err := fs.Get(common.GetPrm{Address: a})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, store[a.EncodeToString()], actual)
|
||||
}
|
||||
|
||||
_, err := fs.Get(oidtest.Address())
|
||||
_, err := fs.Get(common.GetPrm{Address: oidtest.Address()})
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
|
|
|
@ -4,22 +4,12 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
)
|
||||
|
||||
// GetBigPrm groups the parameters of GetBig operation.
|
||||
type GetBigPrm struct {
|
||||
address
|
||||
}
|
||||
|
||||
// GetBigRes groups the resulting values of GetBig operation.
|
||||
type GetBigRes struct {
|
||||
roObject
|
||||
}
|
||||
|
||||
// GetBig reads the object from shallow dir of BLOB storage by address.
|
||||
//
|
||||
// Returns any error encountered that
|
||||
|
@ -27,37 +17,33 @@ type GetBigRes struct {
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
|
||||
// presented in shallow dir.
|
||||
func (b *BlobStor) GetBig(prm GetBigPrm) (GetBigRes, error) {
|
||||
func (b *BlobStor) GetBig(prm common.GetPrm) (common.GetRes, error) {
|
||||
// get compressed object data
|
||||
data, err := b.fsTree.Get(prm.addr)
|
||||
data, err := b.fsTree.Get(prm)
|
||||
if err != nil {
|
||||
if errors.Is(err, fstree.ErrFileNotFound) {
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
return GetBigRes{}, errNotFound
|
||||
return common.GetRes{}, errNotFound
|
||||
}
|
||||
|
||||
return GetBigRes{}, fmt.Errorf("could not read object from fs tree: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("could not read object from fs tree: %w", err)
|
||||
}
|
||||
|
||||
data, err = b.Decompress(data)
|
||||
if err != nil {
|
||||
return GetBigRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
}
|
||||
|
||||
// unmarshal the object
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return GetBigRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
return GetBigRes{
|
||||
roObject: roObject{
|
||||
obj: obj,
|
||||
},
|
||||
}, nil
|
||||
return common.GetRes{Object: obj}, nil
|
||||
}
|
||||
|
||||
func (b *BlobStor) GetSmall(prm blobovniczatree.GetSmallPrm) (blobovniczatree.GetSmallRes, error) {
|
||||
func (b *BlobStor) GetSmall(prm common.GetPrm) (common.GetRes, error) {
|
||||
return b.blobovniczas.Get(prm)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -30,7 +31,7 @@ type GetRangeBigRes struct {
|
|||
// Returns an error of type apistatus.ObjectNotFound if object is missing.
|
||||
func (b *BlobStor) GetRangeBig(prm GetRangeBigPrm) (GetRangeBigRes, error) {
|
||||
// get compressed object data
|
||||
data, err := b.fsTree.Get(prm.addr)
|
||||
data, err := b.fsTree.Get(common.GetPrm{Address: prm.addr})
|
||||
if err != nil {
|
||||
if errors.Is(err, fstree.ErrFileNotFound) {
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
|
@ -65,28 +65,28 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
|||
var big, small storFetcher
|
||||
|
||||
big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*objectSDK.Object, error) {
|
||||
var getBigPrm blobstor.GetBigPrm
|
||||
getBigPrm.SetAddress(prm.addr)
|
||||
var getBigPrm common.GetPrm
|
||||
getBigPrm.Address = prm.addr
|
||||
|
||||
res, err := stor.GetBig(getBigPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Object(), nil
|
||||
return res.Object, nil
|
||||
}
|
||||
|
||||
small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*objectSDK.Object, error) {
|
||||
var getSmallPrm blobovniczatree.GetSmallPrm
|
||||
getSmallPrm.SetAddress(prm.addr)
|
||||
getSmallPrm.SetBlobovniczaID(id)
|
||||
var getSmallPrm common.GetPrm
|
||||
getSmallPrm.Address = prm.addr
|
||||
getSmallPrm.BlobovniczaID = id
|
||||
|
||||
res, err := stor.GetSmall(getSmallPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Object(), nil
|
||||
return res.Object, nil
|
||||
}
|
||||
|
||||
wc := func(c writecache.Cache) (*objectSDK.Object, error) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
|
@ -34,7 +35,7 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
|
|||
return obj, obj.Unmarshal(value)
|
||||
}
|
||||
|
||||
data, err := c.fsTree.Get(addr)
|
||||
data, err := c.fsTree.Get(common.GetPrm{Address: addr})
|
||||
if err != nil {
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
|
|
Loading…
Reference in a new issue