From 46e455dcae76651a557310164ea4ec244b650f77 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 17 Nov 2020 20:39:43 +0300 Subject: [PATCH] [#176] localstore: Implement shard methods via Metabase and BlobStor Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/exists.go | 21 ++++++++++ pkg/local_object_storage/shard/delete.go | 17 +++++++- pkg/local_object_storage/shard/get.go | 45 ++++++++++++++++++++- pkg/local_object_storage/shard/put.go | 28 ++++++++++++- pkg/local_object_storage/shard/select.go | 10 ++++- pkg/local_object_storage/shard/shard.go | 41 ++++++++++++++++++- 6 files changed, 154 insertions(+), 8 deletions(-) create mode 100644 pkg/local_object_storage/metabase/exists.go diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go new file mode 100644 index 000000000..3eeab905c --- /dev/null +++ b/pkg/local_object_storage/metabase/exists.go @@ -0,0 +1,21 @@ +package meta + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/pkg/errors" +) + +// Exists checks if object is presented in metabase. +func (db *DB) Exists(addr *object.Address) (bool, error) { + // FIXME: temp solution, avoid direct Get usage + _, err := db.Get(addr) + if err != nil { + if errors.Is(err, errNotFound) { + return false, nil + } + + return false, err + } + + return true, nil +} diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index c1a81af69..fd1df58ba 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -2,6 +2,9 @@ package shard import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/pkg/errors" + "go.uber.org/zap" ) // DeletePrm groups the parameters of Delete operation. @@ -31,7 +34,19 @@ func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) { s.mtx.Lock() defer s.mtx.Unlock() - // FIXME: implement me + // mark object to delete in metabase + if err := s.metaBase.Delete(prm.addr); err != nil { + s.log.Warn("could not mark object to delete in metabase", + zap.String("error", err.Error()), + ) + } + + if _, err := s.blobStor.Delete( + new(blobstor.DeletePrm). + WithAddress(prm.addr), + ); err != nil { + return nil, errors.Wrap(err, "could not remove object from BLOB storage") + } return nil, nil } diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index feacf1cfd..82f08dd99 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -5,6 +5,7 @@ import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" ) // GetPrm groups the parameters of Get operation. @@ -73,7 +74,47 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) { s.mtx.RLock() defer s.mtx.RUnlock() - // FIXME: implement me + if prm.ln < 0 { + // try to read from WriteCache + // TODO: implement - return nil, nil + res, err := s.blobStor.Get( + new(blobstor.GetPrm). + WithAddress(prm.addr), + ) + if err != nil { + if errors.Is(err, blobstor.ErrObjectNotFound) { + err = ErrObjectNotFound + } + + return nil, err + } + + return &GetRes{ + obj: res.Object(), + }, nil + } + + // try to read from WriteCache + // TODO: implement + + res, err := s.blobStor.GetRange( + new(blobstor.GetRangePrm). + WithAddress(prm.addr). + WithPayloadRange(prm.off, uint64(prm.ln)), + ) + if err != nil { + if errors.Is(err, blobstor.ErrObjectNotFound) { + err = ErrObjectNotFound + } + + return nil, err + } + + obj := object.NewRaw() + obj.SetPayload(res.RangeData()) + + return &GetRes{ + obj: obj.Object(), + }, nil } diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 0c726c865..83daa45f4 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -2,6 +2,8 @@ package shard import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/pkg/errors" ) // PutPrm groups the parameters of Put operation. @@ -29,7 +31,31 @@ func (s *Shard) Put(prm *PutPrm) (*PutRes, error) { s.mtx.Lock() defer s.mtx.Unlock() - // FIXME: implement me + // check object existence + ex, err := s.metaBase.Exists(prm.obj.Address()) + if err != nil { + return nil, errors.Wrap(err, "could not check object existence") + } else if ex { + return nil, nil + } + + // try to put to WriteCache + // TODO: implement + + // put to BlobStor + if _, err := s.blobStor.Put( + new(blobstor.PutPrm). + WithObject(prm.obj), + ); err != nil { + return nil, errors.Wrap(err, "could not put object to BLOB storage") + } + + // put to metabase + if err := s.metaBase.Put(prm.obj); err != nil { + // may we need to handle this case in a special way + // since the object has been successfully written to BlobStor + return nil, errors.Wrap(err, "could not put object to metabase") + } return nil, nil } diff --git a/pkg/local_object_storage/shard/select.go b/pkg/local_object_storage/shard/select.go index be17180a7..676681613 100644 --- a/pkg/local_object_storage/shard/select.go +++ b/pkg/local_object_storage/shard/select.go @@ -2,6 +2,7 @@ package shard import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/pkg/errors" ) // SelectPrm groups the parameters of Select operation. @@ -36,7 +37,12 @@ func (s *Shard) Select(prm *SelectPrm) (*SelectRes, error) { s.mtx.RLock() defer s.mtx.RUnlock() - // FIXME: implement me + addrList, err := s.metaBase.Select(prm.filters) + if err != nil { + return nil, errors.Wrap(err, "could not select objects from metabase") + } - return nil, nil + return &SelectRes{ + addrList: addrList, + }, nil } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 94eb7f08a..75479383d 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -2,6 +2,10 @@ package shard import ( "sync" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" ) // Shard represents single shard of NeoFS Local Storage Engine. @@ -11,6 +15,10 @@ type Shard struct { mtx *sync.RWMutex weight WeightValues + + blobStor *blobstor.BlobStor + + metaBase *meta.DB } // Option represents Shard's constructor option. @@ -18,6 +26,12 @@ type Option func(*cfg) type cfg struct { id *ID + + blobOpts []blobstor.Option + + metaOpts []meta.Option + + log *logger.Logger } func defaultCfg() *cfg { @@ -33,8 +47,10 @@ func New(opts ...Option) *Shard { } return &Shard{ - cfg: c, - mtx: new(sync.RWMutex), + cfg: c, + mtx: new(sync.RWMutex), + blobStor: blobstor.New(c.blobOpts...), + metaBase: meta.NewDB(c.metaOpts...), } } @@ -44,3 +60,24 @@ func WithID(id *ID) Option { c.id = id } } + +// WithBlobStorOptions returns option to set internal BlobStor options. +func WithBlobStorOptions(opts []blobstor.Option) Option { + return func(c *cfg) { + c.blobOpts = opts + } +} + +// WithMetaBaseOptions returns option to set internal metabase options. +func WithMetaBaseOptions(opts []meta.Option) Option { + return func(c *cfg) { + c.metaOpts = opts + } +} + +// WithLogger returns option to set Shard's logger. +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + c.log = l + } +}