From 09750484f9898a895fbf16bceb490e7a7b86b1c6 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 17 Nov 2020 15:26:03 +0300 Subject: [PATCH] [#176] localstore: Draft storage engine structure and ops Implement the primary structure and operation of the local object storage engine. Signed-off-by: Leonard Lyubich --- go.mod | 1 + go.sum | 1 + pkg/local_object_storage/engine/delete.go | 52 ++++++++++++ pkg/local_object_storage/engine/engine.go | 53 +++++++++++++ pkg/local_object_storage/engine/get.go | 97 +++++++++++++++++++++++ pkg/local_object_storage/engine/head.go | 78 ++++++++++++++++++ pkg/local_object_storage/engine/put.go | 94 ++++++++++++++++++++++ pkg/local_object_storage/engine/select.go | 64 +++++++++++++++ pkg/local_object_storage/engine/shards.go | 71 +++++++++++++++++ 9 files changed, 511 insertions(+) create mode 100644 pkg/local_object_storage/engine/delete.go create mode 100644 pkg/local_object_storage/engine/engine.go create mode 100644 pkg/local_object_storage/engine/get.go create mode 100644 pkg/local_object_storage/engine/head.go create mode 100644 pkg/local_object_storage/engine/put.go create mode 100644 pkg/local_object_storage/engine/select.go create mode 100644 pkg/local_object_storage/engine/shards.go diff --git a/go.mod b/go.mod index 1f4eaa2de..701c683bf 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/multiformats/go-multiaddr v0.2.0 github.com/multiformats/go-multiaddr-net v0.1.2 // v0.1.1 => v0.1.2 github.com/multiformats/go-multihash v0.0.13 // indirect + github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.91.1-pre.0.20201030072836-71216865717b github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201201103311-576841e0e091 github.com/nspcc-dev/neofs-crypto v0.3.0 diff --git a/go.sum b/go.sum index 65bb7337e..38bd80187 100644 --- a/go.sum +++ b/go.sum @@ -583,6 +583,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go new file mode 100644 index 000000000..47207c3bd --- /dev/null +++ b/pkg/local_object_storage/engine/delete.go @@ -0,0 +1,52 @@ +package engine + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "go.uber.org/zap" +) + +// DeletePrm groups the parameters of Delete operation. +type DeletePrm struct { + addr *objectSDK.Address +} + +// DeleteRes groups resulting values of Delete operation. +type DeleteRes struct{} + +// WithAddress is a Delete option to set the address of the object to delete. +// +// Option is required. +func (p *DeletePrm) WithAddress(addr *objectSDK.Address) *DeletePrm { + if p != nil { + p.addr = addr + } + + return p +} + +// Delete marks object to delete from local storage. +// +// Returns any error encountered that did not allow to completely +// mark the object to delete. +func (e *StorageEngine) Delete(prm *DeletePrm) (*DeleteRes, error) { + e.mtx.RLock() + defer e.mtx.RUnlock() + + shPrm := new(shard.DeletePrm) + + e.iterateOverSortedShards(prm.addr, func(sh *shard.Shard) (stop bool) { + _, err := sh.Delete(shPrm) + if err != nil { + // TODO: smth wrong with shard, need to be processed + e.log.Warn("could not get object from shard", + zap.Stringer("shard", sh.ID()), + zap.String("error", err.Error()), + ) + } + + return false + }) + + return nil, nil +} diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go new file mode 100644 index 000000000..d86010e4a --- /dev/null +++ b/pkg/local_object_storage/engine/engine.go @@ -0,0 +1,53 @@ +package engine + +import ( + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// StorageEngine represents NeoFS local storage engine. +type StorageEngine struct { + *cfg + + mtx *sync.RWMutex + + shards map[string]*shard.Shard +} + +// Option represents StorageEngine's constructor option. +type Option func(*cfg) + +type cfg struct { + log *logger.Logger +} + +func defaultCfg() *cfg { + return &cfg{ + log: zap.L(), + } +} + +// New creates, initializes and returns new StorageEngine instance. +func New(opts ...Option) *StorageEngine { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &StorageEngine{ + cfg: c, + mtx: new(sync.RWMutex), + shards: make(map[string]*shard.Shard), + } +} + +// WithLogger returns option to set StorageEngine's logger. +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + c.log = l + } +} diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go new file mode 100644 index 000000000..a3b8501a3 --- /dev/null +++ b/pkg/local_object_storage/engine/get.go @@ -0,0 +1,97 @@ +package engine + +import ( + "errors" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "go.uber.org/zap" +) + +// GetPrm groups the parameters of Get operation. +type GetPrm struct { + off, ln uint64 + + addr *objectSDK.Address +} + +// GetRes groups resulting values of Get operation. +type GetRes struct { + obj *object.Object +} + +// ErrObjectNotFound is returns on read operations requested on a missing object. +var ErrObjectNotFound = errors.New("object not found") + +// WithAddress is a Get option to set the address of the requested object. +// +// Option is required. +func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm { + if p != nil { + p.addr = addr + } + + return p +} + +// WithPayloadRange is a Get option to set range of requested payload data. +// +// Missing an option or calling with zero arguments is equivalent +// to getting the full payload range. +func (p *GetPrm) WithPayloadRange(off, ln uint64) *GetPrm { + if p != nil { + p.off, p.ln = off, ln + } + + return p +} + +// Object returns the requested object part. +// +// Instance payload contains the requested range of the original object. +func (r *GetRes) Object() *object.Object { + return r.obj +} + +// Get reads part of an object from local storage. +// +// Returns any error encountered that +// did not allow to completely read the object part. +// +// Returns ErrObjectNotFound if requested object is missing in local storage. +func (e *StorageEngine) Get(prm *GetPrm) (*GetRes, error) { + e.mtx.RLock() + defer e.mtx.RUnlock() + + var obj *object.Object + + shPrm := new(shard.GetPrm). + WithAddress(prm.addr). + WithFullRange() + + e.iterateOverSortedShards(prm.addr, func(sh *shard.Shard) (stop bool) { + res, err := sh.Get(shPrm) + if err != nil { + if !errors.Is(err, shard.ErrObjectNotFound) { + // TODO: smth wrong with shard, need to be processed + e.log.Warn("could not get object from shard", + zap.Stringer("shard", sh.ID()), + zap.String("error", err.Error()), + ) + } + } else { + obj = res.Object() + } + + return err == nil + }) + + if obj == nil { + return nil, ErrObjectNotFound + } + + return &GetRes{ + obj: obj, + }, nil +} diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go new file mode 100644 index 000000000..e366cb50d --- /dev/null +++ b/pkg/local_object_storage/engine/head.go @@ -0,0 +1,78 @@ +package engine + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// HeadPrm groups the parameters of Head operation. +type HeadPrm struct { + addr *objectSDK.Address +} + +// HeadRes groups resulting values of Head operation. +type HeadRes struct { + head *object.Object +} + +// WithAddress is a Head option to set the address of the requested object. +// +// Option is required. +func (p *HeadPrm) WithAddress(addr *objectSDK.Address) *HeadPrm { + if p != nil { + p.addr = addr + } + + return p +} + +// Header returns the requested object header. +// +// Instance has empty payload. +func (r *HeadRes) Header() *object.Object { + return r.head +} + +// Head reads object header from local storage. +// +// Returns any error encountered that +// did not allow to completely read the object header. +// +// Returns ErrObjectNotFound if requested object is missing in local storage. +func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { + e.mtx.RLock() + defer e.mtx.RUnlock() + + var head *object.Object + + shPrm := new(shard.GetPrm). + WithAddress(prm.addr) + + e.iterateOverSortedShards(prm.addr, func(sh *shard.Shard) (stop bool) { + res, err := sh.Get(shPrm) + if err != nil { + if !errors.Is(err, shard.ErrObjectNotFound) { + // TODO: smth wrong with shard, need to be processed + e.log.Warn("could not get object from shard", + zap.Stringer("shard", sh.ID()), + zap.String("error", err.Error()), + ) + } + } else { + head = res.Object() + } + + return err == nil + }) + + if head == nil { + return nil, ErrObjectNotFound + } + + return &HeadRes{ + head: head, + }, nil +} diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go new file mode 100644 index 000000000..1de882a59 --- /dev/null +++ b/pkg/local_object_storage/engine/put.go @@ -0,0 +1,94 @@ +package engine + +import ( + "errors" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "go.uber.org/zap" +) + +// PutPrm groups the parameters of Put operation. +type PutPrm struct { + obj *object.Object +} + +// PutRes groups resulting values of Put operation. +type PutRes struct{} + +var errPutShard = errors.New("could not put object to any shard") + +// WithObject is a Put option to set object to save. +// +// Option is required. +func (p *PutPrm) WithObject(obj *object.Object) *PutPrm { + if p != nil { + p.obj = obj + } + + return p +} + +// Put saves the object to local storage. +// +// Returns any error encountered that +// did not allow to completely save the object. +func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) { + e.mtx.Lock() + defer e.mtx.Unlock() + + // choose shards through sorting by weight + sortedShards := e.sortShardsByWeight(prm.obj.Address()) + + // check object existence + if e.objectExists(prm.obj, sortedShards) { + return nil, nil + } + + shPrm := new(shard.PutPrm) + + // save the object into the "largest" possible shard + for _, sh := range sortedShards { + _, err := sh.Put( + shPrm.WithObject(prm.obj), + ) + + if err != nil { + // TODO: smth wrong with shard, need to be processed + e.log.Warn("could not save object in shard", + zap.Stringer("shard", sh.ID()), + zap.String("error", err.Error()), + ) + } else { + return nil, nil + } + } + + return nil, errPutShard +} + +func (e *StorageEngine) objectExists(obj *object.Object, shards []*shard.Shard) bool { + exists := false + + for _, sh := range shards { + res, err := sh.Exists( + new(shard.ExistsPrm). + WithAddress(obj.Address()), + ) + + if err != nil { + // TODO: smth wrong with shard, need to be processed + e.log.Warn("could not check object existence", + zap.String("error", err.Error()), + ) + + continue + } + + if exists = res.Exists(); exists { + break + } + } + + return exists +} diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go new file mode 100644 index 000000000..289de84f9 --- /dev/null +++ b/pkg/local_object_storage/engine/select.go @@ -0,0 +1,64 @@ +package engine + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "go.uber.org/zap" +) + +// SelectPrm groups the parameters of Select operation. +type SelectPrm struct { + filters object.SearchFilters +} + +// SelectRes groups resulting values of Select operation. +type SelectRes struct { + addrList []*object.Address +} + +// WithFilters is a Select option to set the object filters. +func (p *SelectPrm) WithFilters(fs objectSDK.SearchFilters) *SelectPrm { + if p != nil { + p.filters = fs + } + + return p +} + +// AddressList returns list of addresses of the selected objects. +func (r *SelectRes) AddressList() []*objectSDK.Address { + return r.addrList +} + +// Select selects the objects from local storage that match select parameters. +// +// Returns any error encountered that did not allow to completely select the objects. +func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) { + e.mtx.RLock() + defer e.mtx.RUnlock() + + addrList := make([]*object.Address, 0) + + shPrm := new(shard.SelectPrm). + WithFilters(prm.filters) + + e.iterateOverSortedShards(nil, func(sh *shard.Shard) (stop bool) { + res, err := sh.Select(shPrm) + if err != nil { + // TODO: smth wrong with shard, need to be processed + e.log.Warn("could not select objects from shard", + zap.Stringer("shard", sh.ID()), + zap.String("error", err.Error()), + ) + } else { + addrList = append(addrList, res.AddressList()...) + } + + return false + }) + + return &SelectRes{ + addrList: addrList, + }, nil +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go new file mode 100644 index 000000000..3435b2c0c --- /dev/null +++ b/pkg/local_object_storage/engine/shards.go @@ -0,0 +1,71 @@ +package engine + +import ( + "fmt" + + "github.com/google/uuid" + "github.com/nspcc-dev/hrw" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/pkg/errors" +) + +// AddShard adds a new shard to the storage engine. +// +// Returns any error encountered that did not allow adding a shard. +// Otherwise returns the ID of the added shard. +func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { + e.mtx.Lock() + defer e.mtx.Unlock() + + id, err := e.generateShardID() + if err != nil { + return nil, errors.Wrap(err, "could not generate shard ID") + } + + e.shards[id.String()] = shard.New(append(opts, shard.WithID(id))...) + + return id, nil +} + +func (e *StorageEngine) generateShardID() (*shard.ID, error) { + uid, err := uuid.NewRandom() + if err != nil { + return nil, err + } + + bin, err := uid.MarshalBinary() + if err != nil { + return nil, err + } + + return shard.NewIDFromBytes(bin), nil +} + +func (e *StorageEngine) shardWeight(sh *shard.Shard) float64 { + weightValues := sh.WeightValues() + + return float64(weightValues.FreeSpace) +} + +func (e *StorageEngine) sortShardsByWeight(objAddr fmt.Stringer) []*shard.Shard { + shards := make([]*shard.Shard, 0, len(e.shards)) + weights := make([]float64, 0, len(e.shards)) + + for _, sh := range e.shards { + shards = append(shards, sh) + weights = append(weights, e.shardWeight(sh)) + } + + hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.String()))) + + return shards +} + +func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(*shard.Shard) (stop bool)) { + for _, sh := range e.sortShardsByWeight(addr) { + if handler(sh) { + break + } + } +}