forked from TrueCloudLab/frostfs-node
WIP
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
05996103ec
commit
dbccc429cf
21 changed files with 239 additions and 63 deletions
5
go.mod
5
go.mod
|
@ -44,7 +44,10 @@ require (
|
|||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
require golang.org/x/sys v0.6.0
|
||||
require (
|
||||
github.com/tidwall/btree v1.6.0
|
||||
golang.org/x/sys v0.6.0
|
||||
)
|
||||
|
||||
require (
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -430,6 +430,8 @@ github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8
|
|||
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs=
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM=
|
||||
github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg=
|
||||
github.com/tidwall/btree v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
|
||||
github.com/twmb/murmur3 v1.1.5 h1:i9OLS9fkuLzBXjt6dptlAEyk58fJsSTXbRg3SgVyqgk=
|
||||
github.com/twmb/murmur3 v1.1.5/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
|
||||
github.com/urfave/cli v1.22.5 h1:lNq9sAHXK2qfdI8W+GRItjCEkI+2oR4d+MEHy1CKXoU=
|
||||
|
|
|
@ -34,7 +34,6 @@ func (b *Blobovnicza) Open() error {
|
|||
)
|
||||
|
||||
b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -35,17 +35,29 @@ func NewBestFit(capacity uint64) *BestFit {
|
|||
a.addFreePortion(NewRegion(i*maxLength, maxLength))
|
||||
}
|
||||
c := capacity / maxLength * maxLength
|
||||
a.addFreePortion(NewRegion(c, capacity-c))
|
||||
r := NewRegion(c, capacity-c)
|
||||
a.addFreePortion(NewRegion(0, 0))
|
||||
a.addFreePortion(r)
|
||||
return a
|
||||
}
|
||||
|
||||
func seek(b *btree.BTreeG[Region], pivot Region) (Region, bool) {
|
||||
var item Region
|
||||
var seen bool
|
||||
b.Ascend(pivot, func(it Region) bool {
|
||||
item = it
|
||||
seen = true
|
||||
return false
|
||||
})
|
||||
return item, seen
|
||||
}
|
||||
|
||||
// Get implements the Allocator interface.
|
||||
func (b *BestFit) Get(size uint64) (Region, error) {
|
||||
p := NewRegion(0, uint64(size))
|
||||
iter := b.sizeToFree.Iter()
|
||||
if iter.Seek(p) {
|
||||
free := iter.Item()
|
||||
|
||||
free, ok := seek(b.sizeToFree, p)
|
||||
if ok {
|
||||
b.deleteFreePortion(free)
|
||||
|
||||
reg := free.allocate(size)
|
||||
|
@ -54,7 +66,6 @@ func (b *BestFit) Get(size uint64) (Region, error) {
|
|||
}
|
||||
return reg, nil
|
||||
}
|
||||
|
||||
return 0, ErrOOM
|
||||
}
|
||||
|
||||
|
@ -66,11 +77,9 @@ func (b *BestFit) Free(region Region) error {
|
|||
}
|
||||
|
||||
func (b *BestFit) mergeIfPossible(free Region) Region {
|
||||
iter := b.endToFree.Iter()
|
||||
|
||||
key := NewRegion(free.Offset(), 0)
|
||||
if iter.Seek(key) {
|
||||
prev := iter.Item()
|
||||
prev, ok := seek(b.endToFree, key)
|
||||
if ok {
|
||||
if prev.End() == free.Offset() && prev.safeExtend(free.Length()) {
|
||||
free = NewRegion(prev.Offset(), prev.Length()+free.Length())
|
||||
b.deleteFreePortion(prev)
|
||||
|
@ -78,16 +87,14 @@ func (b *BestFit) mergeIfPossible(free Region) Region {
|
|||
}
|
||||
|
||||
key = NewRegion(free.End(), 0)
|
||||
if iter.Seek(key) {
|
||||
next := iter.Item()
|
||||
next, ok := seek(b.endToFree, key)
|
||||
if ok {
|
||||
if next.Offset() == free.End() && free.safeExtend(next.Length()) {
|
||||
free = NewRegion(free.Offset(), free.Length()+next.Length())
|
||||
b.deleteFreePortion(next)
|
||||
}
|
||||
}
|
||||
|
||||
iter.Release()
|
||||
|
||||
return free
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,14 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBestFit1(t *testing.T) {
|
||||
a := NewBestFit(4194304)
|
||||
testGet(t, a, 1, NewRegion(0, 1))
|
||||
testGet(t, a, 1, NewRegion(1, 1))
|
||||
testGet(t, a, 1, NewRegion(2, 1))
|
||||
testGet(t, a, 1, NewRegion(3, 1))
|
||||
}
|
||||
|
||||
func TestBestFit(t *testing.T) {
|
||||
a := NewBestFit(4096)
|
||||
testGet(t, a, 1024, NewRegion(0, 1024))
|
||||
|
|
|
@ -37,6 +37,9 @@ func (s *Storage) Open(readOnly bool) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Fallocate(int64(s.capacity)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.backend = b
|
||||
return nil
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package goodstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
)
|
||||
|
||||
func (s *Storage) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
func (s *Storage) Exists(_ context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
//if prm.StorageID == nil {
|
||||
return common.ExistsRes{}, errNotImplemented
|
||||
//}
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
|
@ -20,7 +19,7 @@ func TestGeneric(t *testing.T) {
|
|||
defer func() { _ = os.RemoveAll(t.Name()) }()
|
||||
|
||||
helper := func(t *testing.T, dir string) common.Storage {
|
||||
s, err := New(
|
||||
return New(
|
||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
WithRootPath(dir),
|
||||
WithBlockSize(8192),
|
||||
|
@ -29,8 +28,6 @@ func TestGeneric(t *testing.T) {
|
|||
RingCount: 1,
|
||||
SubmissionTimer: 10 * time.Microsecond,
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}
|
||||
|
||||
var n int
|
||||
|
@ -58,14 +55,15 @@ func TestControl(t *testing.T) {
|
|||
var n int
|
||||
newUringStor := func(t *testing.T) common.Storage {
|
||||
dir := filepath.Join(t.Name(), strconv.Itoa(n))
|
||||
s, err := New(
|
||||
return New(
|
||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
WithRootPath(dir),
|
||||
WithBlockSize(4096),
|
||||
WithCapacity(1024*1024),
|
||||
WithLoopParams(32, loop.Params{}))
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
WithLoopParams(32, loop.Params{
|
||||
RingCount: 1,
|
||||
SubmissionTimer: 10 * time.Microsecond,
|
||||
}))
|
||||
}
|
||||
|
||||
blobstortest.TestControl(t, newUringStor, 1024, 2048)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package goodstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -9,9 +10,9 @@ import (
|
|||
|
||||
var errInvalidStorageID = errors.New("invalid storage ID")
|
||||
|
||||
func (s *Storage) Get(prm common.GetPrm) (common.GetRes, error) {
|
||||
func (s *Storage) Get(_ context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
if prm.StorageID == nil {
|
||||
return common.GetRes{}, errNotImplemented
|
||||
return common.GetRes{}, nil
|
||||
}
|
||||
|
||||
offset, length, err := parseStorageID(prm.StorageID)
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
package goodstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
)
|
||||
|
||||
func (s *Storage) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
res, err := s.Get(common.GetPrm{
|
||||
func (s *Storage) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
res, err := s.Get(ctx, common.GetPrm{
|
||||
Address: prm.Address,
|
||||
StorageID: prm.StorageID,
|
||||
})
|
||||
|
|
|
@ -38,7 +38,7 @@ const (
|
|||
defaultUringLoopSize = 1024
|
||||
)
|
||||
|
||||
func New(opts ...Option) (*Storage, error) {
|
||||
func New(opts ...Option) *Storage {
|
||||
var s Storage
|
||||
s.blockSize = defaultBlockSize
|
||||
s.loopSize = defaultUringLoopSize
|
||||
|
@ -48,7 +48,7 @@ func New(opts ...Option) (*Storage, error) {
|
|||
}
|
||||
|
||||
s.allocator = allocator.NewBestFit(s.capacity / s.blockSize)
|
||||
return &s, nil
|
||||
return &s
|
||||
}
|
||||
|
||||
// Type is uring storage type used in logs and configuration.
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/uringstor/slab"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
|
@ -48,7 +47,7 @@ func (s *Storage) openSlab(p string) (slab.Backend, error) {
|
|||
if s.readOnly {
|
||||
flags |= os.O_RDONLY
|
||||
} else {
|
||||
flags |= os.O_RDWR | os.O_CREATE | syscall.O_DIRECT
|
||||
flags |= os.O_RDWR | os.O_CREATE
|
||||
}
|
||||
|
||||
if !s.noSync {
|
||||
|
|
|
@ -4,13 +4,20 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"codeberg.org/fyrchik/uring"
|
||||
"codeberg.org/fyrchik/uring/loop"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/goodstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/uringstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
@ -21,7 +28,7 @@ type storage struct {
|
|||
}
|
||||
|
||||
func (s storage) open(b *testing.B) common.Storage {
|
||||
dir, err := os.MkdirTemp(os.TempDir(), s.desc)
|
||||
dir, err := os.MkdirTemp(".", s.desc)
|
||||
if err != nil {
|
||||
b.Fatalf("creating %s root path: %v", s.desc, err)
|
||||
}
|
||||
|
@ -73,6 +80,133 @@ var storages = []storage{
|
|||
create: func(dir string) common.Storage {
|
||||
return blobovniczatree.NewBlobovniczaTree(
|
||||
blobovniczatree.WithRootPath(dir),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(2),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(2),
|
||||
blobovniczatree.WithBlobovniczaSize(1<<30),
|
||||
)
|
||||
},
|
||||
},
|
||||
// {
|
||||
// desc: "uringstor_nosync",
|
||||
// create: func(dir string) common.Storage {
|
||||
// return uringstor.New(
|
||||
// uringstor.WithRootPath(dir),
|
||||
// uringstor.WithSlabCapacity(1<<30),
|
||||
// uringstor.WithLoopParams(64, loop.Params{
|
||||
// //WaitEventFd: true,
|
||||
// Flags: loop.FlagSharedWorkers,
|
||||
// RingCount: 4,
|
||||
// SubmissionTimer: 10 * time.Microsecond,
|
||||
// RingParams: uring.Params{
|
||||
// //Flags: uring.IORING_SETUP_ATTACH_WQ,
|
||||
// },
|
||||
// }),
|
||||
// uringstor.WithNoSync(true),
|
||||
// )
|
||||
// },
|
||||
// },
|
||||
// {
|
||||
// desc: "uringstor_nosync OS",
|
||||
// create: func(dir string) common.Storage {
|
||||
// return uringstor.New(
|
||||
// uringstor.WithRootPath(dir),
|
||||
// uringstor.WithSlabCapacity(1<<30),
|
||||
// uringstor.WithOSBackend(),
|
||||
// uringstor.WithNoSync(true),
|
||||
// )
|
||||
// },
|
||||
// },
|
||||
// {
|
||||
// desc: "goodstor_nosync",
|
||||
// create: func(dir string) common.Storage {
|
||||
// return goodstor.New(
|
||||
// goodstor.WithRootPath(dir),
|
||||
// goodstor.WithCapacity(16<<30),
|
||||
// goodstor.WithLoopParams(128, loop.Params{
|
||||
// //WaitEventFd: true,
|
||||
// RingCount: runtime.NumCPU(),
|
||||
// Flags: loop.FlagSharedWorkers,
|
||||
// SubmissionTimer: 50 * time.Microsecond,
|
||||
// RingParams: uring.Params{
|
||||
// CQEntries: 2 * 128,
|
||||
// Flags: uring.IORING_SETUP_CQSIZE,
|
||||
// },
|
||||
// }),
|
||||
// goodstor.WithBlockSize(4096),
|
||||
// goodstor.WithNoSync(true),
|
||||
// )
|
||||
// },
|
||||
// },
|
||||
{
|
||||
desc: "goodstor_nosync OS",
|
||||
create: func(dir string) common.Storage {
|
||||
return goodstor.New(
|
||||
goodstor.WithRootPath(dir),
|
||||
goodstor.WithCapacity(16<<30),
|
||||
goodstor.WithBlockSize(4096),
|
||||
goodstor.WithOSBackend(),
|
||||
goodstor.WithNoSync(true),
|
||||
)
|
||||
},
|
||||
},
|
||||
// {
|
||||
// desc: "uringstor",
|
||||
// create: func(dir string) common.Storage {
|
||||
// return uringstor.New(
|
||||
// uringstor.WithRootPath(dir),
|
||||
// uringstor.WithSlabCapacity(1<<30),
|
||||
// uringstor.WithLoopParams(32, loop.Params{
|
||||
// WaitEventFd: true,
|
||||
// RingCount: 4,
|
||||
// Flags: loop.FlagSharedWorkers,
|
||||
// SubmissionTimer: 10 * time.Microsecond,
|
||||
// RingParams: uring.Params{
|
||||
// CQEntries: 2 * 32,
|
||||
// Flags: uring.IORING_SETUP_CQSIZE,
|
||||
// },
|
||||
// }),
|
||||
// )
|
||||
// },
|
||||
// },
|
||||
{
|
||||
desc: "uringstor OS",
|
||||
create: func(dir string) common.Storage {
|
||||
return uringstor.New(
|
||||
uringstor.WithRootPath(dir),
|
||||
uringstor.WithMaxObjectSize(16*1024*1024),
|
||||
uringstor.WithSlabCapacity(1<<30),
|
||||
uringstor.WithOSBackend(),
|
||||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "goodstor",
|
||||
create: func(dir string) common.Storage {
|
||||
return goodstor.New(
|
||||
goodstor.WithRootPath(dir),
|
||||
goodstor.WithCapacity(16<<30),
|
||||
goodstor.WithLoopParams(32, loop.Params{
|
||||
WaitEventFd: true,
|
||||
RingCount: 4,
|
||||
Flags: loop.FlagSharedWorkers,
|
||||
SubmissionTimer: 10 * time.Microsecond,
|
||||
RingParams: uring.Params{
|
||||
CQEntries: 2 * 32,
|
||||
Flags: uring.IORING_SETUP_CQSIZE,
|
||||
},
|
||||
}),
|
||||
goodstor.WithBlockSize(4096),
|
||||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "goodstor OS",
|
||||
create: func(dir string) common.Storage {
|
||||
return goodstor.New(
|
||||
goodstor.WithRootPath(dir),
|
||||
goodstor.WithCapacity(16<<30),
|
||||
goodstor.WithBlockSize(4096),
|
||||
goodstor.WithOSBackend(),
|
||||
)
|
||||
},
|
||||
},
|
||||
|
@ -104,6 +238,8 @@ func BenchmarkSubstorageReadPerf(b *testing.B) {
|
|||
objGen := tt.objGen()
|
||||
st := stEntry.open(b)
|
||||
|
||||
var mtx sync.RWMutex
|
||||
var sid = make(map[oid.Address][]byte, tt.size)
|
||||
// Fill database
|
||||
var errG errgroup.Group
|
||||
for i := 0; i < tt.size; i++ {
|
||||
|
@ -114,10 +250,15 @@ func BenchmarkSubstorageReadPerf(b *testing.B) {
|
|||
if err != nil {
|
||||
return fmt.Errorf("marshal: %v", err)
|
||||
}
|
||||
_, err = st.Put(common.PutPrm{
|
||||
res, err := st.Put(common.PutPrm{
|
||||
Address: addr,
|
||||
RawData: raw,
|
||||
})
|
||||
if err == nil {
|
||||
mtx.Lock()
|
||||
sid[addr] = res.StorageID
|
||||
mtx.Unlock()
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
@ -128,7 +269,11 @@ func BenchmarkSubstorageReadPerf(b *testing.B) {
|
|||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_, err := st.Get(context.Background(), common.GetPrm{Address: addrGen.Next()})
|
||||
addr := addrGen.Next()
|
||||
mtx.RLock()
|
||||
s := sid[addr]
|
||||
mtx.RUnlock()
|
||||
_, err := st.Get(context.Background(), common.GetPrm{Address: addr, StorageID: s})
|
||||
require.NoError(b, err)
|
||||
}
|
||||
})
|
||||
|
@ -145,11 +290,13 @@ func BenchmarkSubstorageWritePerf(b *testing.B) {
|
|||
{desc: "rand10", create: func() testutil.ObjectGenerator { return &testutil.RandObjGenerator{ObjSize: 10} }},
|
||||
{desc: "rand100", create: func() testutil.ObjectGenerator { return &testutil.RandObjGenerator{ObjSize: 100} }},
|
||||
{desc: "rand1000", create: func() testutil.ObjectGenerator { return &testutil.RandObjGenerator{ObjSize: 1000} }},
|
||||
{desc: "overwrite10", create: func() testutil.ObjectGenerator { return &testutil.OverwriteObjGenerator{ObjSize: 10, MaxObjects: 100} }},
|
||||
{desc: "overwrite100", create: func() testutil.ObjectGenerator { return &testutil.OverwriteObjGenerator{ObjSize: 100, MaxObjects: 100} }},
|
||||
{desc: "overwrite1000", create: func() testutil.ObjectGenerator {
|
||||
return &testutil.OverwriteObjGenerator{ObjSize: 1000, MaxObjects: 100}
|
||||
}},
|
||||
{desc: "rand8k", create: func() testutil.ObjectGenerator { return &testutil.RandObjGenerator{ObjSize: 8000} }},
|
||||
{desc: "rand1M", create: func() testutil.ObjectGenerator { return &testutil.RandObjGenerator{ObjSize: 1 << 20} }},
|
||||
// {desc: "overwrite10", create: func() testutil.ObjectGenerator { return &testutil.OverwriteObjGenerator{ObjSize: 10, MaxObjects: 100} }},
|
||||
// {desc: "overwrite100", create: func() testutil.ObjectGenerator { return &testutil.OverwriteObjGenerator{ObjSize: 100, MaxObjects: 100} }},
|
||||
// {desc: "overwrite1000", create: func() testutil.ObjectGenerator {
|
||||
// return &testutil.OverwriteObjGenerator{ObjSize: 1000, MaxObjects: 100}
|
||||
// }},
|
||||
}
|
||||
|
||||
for _, genEntry := range generators {
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package uringstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
)
|
||||
|
||||
func (s *Storage) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
func (s *Storage) Exists(_ context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
if prm.StorageID == nil {
|
||||
return common.ExistsRes{}, errNotImplemented
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
|
@ -19,14 +18,12 @@ func TestGeneric(t *testing.T) {
|
|||
defer func() { _ = os.RemoveAll(t.Name()) }()
|
||||
|
||||
helper := func(t *testing.T, dir string) common.Storage {
|
||||
s, err := New(
|
||||
return New(
|
||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
WithRootPath(dir),
|
||||
WithMaxObjectSize(8192),
|
||||
WithSlabCapacity(1024*1024),
|
||||
WithLoopParams(32, loop.Params{}))
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
WithLoopParams(32, loop.Params{RingCount: 1}))
|
||||
}
|
||||
|
||||
var n int
|
||||
|
@ -54,14 +51,12 @@ func TestControl(t *testing.T) {
|
|||
var n int
|
||||
newUringStor := func(t *testing.T) common.Storage {
|
||||
dir := filepath.Join(t.Name(), strconv.Itoa(n))
|
||||
s, err := New(
|
||||
return New(
|
||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
WithRootPath(dir),
|
||||
WithMaxObjectSize(4096),
|
||||
WithSlabCapacity(1024*1024),
|
||||
WithLoopParams(32, loop.Params{}))
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
WithLoopParams(32, loop.Params{RingCount: 1}))
|
||||
}
|
||||
|
||||
blobstortest.TestControl(t, newUringStor, 1024, 2048)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package uringstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -10,7 +11,7 @@ import (
|
|||
|
||||
var errInvalidStorageID = errors.New("invalid storage ID")
|
||||
|
||||
func (s *Storage) Get(prm common.GetPrm) (common.GetRes, error) {
|
||||
func (s *Storage) Get(_ context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
if prm.StorageID == nil {
|
||||
return common.GetRes{}, errNotImplemented
|
||||
}
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
package uringstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
)
|
||||
|
||||
func (s *Storage) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
res, err := s.Get(common.GetPrm{
|
||||
func (s *Storage) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
res, err := s.Get(ctx, common.GetPrm{
|
||||
Address: prm.Address,
|
||||
StorageID: prm.StorageID,
|
||||
})
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package slab
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"codeberg.org/fyrchik/uring"
|
||||
|
@ -66,7 +67,10 @@ func (f *File) Fallocate(size int64) error {
|
|||
|
||||
func (f *File) WriteAt(buf []byte, offset int64) error {
|
||||
_, err := f.file.WriteAt(buf, offset)
|
||||
return err
|
||||
if err != nil {
|
||||
return fmt.Errorf("os: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *File) ReadAt(buf []byte, offset int64) error {
|
||||
|
@ -89,7 +93,7 @@ func (f *UringFile) Open(flags int) error {
|
|||
}
|
||||
|
||||
fd := f.File.file.Fd()
|
||||
//if err := f.loop.RegisterFiles([]int32{int32(fd)}); err != nil {
|
||||
//if err := f.loop.RegisterFiles([]int32{int32(fd)}); err != nil {
|
||||
// return err
|
||||
//}
|
||||
f.fd = fd
|
||||
|
@ -108,7 +112,7 @@ func (f *UringFile) Close() error {
|
|||
func (f *UringFile) Fallocate(size int64) error {
|
||||
var sqe uring.SQEntry
|
||||
uring.Fallocate(&sqe, f.fd, _FALLOC_FL_ZERO_RANGE, 0, uint64(size))
|
||||
sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
//sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
|
||||
_, err := f.loop.Complete(sqe)
|
||||
return err
|
||||
|
@ -117,16 +121,19 @@ func (f *UringFile) Fallocate(size int64) error {
|
|||
func (f *UringFile) WriteAt(buf []byte, offset int64) error {
|
||||
var sqe uring.SQEntry
|
||||
uring.WriteAt(&sqe, f.fd, buf, uint64(offset))
|
||||
sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
//sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
|
||||
_, err := f.loop.Complete(sqe)
|
||||
if err != nil {
|
||||
return fmt.Errorf("uring: %w", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *UringFile) ReadAt(buf []byte, offset int64) error {
|
||||
var sqe uring.SQEntry
|
||||
uring.ReadAt(&sqe, f.fd, buf, uint64(offset))
|
||||
sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
//sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
|
||||
_, err := f.loop.Complete(sqe)
|
||||
return err
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"errors"
|
||||
"math/bits"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
|
@ -142,7 +141,6 @@ func (s *Slab) readHeader() error {
|
|||
bitmapSize := size / uint64(chunkSize)
|
||||
buf = make([]byte, bitmapSize/8)
|
||||
if err := s.backend.ReadAt(buf, headerSize); err != nil {
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ func TestSlab(t *testing.T) {
|
|||
testSlab(t, b)
|
||||
})
|
||||
t.Run("uring", func(t *testing.T) {
|
||||
l, err := loop.New(10, loop.Params{})
|
||||
l, err := loop.New(10, &loop.Params{RingCount: 1})
|
||||
require.NoError(t, err)
|
||||
defer l.Close()
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ const (
|
|||
defaultUringLoopSize = 1024
|
||||
)
|
||||
|
||||
func New(opts ...Option) (*Storage, error) {
|
||||
func New(opts ...Option) *Storage {
|
||||
var s Storage
|
||||
s.maxObjectSize = defaultMaxObjectSize
|
||||
s.minObjectSize = defaultMinObjectSize
|
||||
|
@ -60,7 +60,7 @@ func New(opts ...Option) (*Storage, error) {
|
|||
count := bits.TrailingZeros64(s.maxObjectSize) - bits.TrailingZeros64(s.minObjectSize) + 1
|
||||
s.slabMap = make(map[uint16]*slab.Slab)
|
||||
s.slabIDs = make([][]uint16, count)
|
||||
return &s, nil
|
||||
return &s
|
||||
}
|
||||
|
||||
// Type is uring storage type used in logs and configuration.
|
||||
|
|
Loading…
Reference in a new issue