goodstor: initial implementation

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-04-11 20:01:55 +03:00
parent 884b77211c
commit 05996103ec
17 changed files with 728 additions and 0 deletions

View file

@ -0,0 +1,102 @@
package allocator
import (
"github.com/tidwall/btree"
)
// BestFit implements best-fit allocation strategy.
//
// Every free region is stored in two trees: one ordered by size (used by Get
// to find the smallest region that fits) and one ordered by end offset (used
// by Free to find neighbouring regions to merge with).
type BestFit struct {
	// capacity is the value passed to NewBestFit.
	capacity uint64
	// sizeToFree holds free regions ordered by lessBySize.
	sizeToFree *btree.BTreeG[Region]
	// endToFree holds the same free regions ordered by lessByEnd.
	endToFree *btree.BTreeG[Region]
}
// lessBySize orders regions by length; regions of equal length are ordered
// by offset.
func lessBySize(a, b Region) bool {
	lenA, lenB := a.Length(), b.Length()
	if lenA != lenB {
		return lenA < lenB
	}
	return a.Offset() < b.Offset()
}
// lessByEnd orders regions by their end offset (Offset + Length).
func lessByEnd(a, b Region) bool {
	endA, endB := a.End(), b.End()
	return endA < endB
}
// Compile-time check that *BestFit satisfies Allocator.
var _ Allocator = (*BestFit)(nil)
// NewBestFit returns new best-fit allocator instance managing capacity units.
//
// A single Region can describe at most maxLength units, so the range
// [0, capacity) is registered as consecutive free regions of at most
// maxLength units each. No region is registered when capacity is zero.
func NewBestFit(capacity uint64) *BestFit {
	a := &BestFit{
		capacity:   capacity,
		sizeToFree: btree.NewBTreeGOptions(lessBySize, btree.Options{NoLocks: true}),
		endToFree:  btree.NewBTreeGOptions(lessByEnd, btree.Options{NoLocks: true}),
	}

	for offset := uint64(0); offset < capacity; {
		// The last chunk may be shorter than maxLength.
		length := capacity - offset
		if length > maxLength {
			length = maxLength
		}
		a.addFreePortion(NewRegion(offset, length))
		offset += length
	}
	return a
}
// Get implements the Allocator interface.
//
// It takes size units from the smallest free region that is at least size
// units long and puts the remainder, if any, back into the free set.
// ErrOOM is returned when no free region is large enough.
func (b *BestFit) Get(size uint64) (Region, error) {
	// A region cannot be longer than maxLength. A bigger size would be
	// truncated by NewRegion and the search key below would match regions
	// that are too small.
	if size > maxLength {
		return 0, ErrOOM
	}

	// In lessBySize order the first item not less than (offset 0, size) is
	// the smallest region whose length is >= size.
	iter := b.sizeToFree.Iter()
	found := iter.Seek(NewRegion(0, size))
	var free Region
	if found {
		free = iter.Item()
	}
	// The iterator is no longer needed; release it before the trees change.
	iter.Release()

	if !found {
		return 0, ErrOOM
	}

	b.deleteFreePortion(free)
	reg := free.allocate(size)
	if free.Length() != 0 {
		b.addFreePortion(free)
	}
	return reg, nil
}
// Free implements the Allocator interface.
//
// The region is merged with adjacent free regions where possible and the
// result is added to the free set. The returned error is always nil.
func (b *BestFit) Free(region Region) error {
	b.addFreePortion(b.mergeIfPossible(region))
	return nil
}
// mergeIfPossible joins free with the free regions directly before and after
// it, when such regions exist and the joined length passes safeExtend.
// Absorbed neighbours are removed from both trees. The returned region is
// NOT inserted into the trees; the caller is expected to do that.
func (b *BestFit) mergeIfPossible(free Region) Region {
	iter := b.endToFree.Iter()

	// Look for a free region that ends exactly where free starts.
	// The key has zero length, so its End() equals free.Offset().
	key := NewRegion(free.Offset(), 0)
	if iter.Seek(key) {
		prev := iter.Item()
		if prev.End() == free.Offset() && prev.safeExtend(free.Length()) {
			free = NewRegion(prev.Offset(), prev.Length()+free.Length())
			b.deleteFreePortion(prev)
		}
	}

	// Look for a free region that starts exactly where free (possibly already
	// extended to the left) ends. Seek positions on the first region whose
	// end is not less than free.End().
	key = NewRegion(free.End(), 0)
	if iter.Seek(key) {
		next := iter.Item()
		if next.Offset() == free.End() && free.safeExtend(next.Length()) {
			free = NewRegion(free.Offset(), free.Length()+next.Length())
			b.deleteFreePortion(next)
		}
	}

	iter.Release()
	return free
}
// deleteFreePortion removes region p from both free-region trees.
func (b *BestFit) deleteFreePortion(p Region) {
	b.endToFree.Delete(p)
	b.sizeToFree.Delete(p)
}
// addFreePortion inserts region p into both free-region trees.
func (b *BestFit) addFreePortion(p Region) {
	b.endToFree.Set(p)
	b.sizeToFree.Set(p)
}

View file

@ -0,0 +1,47 @@
package allocator
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestBestFit checks allocation order, region reuse after Free, the
// out-of-memory condition and merging of freed neighbours on a 4096-unit
// allocator.
func TestBestFit(t *testing.T) {
	a := NewBestFit(4096)
	testGet(t, a, 1024, NewRegion(0, 1024))
	testGet(t, a, 1024, NewRegion(1024, 1024))

	// A freed region is handed out again.
	require.NoError(t, a.Free(NewRegion(0, 1024)))
	testGet(t, a, 1024, NewRegion(0, 1024))
	require.NoError(t, a.Free(NewRegion(0, 1024)))

	// 1025 does not fit into the freed 1024-unit hole, so the tail is used.
	testGet(t, a, 1025, NewRegion(2048, 1025))
	testGet(t, a, 512, NewRegion(2048+1025, 512))
	testGet(t, a, 512, NewRegion(0, 512))
	testGet(t, a, 512, NewRegion(512, 512))

	// Only 511 units are left at this point.
	_, err := a.Get(512)
	require.ErrorIs(t, err, ErrOOM)
	testGet(t, a, 511, NewRegion(2048+1025+512, 511))

	// Freeing everything must merge the pieces back into a single region.
	require.NoError(t, a.Free(NewRegion(0, 1024)))
	require.NoError(t, a.Free(NewRegion(2048, 2048)))
	require.NoError(t, a.Free(NewRegion(1024, 1024)))
	testGet(t, a, 4096, NewRegion(0, 4096))
}
// BenchmarkBestFit runs the shared allocator benchmark against BestFit.
func BenchmarkBestFit(b *testing.B) {
	benchmarkAllocator(b, NewBestFit)
}
// benchmarkAllocator benchmarks an allocator built by constructor.
//
// The "by one" case creates an allocator with b.N units and takes them one
// unit at a time; any allocation error fails the benchmark.
func benchmarkAllocator[A Allocator](b *testing.B, constructor func(uint64) A) {
	b.Run("by one", func(b *testing.B) {
		a := constructor(uint64(b.N))
		// Do not count allocator construction in the measurement.
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_, err := a.Get(1)
			if err != nil {
				b.Fatal(err)
			}
		}
	})
}

View file

@ -0,0 +1,33 @@
package allocator
import (
"github.com/RoaringBitmap/roaring"
)
// Bitmap allocates chunks of a constant size.
type Bitmap struct {
	// free contains indexes of chunks available for allocation.
	free roaring.Bitmap
	// allocated contains indexes of chunks handed out by Get.
	allocated roaring.Bitmap
}
// NewBitmap returns a Bitmap allocator in which chunks [0, capacity) are free.
func NewBitmap(capacity uint64) *Bitmap {
	var bm Bitmap
	bm.free.AddRange(0, capacity)
	return &bm
}
// Get allocates the free chunk with the lowest index.
//
// Only size == 1 is supported; any other size panics. ErrOOM is returned when
// no free chunk is left.
func (b *Bitmap) Get(size int) (Region, error) {
	if size != 1 {
		panic("not implemented")
	}
	// Minimum must not be called on an empty bitmap.
	if b.free.IsEmpty() {
		return 0, ErrOOM
	}

	r := b.free.Minimum()
	b.free.Remove(r)
	b.allocated.Add(r)
	return NewRegion(uint64(r), uint64(size)), nil
}
// Free marks all chunks in [region.Offset(), region.End()) as free again.
// The returned error is always nil.
func (b *Bitmap) Free(region Region) error {
	start, end := region.Offset(), region.End()
	b.allocated.RemoveRange(start, end)
	b.free.AddRange(start, end)
	return nil
}

View file

@ -0,0 +1 @@
package allocator

Binary file not shown.

View file

@ -0,0 +1,61 @@
package allocator
import (
"errors"
"fmt"
"math"
)
// Allocator hands out and takes back regions of an abstract address space.
type Allocator interface {
	// Get reserves a region of the given length.
	// BestFit returns ErrOOM when no suitable free region exists.
	Get(uint64) (Region, error)
	// Free returns a previously reserved region to the allocator.
	Free(Region) error
}
var (
	// ErrOOM is returned when an allocator has no free region that can
	// satisfy the request.
	ErrOOM = errors.New("no free chunk is found")
)
// Region describes continuous range of data as offset and length.
// Offset is 40-bit, for 512-byte chunks this means it can contain 2^40 * 2^9 = 512 TiB of data.
// Length is 24-bit, for 512-byte chunks this means it can contain 2^24 * 2^9 = 8 GiB of data.
//
// The offset is stored in the low lenShift bits, the length in the remaining
// high bits.
type Region uint64

const (
	// lenShift is the number of low bits occupied by the offset.
	lenShift = 40
	// maxLength is the largest length a Region can represent.
	maxLength = math.MaxUint64 >> lenShift
	// offsetMask selects the offset bits of a Region.
	offsetMask = (1 << lenShift) - 1
)

// NewRegion packs offset and length into a Region.
//
// The arguments are not validated: offset must fit into lenShift bits and
// length must not exceed maxLength, otherwise the two fields corrupt each
// other or the length is truncated.
func NewRegion(offset, length uint64) Region {
	return Region((length << lenShift) | offset)
}

// Offset returns the start of the region.
func (r Region) Offset() uint64 {
	return uint64(r & offsetMask)
}

// Length returns the number of units in the region.
func (r Region) Length() uint64 {
	return uint64(r >> lenShift)
}

// End returns the offset of the first unit after the region.
func (r Region) End() uint64 {
	return r.Offset() + r.Length()
}

// safeExtend reports whether the region can grow by size units and still
// have a representable length, i.e. Length()+size <= maxLength.
func (r *Region) safeExtend(size uint64) bool {
	// Length() never exceeds maxLength, so the subtraction cannot underflow,
	// and unlike Length()+size it cannot wrap around for a huge size.
	return size <= maxLength-r.Length()
}

// allocate cuts size units from the beginning of r and returns them as a new
// region; r is updated to describe the remainder.
// It panics if size exceeds r.Length().
func (r *Region) allocate(size uint64) Region {
	assert(size <= r.Length(), "invalid allocation size: have=%d, need=%d", r.Length(), size)

	result := NewRegion(r.Offset(), size)
	*r = NewRegion(r.Offset()+size, r.Length()-size)
	return result
}

// assert panics with the formatted message when ok is false.
func assert(ok bool, msg string, args ...any) {
	if !ok {
		panic(fmt.Sprintf(msg, args...))
	}
}

View file

@ -0,0 +1,13 @@
package allocator
import (
"testing"
"github.com/stretchr/testify/require"
)
// testGet requests size units from a and requires the call to succeed and to
// return exactly the expected region.
func testGet(t *testing.T, a Allocator, size int, expected Region) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	r, err := a.Get(uint64(size))
	require.NoError(t, err)
	require.Equal(t, expected, r)
}

View file

@ -0,0 +1,65 @@
package goodstor
import (
"os"
"path/filepath"
"codeberg.org/fyrchik/uring/loop"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
)
// idToStringBase is a numeric base (hexadecimal).
// NOTE(review): not referenced by any code visible in this change — confirm it is still needed.
const idToStringBase = 16
// nameIDPair couples a name with a 16-bit identifier.
// NOTE(review): not referenced by any code visible in this change — confirm it is still needed.
type nameIDPair struct {
	name string
	id   uint16
}
// Open prepares the storage for use.
//
// It creates the uring loop when loopSize is non-zero, creates the root
// directory unless readOnly is set, and opens the "backend" file inside the
// root directory. If a later step fails, the loop created here is closed
// again so that a failed Open does not leak it.
func (s *Storage) Open(readOnly bool) error {
	s.readOnly = readOnly

	if s.loopSize != 0 {
		lp, err := loop.New(s.loopSize, &s.loopParams)
		if err != nil {
			return err
		}
		s.loop = lp
	}

	// closeLoop undoes the loop creation on the error paths below.
	closeLoop := func() {
		if s.loop != nil {
			// Best effort: the error that made Open fail is the one to report.
			_ = s.loop.Close()
			s.loop = nil
		}
	}

	if !readOnly {
		if err := util.MkdirAllX(s.path, os.ModePerm); err != nil {
			closeLoop()
			return err
		}
	}

	p := filepath.Join(s.path, "backend")
	b, err := s.openSlab(p)
	if err != nil {
		closeLoop()
		return err
	}
	s.backend = b
	return nil
}
// Init does nothing and always returns nil.
func (s *Storage) Init() error {
	return nil
}
// Close closes the backend and then the uring loop, if they are open.
//
// Each component is reset to nil only after it was closed successfully; the
// first error aborts the sequence and is returned, leaving the remaining
// components untouched.
func (s *Storage) Close() error {
	if b := s.backend; b != nil {
		err := b.Close()
		if err != nil {
			return err
		}
		s.backend = nil
	}

	if lp := s.loop; lp != nil {
		err := lp.Close()
		if err != nil {
			return err
		}
		s.loop = nil
	}

	return nil
}

View file

@ -0,0 +1,27 @@
package goodstor
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/allocator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
)
// Delete returns the blocks occupied by the object identified by
// prm.StorageID to the allocator. The data itself is not overwritten.
//
// It fails with common.ErrReadOnly in read-only mode, with errNotImplemented
// when no storage ID is given and with the parse error when the storage ID
// is malformed.
func (s *Storage) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
	if s.readOnly {
		return common.DeleteRes{}, common.ErrReadOnly
	}
	if prm.StorageID == nil {
		return common.DeleteRes{}, errNotImplemented
	}

	offset, length, err := parseStorageID(prm.StorageID)
	if err != nil {
		return common.DeleteRes{}, err
	}

	// Put reserves blockSize*2^k bytes, the smallest such value strictly
	// greater than the data length. The same amount must be released here,
	// otherwise the blocks past ceil(length/blockSize) are never freed.
	up2 := s.blockSize
	for ; up2 <= length; up2 *= 2 {
	}

	s.allocMtx.Lock()
	r := allocator.NewRegion(offset, up2/s.blockSize)
	err = s.allocator.Free(r)
	s.allocMtx.Unlock()
	return common.DeleteRes{}, err
}

View file

@ -0,0 +1,27 @@
package goodstor
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
)
// Exists is not implemented yet: it always returns errNotImplemented.
//
// The commented-out lines are a draft of the intended logic. They refer to
// slabMtx and slabMap, which the Storage struct in this package does not
// declare, so they cannot be enabled as they are.
func (s *Storage) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
	//if prm.StorageID == nil {
	return common.ExistsRes{}, errNotImplemented
	//}
	// r, err := parseStorageID(prm.StorageID)
	// if err != nil {
	// return common.ExistsRes{}, err
	// }
	// s.slabMtx.RLock()
	// ss, ok := s.slabMap[slabID]
	// s.slabMtx.RUnlock()
	// if !ok {
	// return common.ExistsRes{}, nil
	// }
	// exists, err := ss.Exists(offset)
	// return common.ExistsRes{Exists: exists}, err
}

View file

@ -0,0 +1,72 @@
package goodstor
import (
"os"
"path/filepath"
"strconv"
"testing"
"time"
"codeberg.org/fyrchik/uring/loop"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
// TestGeneric runs the shared blobstortest suites against goodstor.
func TestGeneric(t *testing.T) {
	// Start from a clean directory and remove it again when the test ends.
	_ = os.RemoveAll(t.Name())
	defer func() { _ = os.RemoveAll(t.Name()) }()

	// helper builds a storage rooted at dir with the parameters used by all
	// sub-tests.
	helper := func(t *testing.T, dir string) common.Storage {
		s, err := New(
			WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
			WithRootPath(dir),
			WithBlockSize(8192),
			WithCapacity(1024*1024),
			WithLoopParams(32, loop.Params{
				RingCount:       1,
				SubmissionTimer: 10 * time.Microsecond,
			}))
		require.NoError(t, err)
		return s
	}

	// NOTE(review): n is never incremented, so the directory suffix is always
	// "0"; directories differ only through t.Name() — confirm this is intended.
	var n int
	newUringStor := func(t *testing.T) common.Storage {
		dir := filepath.Join(t.Name(), strconv.Itoa(n))
		return helper(t, dir)
	}

	min := uint64(1024)
	max := uint64(4096)
	blobstortest.TestAll(t, newUringStor, min, max)

	t.Run("info", func(t *testing.T) {
		dir := filepath.Join(t.Name(), "info")
		blobstortest.TestInfo(t, func(t *testing.T) common.Storage {
			return helper(t, dir)
		}, Type, dir)
	})
}
// TestControl runs the shared blobstortest control suite against goodstor
// storages with 4096-byte blocks and default loop parameters.
func TestControl(t *testing.T) {
	root := t.Name()
	_ = os.RemoveAll(root)
	defer func() { _ = os.RemoveAll(root) }()

	var n int
	newUringStor := func(t *testing.T) common.Storage {
		// t is the test passed to the constructor here, not the outer one.
		s, err := New(
			WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
			WithRootPath(filepath.Join(t.Name(), strconv.Itoa(n))),
			WithBlockSize(4096),
			WithCapacity(1024*1024),
			WithLoopParams(32, loop.Params{}),
		)
		require.NoError(t, err)
		return s
	}

	blobstortest.TestControl(t, newUringStor, 1024, 2048)
}

View file

@ -0,0 +1,34 @@
package goodstor
import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
// errInvalidStorageID is returned by parseStorageID when the storage ID does
// not have the expected length.
var errInvalidStorageID = errors.New("invalid storage ID")
// Get reads the object located by prm.StorageID from the backend.
//
// The storage ID carries the block offset and the byte length of the
// serialized object. errNotImplemented is returned when no storage ID is
// given; parse, read and unmarshal errors are returned as is.
func (s *Storage) Get(prm common.GetPrm) (common.GetRes, error) {
	if prm.StorageID == nil {
		return common.GetRes{}, errNotImplemented
	}

	blockOffset, size, err := parseStorageID(prm.StorageID)
	if err != nil {
		return common.GetRes{}, err
	}

	raw := make([]byte, size)
	pos := int64(blockOffset * s.blockSize)
	if err := s.backend.ReadAt(raw, pos); err != nil {
		return common.GetRes{}, err
	}

	obj := objectSDK.New()
	if err := obj.Unmarshal(raw); err != nil {
		return common.GetRes{}, err
	}
	return common.GetRes{Object: obj, RawData: raw}, nil
}

View file

@ -0,0 +1,29 @@
package goodstor
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)
// GetRange returns the part of the object payload described by prm.Range.
//
// The whole object is read with Get first. An ObjectOutOfRange logical error
// is returned when the range overflows or does not lie inside the payload.
func (s *Storage) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
	getPrm := common.GetPrm{
		Address:   prm.Address,
		StorageID: prm.StorageID,
	}
	res, err := s.Get(getPrm)
	if err != nil {
		return common.GetRangeRes{}, err
	}

	payload := res.Object.Payload()
	size := uint64(len(payload))

	from := prm.Range.GetOffset()
	to := from + prm.Range.GetLength()

	// from <= to guards against overflow of the addition above.
	inBounds := from <= to && from <= size && to <= size
	if !inBounds {
		return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{})
	}

	return common.GetRangeRes{Data: payload[from:to]}, nil
}

View file

@ -0,0 +1,7 @@
package goodstor
import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
// Iterate is not implemented yet: it always returns errNotImplemented.
func (s *Storage) Iterate(common.IteratePrm) (common.IterateRes, error) {
	return common.IterateRes{}, errNotImplemented
}

View file

@ -0,0 +1,35 @@
package goodstor
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
)
// Put writes prm.RawData to the backend and returns the storage ID that
// locates it.
//
// The reserved space is blockSize*2^k bytes, the smallest such value strictly
// greater than the data length. common.ErrReadOnly is returned in read-only
// mode, the allocator error when no space is left, and the backend error when
// the write fails (the reserved region is released in that case).
// Put panics when prm.RawData is nil.
func (s *Storage) Put(prm common.PutPrm) (common.PutRes, error) {
	if s.readOnly {
		return common.PutRes{}, common.ErrReadOnly
	}
	if prm.RawData == nil {
		panic("unexpected")
	}

	up2 := s.blockSize
	for ; up2 <= uint64(len(prm.RawData)); up2 *= 2 {
	}

	s.allocMtx.Lock()
	r, err := s.allocator.Get(up2 / s.blockSize)
	s.allocMtx.Unlock()
	if err != nil {
		return common.PutRes{}, err
	}

	err = s.backend.WriteAt(prm.RawData, int64(r.Offset()*s.blockSize))
	if err != nil {
		s.allocMtx.Lock()
		// Best effort: the write error is the one worth reporting.
		_ = s.allocator.Free(r)
		s.allocMtx.Unlock()
		// Do not hand out a storage ID for data that was never written.
		return common.PutRes{}, err
	}

	// NOTE(review): marshalStorageID keeps only the low 16 bits of the length,
	// so data longer than 65535 bytes cannot be located correctly afterwards.
	storageID := marshalStorageID(r.Offset(), uint64(len(prm.RawData)))
	return common.PutRes{StorageID: storageID[:]}, nil
}

View file

@ -0,0 +1,117 @@
package goodstor
import (
"sync"
"codeberg.org/fyrchik/uring/loop"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/allocator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/uringstor/slab"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Compile-time check that *Storage satisfies common.Storage.
var _ common.Storage = (*Storage)(nil)
// Option configures a Storage; options are applied by New in the given order.
type Option = func(*Storage)
// Storage keeps objects in a single backend file and tracks the free space
// of that file with an in-memory allocator working in units of blockSize.
type Storage struct {
	// capacity is the storage size; New divides it by blockSize to get the
	// number of blocks managed by the allocator.
	capacity uint64
	// path is the root directory; the data file is "<path>/backend".
	path string
	// blockSize is the allocation unit in bytes.
	blockSize uint64
	// noSync disables the O_SYNC flag when the backend file is opened.
	noSync bool
	// readOnly is set by Open; Put and Delete fail while it is true.
	readOnly bool

	// loopSize is passed to loop.New; zero selects the plain file backend.
	loopSize int
	// loopParams are passed to loop.New.
	loopParams loop.Params
	// loop is created by Open when loopSize is non-zero.
	loop *loop.Loop

	log *logger.Logger

	// backend performs the reads and writes; it is set by Open.
	backend slab.Backend

	// allocMtx guards allocator.
	allocMtx  sync.Mutex
	allocator allocator.Allocator
}
// Defaults applied by New before options are processed.
const (
	// defaultBlockSize is the default allocation unit in bytes.
	defaultBlockSize = 512
	// defaultUringLoopSize is the default size passed to loop.New.
	defaultUringLoopSize = 1024
)
// New creates a Storage with the default block size, the default uring loop
// size and a no-op logger, and then applies opts in order.
//
// A zero block size (e.g. from WithBlockSize(0)) is replaced with the default
// because the block size is used as a divisor. The returned error is
// currently always nil.
func New(opts ...Option) (*Storage, error) {
	var s Storage
	s.blockSize = defaultBlockSize
	s.loopSize = defaultUringLoopSize
	s.log = &logger.Logger{Logger: zap.NewNop()}

	for i := range opts {
		opts[i](&s)
	}

	// Guard the division below (and those in Put/Get/Delete) against zero.
	if s.blockSize == 0 {
		s.blockSize = defaultBlockSize
	}

	// The allocator works in blocks, not in bytes.
	s.allocator = allocator.NewBestFit(s.capacity / s.blockSize)
	return &s, nil
}
// Type is the goodstor storage type used in logs and configuration.
const Type = "goodstor"
// Type implements the common.Storage interface.
// It returns the Type constant.
func (s *Storage) Type() string {
	return Type
}
// Path implements the common.Storage interface.
// It returns the root directory set with WithRootPath.
func (s *Storage) Path() string {
	return s.path
}
// SetCompressor implements the common.Storage interface.
// It is a no-op: the configuration is ignored.
func (s *Storage) SetCompressor(cc *compression.Config) {}
// SetReportErrorFunc allows to provide a function to be called on disk errors.
// This function MUST be called before Open.
//
// The current implementation is a no-op: f is neither stored nor called.
func (s *Storage) SetReportErrorFunc(f func(string, error)) {}
// WithCapacity sets the max capacity of the storage.
// New divides this value by the block size to get the number of blocks.
func WithCapacity(capacity uint64) Option {
	return func(s *Storage) {
		s.capacity = capacity
	}
}
// WithRootPath sets the root directory of the storage.
// Open creates the directory (unless read-only) and keeps the data in the
// "backend" file inside it.
func WithRootPath(dir string) Option {
	return func(s *Storage) {
		s.path = dir
	}
}
// WithLoopParams sets the size and parameters that Open passes to loop.New.
// A zero size selects the plain file backend, as WithOSBackend does.
func WithLoopParams(size int, p loop.Params) Option {
	return func(s *Storage) {
		s.loopSize = size
		s.loopParams = p
	}
}
// WithOSBackend makes the storage use the plain file backend: with a zero
// loop size Open creates no uring loop and openSlab picks slab.NewFileBackend.
func WithOSBackend() Option {
	return func(s *Storage) {
		s.loopSize = 0
	}
}
// WithBlockSize sets the allocation unit of the storage in bytes.
func WithBlockSize(size uint64) Option {
	return func(st *Storage) {
		st.blockSize = size
	}
}
// WithLogger sets the logger of the storage. The logger is extended with a
// "component" field holding the storage Type. l must not be nil.
func WithLogger(l *logger.Logger) Option {
	return func(s *Storage) {
		s.log = &logger.Logger{Logger: l.With(zap.String("component", Type))}
	}
}
// WithNoSync controls whether the backend file is opened without O_SYNC.
// With noSync == false (the default) O_SYNC is added to the open flags.
func WithNoSync(noSync bool) Option {
	return func(s *Storage) {
		s.noSync = noSync
	}
}

View file

@ -0,0 +1,58 @@
package goodstor
import (
"encoding/binary"
"fmt"
"os"
"syscall"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/uringstor/slab"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)
// errNotImplemented is returned by operations that are not supported yet.
// It wraps apistatus.ObjectNotFound, so callers that check for "not found"
// with errors.Is/As treat it as a missing object.
var errNotImplemented = logicerr.Wrap(fmt.Errorf("not implemented: %w", apistatus.ObjectNotFound{}))
// parseStorageID decodes a storage ID produced by marshalStorageID.
//
// The ID is exactly 10 bytes: a little-endian uint64 offset followed by a
// little-endian uint16 length. errInvalidStorageID is returned for any other
// size.
func parseStorageID(storageID []byte) (uint64, uint64, error) {
	const (
		offsetSize = 8
		idSize     = offsetSize + 2
	)
	if len(storageID) != idSize {
		return 0, 0, errInvalidStorageID
	}

	le := binary.LittleEndian
	offset := le.Uint64(storageID[:offsetSize])
	length := uint64(le.Uint16(storageID[offsetSize:]))
	return offset, length, nil
}
// marshalStorageID encodes offset and length as a 10-byte storage ID: a
// little-endian uint64 offset followed by a little-endian uint16 length.
// Only the low 16 bits of length are stored.
func marshalStorageID(offset, length uint64) [10]byte {
	var id [10]byte
	le := binary.LittleEndian
	le.PutUint64(id[:8], offset)
	le.PutUint16(id[8:], uint16(length))
	return id
}
// assert panics with msg formatted with args unless ok is true.
func assert(ok bool, msg string, args ...any) {
	if ok {
		return
	}
	panic(fmt.Sprintf(msg, args...))
}
// openSlab creates a backend for the file at p and opens it.
//
// The uring backend is used when a loop exists, the plain file backend
// otherwise. A read-only storage opens the file with O_RDONLY; a writable one
// with O_RDWR|O_CREATE|O_DIRECT. O_SYNC is added unless noSync is set.
// The backend is returned together with the result of its Open call.
func (s *Storage) openSlab(p string) (slab.Backend, error) {
	var backend slab.Backend
	if s.loop != nil {
		backend = slab.NewUringBackend(p, s.loop)
	} else {
		backend = slab.NewFileBackend(p)
	}

	flags := os.O_RDWR | os.O_CREATE | syscall.O_DIRECT
	if s.readOnly {
		flags = os.O_RDONLY
	}
	if !s.noSync {
		flags |= os.O_SYNC
	}

	return backend, backend.Open(flags)
}