[#176] localstore: Implement primary BlobStor

Implement primary local BLOB storage based on filesystem tree.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
remotes/KirillovDenis/release/v0.21.1
Leonard Lyubich 2020-11-17 19:29:00 +03:00 committed by Alex Vanin
parent 09750484f9
commit b127607ac6
9 changed files with 467 additions and 0 deletions

1
go.mod
View File

@ -8,6 +8,7 @@ require (
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.4
github.com/klauspost/compress v1.11.3
github.com/mitchellh/go-homedir v1.1.0
github.com/mr-tron/base58 v1.1.3
github.com/multiformats/go-multiaddr v0.2.0

2
go.sum
View File

@ -207,6 +207,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.3 h1:dB4Bn0tN3wdCzQxnS8r06kV74qN/TAfaIS0bVE8h3jc=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=

View File

@ -0,0 +1,100 @@
package blobstor
import (
"encoding/hex"
"os"
"sync"
)
// BlobStor represents NeoFS local BLOB storage.
type BlobStor struct {
*cfg
mtx *sync.RWMutex
}
// Option represents BlobStor's constructor option.
type Option func(*cfg)
type cfg struct {
fsTree fsTree
compressor func([]byte) []byte
decompressor func([]byte) ([]byte, error)
}
const (
defaultShallowDepth = 4
defaultPerm = 0700
)
func defaultCfg() *cfg {
return &cfg{
fsTree: fsTree{
depth: defaultShallowDepth,
dirNameLen: hex.EncodedLen(dirNameLen),
perm: defaultPerm,
rootDir: "./",
},
compressor: noOpCompressor,
decompressor: noOpDecompressor,
}
}
// New creates, initializes and returns new BlobStor instance.
func New(opts ...Option) *BlobStor {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &BlobStor{
cfg: c,
}
}
// WithShallowDepth returns option to set the
// depth of the object file subdirectory tree.
//
// Depth is reduced to maximum value in case of overflow.
func WithShallowDepth(depth int) Option {
return func(c *cfg) {
if depth <= maxDepth {
depth = maxDepth
}
c.fsTree.depth = depth
}
}
// WithCompressObjects returns option to toggle
// compression of the stored objects.
func WithCompressObjects(comp bool) Option {
return func(c *cfg) {
if comp {
c.compressor = zstdCompressor()
c.decompressor = zstdDecompressor()
} else {
c.compressor = noOpCompressor
c.decompressor = noOpDecompressor
}
}
}
// WithTreeRootPath returns option to set path to root directory
// of the fs tree to write the objects.
func WithTreeRootPath(rootDir string) Option {
return func(c *cfg) {
c.fsTree.rootDir = rootDir
}
}
// WithTreeRootPerm returns option to set permission
// bits of the fs tree.
func WithTreeRootPerm(perm os.FileMode) Option {
return func(c *cfg) {
c.fsTree.perm = perm
}
}

View File

@ -0,0 +1,35 @@
package blobstor
import (
"github.com/klauspost/compress/zstd"
)
func noOpCompressor(data []byte) []byte {
return data
}
func noOpDecompressor(data []byte) ([]byte, error) {
return data, nil
}
func zstdCompressor() func([]byte) []byte {
enc, err := zstd.NewWriter(nil)
if err != nil {
panic(err)
}
return func(data []byte) []byte {
return enc.EncodeAll(data, make([]byte, 0, len(data)))
}
}
func zstdDecompressor() func([]byte) ([]byte, error) {
dec, err := zstd.NewReader(nil)
if err != nil {
panic(err)
}
return func(data []byte) ([]byte, error) {
return dec.DecodeAll(data, nil)
}
}

View File

@ -0,0 +1,54 @@
package blobstor
import (
"os"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/pkg/errors"
)
// DeletePrm groups the parameters of Delete operation.
type DeletePrm struct {
addr *objectSDK.Address
}
// DeleteRes groups resulting values of Delete operation.
type DeleteRes struct{}
// WithAddress is a Delete option to set the address of the object to delete.
//
// Option is required.
func (p *DeletePrm) WithAddress(addr *objectSDK.Address) *DeletePrm {
if p != nil {
p.addr = addr
}
return p
}
// Delete removes object from BLOB storage.
//
// Returns any error encountered that did not allow
// to completely remove the object.
func (b *BlobStor) Delete(prm *DeletePrm) (*DeleteRes, error) {
b.mtx.Lock()
defer b.mtx.Unlock()
err := b.fsTree.delete(prm.addr)
if errors.Is(err, errFileNotFound) {
err = nil
}
return nil, err
}
func (t *fsTree) delete(addr *object.Address) error {
p := t.treePath(addr)
if _, err := os.Stat(p); os.IsNotExist(err) {
return errFileNotFound
}
return os.Remove(p)
}

View File

@ -0,0 +1,49 @@
package blobstor
import (
"crypto/sha256"
"encoding/hex"
"errors"
"os"
"path"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
)
type fsTree struct {
depth int
dirNameLen int
perm os.FileMode
rootDir string
}
const dirNameLen = 2 // in bytes
var maxDepth = (hex.EncodedLen(sha256.Size) - 1) / dirNameLen
var errFileNotFound = errors.New("file not found")
func stringifyAddress(addr *objectSDK.Address) string {
h := sha256.Sum256([]byte(addr.String()))
return hex.EncodeToString(h[:])
}
func (t *fsTree) treePath(addr *objectSDK.Address) string {
sAddr := stringifyAddress(addr)
dirs := make([]string, 0, t.depth+1+1) // 1 for root, 1 for file
dirs = append(dirs, t.rootDir)
for i := 0; i < t.depth; i++ {
dirs = append(dirs, sAddr[:t.dirNameLen])
sAddr = sAddr[t.dirNameLen:]
}
dirs = append(dirs, sAddr)
return path.Join(dirs...)
}

View File

@ -0,0 +1,83 @@
package blobstor
import (
"io/ioutil"
"os"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
)
// GetPrm groups the parameters of Get operation.
type GetPrm struct {
addr *objectSDK.Address
}
// GetRes groups resulting values of Get operation.
type GetRes struct {
obj *object.Object
}
// ErrObjectNotFound is returns on read operations requested on a missing object.
var ErrObjectNotFound = errors.New("object not found")
// WithAddress is a Get option to set the address of the requested object.
//
// Option is required.
func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm {
if p != nil {
p.addr = addr
}
return p
}
// Object returns the requested object.
func (r *GetRes) Object() *object.Object {
return r.obj
}
// Get reads the object from BLOB storage.
//
// Returns any error encountered that
// did not allow to completely read the object part.
func (b *BlobStor) Get(prm *GetPrm) (*GetRes, error) {
b.mtx.RLock()
defer b.mtx.RUnlock()
// get compressed object data
data, err := b.fsTree.get(prm.addr)
if err != nil {
if errors.Is(err, errFileNotFound) {
return nil, ErrObjectNotFound
}
return nil, errors.Wrap(err, "could not read object from fs tree")
}
data, err = b.decompressor(data)
if err != nil {
return nil, errors.Wrap(err, "could not decompress object data")
}
// unmarshal the object
obj := object.New()
if err := obj.Unmarshal(data); err != nil {
return nil, errors.Wrap(err, "could not unmarshal the object")
}
return &GetRes{
obj: obj,
}, nil
}
func (t *fsTree) get(addr *objectSDK.Address) ([]byte, error) {
p := t.treePath(addr)
if _, err := os.Stat(p); os.IsNotExist(err) {
return nil, errFileNotFound
}
return ioutil.ReadFile(p)
}

View File

@ -0,0 +1,82 @@
package blobstor
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
)
// GetRangePrm groups the parameters of GetRange operation.
type GetRangePrm struct {
off, ln uint64
addr *objectSDK.Address
}
// GetRangeRes groups resulting values of GetRange operation.
type GetRangeRes struct {
rngData []byte
}
// WithAddress is a GetRange option to set the address of the requested object.
//
// Option is required.
func (p *GetRangePrm) WithAddress(addr *objectSDK.Address) *GetRangePrm {
if p != nil {
p.addr = addr
}
return p
}
// WithPayloadRange is a GetRange option to set range of requested payload data.
//
// Option is required.
func (p *GetRangePrm) WithPayloadRange(off, ln uint64) *GetRangePrm {
if p != nil {
p.off, p.ln = off, ln
}
return p
}
// RangeData returns data of the requested payload range.
func (r *GetRangeRes) RangeData() []byte {
return r.rngData
}
// GetRange reads data of object payload range from BLOB storage.
//
// Returns any error encountered that
// did not allow to completely read the object payload range.
func (b *BlobStor) GetRange(prm *GetRangePrm) (*GetRangeRes, error) {
b.mtx.RLock()
defer b.mtx.RUnlock()
// get compressed object data
data, err := b.fsTree.get(prm.addr)
if err != nil {
return nil, errors.Wrap(err, "could not read object from fs tree")
}
data, err = b.decompressor(data)
if err != nil {
return nil, errors.Wrap(err, "could not decompress object data")
}
// unmarshal the object
obj := object.New()
if err := obj.Unmarshal(data); err != nil {
return nil, errors.Wrap(err, "could not unmarshal the object")
}
payload := obj.Payload()
if pLen := uint64(len(payload)); pLen < prm.ln+prm.off {
return nil, errors.Errorf("range is out-of-bounds (payload %d, off %d, ln %d)",
pLen, prm.off, prm.ln)
}
return &GetRangeRes{
rngData: payload[prm.off : prm.off+prm.ln],
}, nil
}

View File

@ -0,0 +1,61 @@
package blobstor
import (
"io/ioutil"
"os"
"path"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
)
// PutPrm groups the parameters of Put operation.
type PutPrm struct {
obj *object.Object
}
// PutRes groups resulting values of Put operation.
type PutRes struct{}
// WithObject is a Put option to set object to save.
//
// Option is required.
func (p *PutPrm) WithObject(obj *object.Object) *PutPrm {
if p != nil {
p.obj = obj
}
return p
}
// Put saves the object in BLOB storage.
//
// Returns any error encountered that
// did not allow to completely save the object.
func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) {
b.mtx.Lock()
defer b.mtx.Unlock()
// marshal object
data, err := prm.obj.Marshal()
if err != nil {
return nil, errors.Wrap(err, "could not marshal the object")
}
// compress object data
data = b.compressor(data)
// save object in fs tree
return nil, b.fsTree.put(prm.obj.Address(), data)
}
func (t *fsTree) put(addr *objectSDK.Address, data []byte) error {
p := t.treePath(addr)
if err := os.MkdirAll(path.Dir(p), t.perm); err != nil {
return err
}
return ioutil.WriteFile(p, data, t.perm)
}