From b127607ac68ad6f430faf084be87109af89c9043 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 17 Nov 2020 19:29:00 +0300 Subject: [PATCH] [#176] localstore: Implement primary BlobStor Implement primary local BLOB storage based on filesystem tree. Signed-off-by: Leonard Lyubich --- go.mod | 1 + go.sum | 2 + pkg/local_object_storage/blobstor/blobstor.go | 100 ++++++++++++++++++ pkg/local_object_storage/blobstor/compress.go | 35 ++++++ pkg/local_object_storage/blobstor/delete.go | 54 ++++++++++ pkg/local_object_storage/blobstor/fstree.go | 49 +++++++++ pkg/local_object_storage/blobstor/get.go | 83 +++++++++++++++ pkg/local_object_storage/blobstor/getrange.go | 82 ++++++++++++++ pkg/local_object_storage/blobstor/put.go | 61 +++++++++++ 9 files changed, 467 insertions(+) create mode 100644 pkg/local_object_storage/blobstor/blobstor.go create mode 100644 pkg/local_object_storage/blobstor/compress.go create mode 100644 pkg/local_object_storage/blobstor/delete.go create mode 100644 pkg/local_object_storage/blobstor/fstree.go create mode 100644 pkg/local_object_storage/blobstor/get.go create mode 100644 pkg/local_object_storage/blobstor/getrange.go create mode 100644 pkg/local_object_storage/blobstor/put.go diff --git a/go.mod b/go.mod index 701c683bf..bdff4ed39 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.1.1 github.com/hashicorp/golang-lru v0.5.4 + github.com/klauspost/compress v1.11.3 github.com/mitchellh/go-homedir v1.1.0 github.com/mr-tron/base58 v1.1.3 github.com/multiformats/go-multiaddr v0.2.0 diff --git a/go.sum b/go.sum index 38bd80187..6ffdced89 100644 --- a/go.sum +++ b/go.sum @@ -207,6 +207,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.11.3 h1:dB4Bn0tN3wdCzQxnS8r06kV74qN/TAfaIS0bVE8h3jc= +github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/pkg/local_object_storage/blobstor/blobstor.go b/pkg/local_object_storage/blobstor/blobstor.go new file mode 100644 index 000000000..e0d4dc1e3 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobstor.go @@ -0,0 +1,100 @@ +package blobstor + +import ( + "encoding/hex" + "os" + "sync" +) + +// BlobStor represents NeoFS local BLOB storage. +type BlobStor struct { + *cfg + + mtx *sync.RWMutex +} + +// Option represents BlobStor's constructor option. +type Option func(*cfg) + +type cfg struct { + fsTree fsTree + + compressor func([]byte) []byte + + decompressor func([]byte) ([]byte, error) +} + +const ( + defaultShallowDepth = 4 + defaultPerm = 0700 +) + +func defaultCfg() *cfg { + return &cfg{ + fsTree: fsTree{ + depth: defaultShallowDepth, + dirNameLen: hex.EncodedLen(dirNameLen), + perm: defaultPerm, + rootDir: "./", + }, + compressor: noOpCompressor, + decompressor: noOpDecompressor, + } +} + +// New creates, initializes and returns new BlobStor instance. +func New(opts ...Option) *BlobStor { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &BlobStor{ + cfg: c, + } +} + +// WithShallowDepth returns option to set the +// depth of the object file subdirectory tree. +// +// Depth is reduced to maximum value in case of overflow. +func WithShallowDepth(depth int) Option { + return func(c *cfg) { + if depth <= maxDepth { + depth = maxDepth + } + + c.fsTree.depth = depth + } +} + +// WithCompressObjects returns option to toggle +// compression of the stored objects. +func WithCompressObjects(comp bool) Option { + return func(c *cfg) { + if comp { + c.compressor = zstdCompressor() + c.decompressor = zstdDecompressor() + } else { + c.compressor = noOpCompressor + c.decompressor = noOpDecompressor + } + } +} + +// WithTreeRootPath returns option to set path to root directory +// of the fs tree to write the objects. +func WithTreeRootPath(rootDir string) Option { + return func(c *cfg) { + c.fsTree.rootDir = rootDir + } +} + +// WithTreeRootPerm returns option to set permission +// bits of the fs tree. +func WithTreeRootPerm(perm os.FileMode) Option { + return func(c *cfg) { + c.fsTree.perm = perm + } +} diff --git a/pkg/local_object_storage/blobstor/compress.go b/pkg/local_object_storage/blobstor/compress.go new file mode 100644 index 000000000..01eee1006 --- /dev/null +++ b/pkg/local_object_storage/blobstor/compress.go @@ -0,0 +1,35 @@ +package blobstor + +import ( + "github.com/klauspost/compress/zstd" +) + +func noOpCompressor(data []byte) []byte { + return data +} + +func noOpDecompressor(data []byte) ([]byte, error) { + return data, nil +} + +func zstdCompressor() func([]byte) []byte { + enc, err := zstd.NewWriter(nil) + if err != nil { + panic(err) + } + + return func(data []byte) []byte { + return enc.EncodeAll(data, make([]byte, 0, len(data))) + } +} + +func zstdDecompressor() func([]byte) ([]byte, error) { + dec, err := zstd.NewReader(nil) + if err != nil { + panic(err) + } + + return func(data []byte) ([]byte, error) { + return dec.DecodeAll(data, nil) + } +} diff --git a/pkg/local_object_storage/blobstor/delete.go b/pkg/local_object_storage/blobstor/delete.go new file mode 100644 index 000000000..e219d62dd --- /dev/null +++ b/pkg/local_object_storage/blobstor/delete.go @@ -0,0 +1,54 @@ +package blobstor + +import ( + "os" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/pkg/errors" +) + +// DeletePrm groups the parameters of Delete operation. +type DeletePrm struct { + addr *objectSDK.Address +} + +// DeleteRes groups resulting values of Delete operation. +type DeleteRes struct{} + +// WithAddress is a Delete option to set the address of the object to delete. +// +// Option is required. +func (p *DeletePrm) WithAddress(addr *objectSDK.Address) *DeletePrm { + if p != nil { + p.addr = addr + } + + return p +} + +// Delete removes object from BLOB storage. +// +// Returns any error encountered that did not allow +// to completely remove the object. +func (b *BlobStor) Delete(prm *DeletePrm) (*DeleteRes, error) { + b.mtx.Lock() + defer b.mtx.Unlock() + + err := b.fsTree.delete(prm.addr) + if errors.Is(err, errFileNotFound) { + err = nil + } + + return nil, err +} + +func (t *fsTree) delete(addr *object.Address) error { + p := t.treePath(addr) + + if _, err := os.Stat(p); os.IsNotExist(err) { + return errFileNotFound + } + + return os.Remove(p) +} diff --git a/pkg/local_object_storage/blobstor/fstree.go b/pkg/local_object_storage/blobstor/fstree.go new file mode 100644 index 000000000..29dd6048c --- /dev/null +++ b/pkg/local_object_storage/blobstor/fstree.go @@ -0,0 +1,49 @@ +package blobstor + +import ( + "crypto/sha256" + "encoding/hex" + "errors" + "os" + "path" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type fsTree struct { + depth int + + dirNameLen int + + perm os.FileMode + + rootDir string +} + +const dirNameLen = 2 // in bytes + +var maxDepth = (hex.EncodedLen(sha256.Size) - 1) / dirNameLen + +var errFileNotFound = errors.New("file not found") + +func stringifyAddress(addr *objectSDK.Address) string { + h := sha256.Sum256([]byte(addr.String())) + + return hex.EncodeToString(h[:]) +} + +func (t *fsTree) treePath(addr *objectSDK.Address) string { + sAddr := stringifyAddress(addr) + + dirs := make([]string, 0, t.depth+1+1) // 1 for root, 1 for file + dirs = append(dirs, t.rootDir) + + for i := 0; i < t.depth; i++ { + dirs = append(dirs, sAddr[:t.dirNameLen]) + sAddr = sAddr[t.dirNameLen:] + } + + dirs = append(dirs, sAddr) + + return path.Join(dirs...) +} diff --git a/pkg/local_object_storage/blobstor/get.go b/pkg/local_object_storage/blobstor/get.go new file mode 100644 index 000000000..94826b50f --- /dev/null +++ b/pkg/local_object_storage/blobstor/get.go @@ -0,0 +1,83 @@ +package blobstor + +import ( + "io/ioutil" + "os" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" +) + +// GetPrm groups the parameters of Get operation. +type GetPrm struct { + addr *objectSDK.Address +} + +// GetRes groups resulting values of Get operation. +type GetRes struct { + obj *object.Object +} + +// ErrObjectNotFound is returns on read operations requested on a missing object. +var ErrObjectNotFound = errors.New("object not found") + +// WithAddress is a Get option to set the address of the requested object. +// +// Option is required. +func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm { + if p != nil { + p.addr = addr + } + + return p +} + +// Object returns the requested object. +func (r *GetRes) Object() *object.Object { + return r.obj +} + +// Get reads the object from BLOB storage. +// +// Returns any error encountered that +// did not allow to completely read the object part. +func (b *BlobStor) Get(prm *GetPrm) (*GetRes, error) { + b.mtx.RLock() + defer b.mtx.RUnlock() + + // get compressed object data + data, err := b.fsTree.get(prm.addr) + if err != nil { + if errors.Is(err, errFileNotFound) { + return nil, ErrObjectNotFound + } + + return nil, errors.Wrap(err, "could not read object from fs tree") + } + + data, err = b.decompressor(data) + if err != nil { + return nil, errors.Wrap(err, "could not decompress object data") + } + + // unmarshal the object + obj := object.New() + if err := obj.Unmarshal(data); err != nil { + return nil, errors.Wrap(err, "could not unmarshal the object") + } + + return &GetRes{ + obj: obj, + }, nil +} + +func (t *fsTree) get(addr *objectSDK.Address) ([]byte, error) { + p := t.treePath(addr) + + if _, err := os.Stat(p); os.IsNotExist(err) { + return nil, errFileNotFound + } + + return ioutil.ReadFile(p) +} diff --git a/pkg/local_object_storage/blobstor/getrange.go b/pkg/local_object_storage/blobstor/getrange.go new file mode 100644 index 000000000..021608eff --- /dev/null +++ b/pkg/local_object_storage/blobstor/getrange.go @@ -0,0 +1,82 @@ +package blobstor + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" +) + +// GetRangePrm groups the parameters of GetRange operation. +type GetRangePrm struct { + off, ln uint64 + + addr *objectSDK.Address +} + +// GetRangeRes groups resulting values of GetRange operation. +type GetRangeRes struct { + rngData []byte +} + +// WithAddress is a GetRange option to set the address of the requested object. +// +// Option is required. +func (p *GetRangePrm) WithAddress(addr *objectSDK.Address) *GetRangePrm { + if p != nil { + p.addr = addr + } + + return p +} + +// WithPayloadRange is a GetRange option to set range of requested payload data. +// +// Option is required. +func (p *GetRangePrm) WithPayloadRange(off, ln uint64) *GetRangePrm { + if p != nil { + p.off, p.ln = off, ln + } + + return p +} + +// RangeData returns data of the requested payload range. +func (r *GetRangeRes) RangeData() []byte { + return r.rngData +} + +// GetRange reads data of object payload range from BLOB storage. +// +// Returns any error encountered that +// did not allow to completely read the object payload range. +func (b *BlobStor) GetRange(prm *GetRangePrm) (*GetRangeRes, error) { + b.mtx.RLock() + defer b.mtx.RUnlock() + + // get compressed object data + data, err := b.fsTree.get(prm.addr) + if err != nil { + return nil, errors.Wrap(err, "could not read object from fs tree") + } + + data, err = b.decompressor(data) + if err != nil { + return nil, errors.Wrap(err, "could not decompress object data") + } + + // unmarshal the object + obj := object.New() + if err := obj.Unmarshal(data); err != nil { + return nil, errors.Wrap(err, "could not unmarshal the object") + } + + payload := obj.Payload() + if pLen := uint64(len(payload)); pLen < prm.ln+prm.off { + return nil, errors.Errorf("range is out-of-bounds (payload %d, off %d, ln %d)", + pLen, prm.off, prm.ln) + } + + return &GetRangeRes{ + rngData: payload[prm.off : prm.off+prm.ln], + }, nil +} diff --git a/pkg/local_object_storage/blobstor/put.go b/pkg/local_object_storage/blobstor/put.go new file mode 100644 index 000000000..112988bda --- /dev/null +++ b/pkg/local_object_storage/blobstor/put.go @@ -0,0 +1,61 @@ +package blobstor + +import ( + "io/ioutil" + "os" + "path" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" +) + +// PutPrm groups the parameters of Put operation. +type PutPrm struct { + obj *object.Object +} + +// PutRes groups resulting values of Put operation. +type PutRes struct{} + +// WithObject is a Put option to set object to save. +// +// Option is required. +func (p *PutPrm) WithObject(obj *object.Object) *PutPrm { + if p != nil { + p.obj = obj + } + + return p +} + +// Put saves the object in BLOB storage. +// +// Returns any error encountered that +// did not allow to completely save the object. +func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) { + b.mtx.Lock() + defer b.mtx.Unlock() + + // marshal object + data, err := prm.obj.Marshal() + if err != nil { + return nil, errors.Wrap(err, "could not marshal the object") + } + + // compress object data + data = b.compressor(data) + + // save object in fs tree + return nil, b.fsTree.put(prm.obj.Address(), data) +} + +func (t *fsTree) put(addr *objectSDK.Address, data []byte) error { + p := t.treePath(addr) + + if err := os.MkdirAll(path.Dir(p), t.perm); err != nil { + return err + } + + return ioutil.WriteFile(p, data, t.perm) +}