forked from TrueCloudLab/frostfs-node
[#176] localstore: Implement shard methods via Metabase and BlobStor
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
b127607ac6
commit
46e455dcae
6 changed files with 154 additions and 8 deletions
21
pkg/local_object_storage/metabase/exists.go
Normal file
21
pkg/local_object_storage/metabase/exists.go
Normal file
|
@ -0,0 +1,21 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Exists checks if object is presented in metabase.
|
||||
func (db *DB) Exists(addr *object.Address) (bool, error) {
|
||||
// FIXME: temp solution, avoid direct Get usage
|
||||
_, err := db.Get(addr)
|
||||
if err != nil {
|
||||
if errors.Is(err, errNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
|
@ -2,6 +2,9 @@ package shard
|
|||
|
||||
import (
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// DeletePrm groups the parameters of Delete operation.
|
||||
|
@ -31,7 +34,19 @@ func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) {
|
|||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
// FIXME: implement me
|
||||
// mark object to delete in metabase
|
||||
if err := s.metaBase.Delete(prm.addr); err != nil {
|
||||
s.log.Warn("could not mark object to delete in metabase",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
if _, err := s.blobStor.Delete(
|
||||
new(blobstor.DeletePrm).
|
||||
WithAddress(prm.addr),
|
||||
); err != nil {
|
||||
return nil, errors.Wrap(err, "could not remove object from BLOB storage")
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
)
|
||||
|
||||
// GetPrm groups the parameters of Get operation.
|
||||
|
@ -73,7 +74,47 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) {
|
|||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
// FIXME: implement me
|
||||
if prm.ln < 0 {
|
||||
// try to read from WriteCache
|
||||
// TODO: implement
|
||||
|
||||
return nil, nil
|
||||
res, err := s.blobStor.Get(
|
||||
new(blobstor.GetPrm).
|
||||
WithAddress(prm.addr),
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, blobstor.ErrObjectNotFound) {
|
||||
err = ErrObjectNotFound
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &GetRes{
|
||||
obj: res.Object(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// try to read from WriteCache
|
||||
// TODO: implement
|
||||
|
||||
res, err := s.blobStor.GetRange(
|
||||
new(blobstor.GetRangePrm).
|
||||
WithAddress(prm.addr).
|
||||
WithPayloadRange(prm.off, uint64(prm.ln)),
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, blobstor.ErrObjectNotFound) {
|
||||
err = ErrObjectNotFound
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj := object.NewRaw()
|
||||
obj.SetPayload(res.RangeData())
|
||||
|
||||
return &GetRes{
|
||||
obj: obj.Object(),
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -2,6 +2,8 @@ package shard
|
|||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// PutPrm groups the parameters of Put operation.
|
||||
|
@ -29,7 +31,31 @@ func (s *Shard) Put(prm *PutPrm) (*PutRes, error) {
|
|||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
// FIXME: implement me
|
||||
// check object existence
|
||||
ex, err := s.metaBase.Exists(prm.obj.Address())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not check object existence")
|
||||
} else if ex {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// try to put to WriteCache
|
||||
// TODO: implement
|
||||
|
||||
// put to BlobStor
|
||||
if _, err := s.blobStor.Put(
|
||||
new(blobstor.PutPrm).
|
||||
WithObject(prm.obj),
|
||||
); err != nil {
|
||||
return nil, errors.Wrap(err, "could not put object to BLOB storage")
|
||||
}
|
||||
|
||||
// put to metabase
|
||||
if err := s.metaBase.Put(prm.obj); err != nil {
|
||||
// may we need to handle this case in a special way
|
||||
// since the object has been successfully written to BlobStor
|
||||
return nil, errors.Wrap(err, "could not put object to metabase")
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package shard
|
|||
|
||||
import (
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// SelectPrm groups the parameters of Select operation.
|
||||
|
@ -36,7 +37,12 @@ func (s *Shard) Select(prm *SelectPrm) (*SelectRes, error) {
|
|||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
// FIXME: implement me
|
||||
|
||||
return nil, nil
|
||||
addrList, err := s.metaBase.Select(prm.filters)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not select objects from metabase")
|
||||
}
|
||||
|
||||
return &SelectRes{
|
||||
addrList: addrList,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -2,6 +2,10 @@ package shard
|
|||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
)
|
||||
|
||||
// Shard represents single shard of NeoFS Local Storage Engine.
|
||||
|
@ -11,6 +15,10 @@ type Shard struct {
|
|||
mtx *sync.RWMutex
|
||||
|
||||
weight WeightValues
|
||||
|
||||
blobStor *blobstor.BlobStor
|
||||
|
||||
metaBase *meta.DB
|
||||
}
|
||||
|
||||
// Option represents Shard's constructor option.
|
||||
|
@ -18,6 +26,12 @@ type Option func(*cfg)
|
|||
|
||||
type cfg struct {
|
||||
id *ID
|
||||
|
||||
blobOpts []blobstor.Option
|
||||
|
||||
metaOpts []meta.Option
|
||||
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -35,6 +49,8 @@ func New(opts ...Option) *Shard {
|
|||
return &Shard{
|
||||
cfg: c,
|
||||
mtx: new(sync.RWMutex),
|
||||
blobStor: blobstor.New(c.blobOpts...),
|
||||
metaBase: meta.NewDB(c.metaOpts...),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,3 +60,24 @@ func WithID(id *ID) Option {
|
|||
c.id = id
|
||||
}
|
||||
}
|
||||
|
||||
// WithBlobStorOptions returns option to set internal BlobStor options.
|
||||
func WithBlobStorOptions(opts []blobstor.Option) Option {
|
||||
return func(c *cfg) {
|
||||
c.blobOpts = opts
|
||||
}
|
||||
}
|
||||
|
||||
// WithMetaBaseOptions returns option to set internal metabase options.
|
||||
func WithMetaBaseOptions(opts []meta.Option) Option {
|
||||
return func(c *cfg) {
|
||||
c.metaOpts = opts
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to set Shard's logger.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = l
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue