Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
58524d2e76
WIP 2025-03-12 12:58:32 +03:00
4075072954
bench seq 2025-03-12 12:58:31 +03:00
8 changed files with 776 additions and 0 deletions

View file

@ -2,7 +2,10 @@ package benchmark
import (
"context"
"encoding/binary"
"fmt"
"os"
"path/filepath"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -14,6 +17,78 @@ import (
"github.com/stretchr/testify/require"
)
func BenchmarkWriteAllocatedSeq(b *testing.B) {
const objectCount = 100
sizes := []uint64{64 << 10}
for _, size := range sizes {
objGen := testutil.RandObjGenerator{ObjSize: size}
b.Run(fmt.Sprintf("ftruncate %d", size), func(b *testing.B) {
b.ResetTimer()
for run := 0; run < b.N; run++ {
b.StopTimer()
var dataOffset uint32
dir := b.TempDir()
filePrefix := fmt.Sprintf("writecache_%d_%d", run, size)
data, err := os.OpenFile(
filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset)),
os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_SYNC,
0o700,
)
require.NoError(b, err, "open writecache file")
obj := objGen.Next()
rawData, err := obj.Marshal()
require.NoError(b, err, "marshaling object")
require.NoError(b, data.Truncate(int64((len(rawData)+4)*objectCount)))
b.StartTimer()
for range objectCount {
obj := objGen.Next()
rawData, err := obj.Marshal()
require.NoError(b, err, "marshaling object")
objectStruct := make([]byte, 4+len(rawData))
binary.LittleEndian.PutUint32(objectStruct, uint32(len(rawData)))
copy(objectStruct[4:], rawData)
c, err := data.WriteAt(objectStruct, int64(dataOffset))
require.NoError(b, err)
require.True(b, c == len(objectStruct))
require.NoError(b, os.Rename(
filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset)),
filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset+uint32(len(objectStruct)))),
))
dataOffset += uint32(len(objectStruct))
}
require.NoError(b, data.Close())
}
})
b.Run(fmt.Sprintf("fstree %d", size), func(b *testing.B) {
cache := newCache(b)
benchmarkPutPrepare(b, cache)
defer func() { require.NoError(b, cache.Close(context.Background())) }()
b.ResetTimer()
for range b.N {
for range objectCount {
obj := objGen.Next()
rawData, err := obj.Marshal()
require.NoError(b, err, "marshaling object")
prm := common.PutPrm{
Address: testutil.AddressFromObject(b, obj),
Object: obj,
RawData: rawData,
}
if _, err := cache.Put(context.Background(), prm); err != nil {
b.Fatalf("putting: %v", err)
}
}
}
})
}
}
func BenchmarkWritecacheSeq(b *testing.B) {
const payloadSize = 8 << 10
b.Run("bbolt_seq", func(b *testing.B) {

View file

@ -0,0 +1,35 @@
package slabstore
var defaultConfig config = config{
sealSize: 4 << 20,
truncateSize: 5 << 20,
maxActiveCount: 10,
}
type config struct {
path string
readOnly bool
sealSize uint64
truncateSize uint64
maxActiveCount uint32
}
type Option func(c *config)
func WithSealSize(size uint64) Option {
return func(c *config) {
c.sealSize = size
}
}
func WithTruncateSize(size uint64) Option {
return func(c *config) {
c.truncateSize = size
}
}
func WithMaxActiveSlabsCount(count uint32) Option {
return func(c *config) {
c.maxActiveCount = count
}
}

View file

@ -0,0 +1,153 @@
package slabstore
import (
"slices"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"github.com/google/uuid"
)
func (s *Store) dispatch() {
s.wg.Done()
for {
slab := s.nextActive()
var writeReady chan *Slab
if slab != nil {
writeReady = s.writeReady
}
select {
case <-s.closed:
return
case slab := <-s.writeCompleted:
assert.True(slab.writers == 1, "slab writers count must be equal 1")
slab.writers--
if s.slabSealed(slab) {
s.sealed = append(s.sealed, slab)
} else {
s.active = append(s.active, slab)
}
if stop := s.dropIfNeeded(slab); stop {
return
}
case req := <-s.readReq:
var resp readResponse
if slab, ok := s.all[req.id]; ok {
slab.readers++
resp.slab = slab
} else {
resp.err = ErrSlabNotFound
}
select {
case req.result <- &resp:
case <-s.closed:
return
}
case slab := <-s.readCompleted:
assert.True(slab.readers > 0, "slab readers count must be greater than zero")
slab.readers--
if stop := s.dropIfNeeded(slab); stop {
return
}
case req := <-s.sealedReq:
s.active = slices.DeleteFunc(s.active, func(slab *Slab) bool {
if s.slabSealed(slab) {
s.sealed = append(s.sealed, slab)
return true
}
return false
})
select {
case req.result <- &sealedResponse{
slabs: s.sealed,
}:
case <-s.closed:
return
}
case req := <-s.dropReq:
slab, ok := s.all[req.id]
if !ok {
select {
case req.result <- &dropResponse{err: ErrSlabNotFound}:
continue
case <-s.closed:
return
}
}
s.active = slices.DeleteFunc(s.active, func(slab *Slab) bool {
return slab.ID() == req.id
})
s.sealed = slices.DeleteFunc(s.sealed, func(slab *Slab) bool {
return slab.ID() == req.id
})
if slab.readers == 0 && slab.writers == 0 {
delete(s.all, req.id)
select {
case req.result <- &dropResponse{slab: slab}:
continue
case <-s.closed:
return
}
}
s.toDrop[req.id] = append(s.toDrop[req.id], req.result)
case writeReady <- slab:
slab.writers++
}
}
}
func (s *Store) dropIfNeeded(slab *Slab) bool {
if slab.readers > 0 || slab.writers > 0 {
return false
}
waiters, ok := s.toDrop[slab.ID()]
if !ok {
return false
}
for _, w := range waiters {
select {
case w <- &dropResponse{slab: slab}:
case <-s.closed:
return true
}
}
delete(s.toDrop, slab.ID())
s.active = slices.DeleteFunc(s.active, func(sl *Slab) bool {
return slab.ID() == sl.ID()
})
s.sealed = slices.DeleteFunc(s.sealed, func(sl *Slab) bool {
return slab.ID() == sl.ID()
})
delete(s.all, slab.ID())
return false
}
func (s *Store) nextActive() *Slab {
if len(s.active) > 0 {
result := s.active[len(s.active)-1]
s.active = s.active[:len(s.active)-1]
return result
}
if uint32(len(s.all))-uint32(len(s.sealed)) < s.config.maxActiveCount {
return s.createNewSlab()
}
return nil
}
func (s *Store) createNewSlab() *Slab {
slab := &Slab{}
id, err := uuid.NewRandom()
if err != nil {
// TODO: log
return nil
}
slab.id = SlabID(id)
slab.dir = s.config.path
slab.truncateSize = s.config.truncateSize
s.all[slab.id] = slab
return slab
}
func (s *Store) slabSealed(slab *Slab) bool {
return slab.commited() >= s.config.sealSize
}

View file

@ -0,0 +1,15 @@
package slabstore
import "errors"
var (
ErrClosed = errors.New("slabstore is closed")
ErrNotInitialized = errors.New("slabstore is not initialized")
ErrReadOnly = errors.New("slabstore is in read-only mode")
ErrSlabNotFound = errors.New("slab not found")
ErrInvalidOffsetLength = errors.New("invalid offset/length for slab")
ErrIncompleteWrite = errors.New("incomplete write to slab")
ErrIncompleteRead = errors.New("incomplete read from slab")
ErrPurged = errors.New("slab already purged")
ErrInvalidSlabName = errors.New("invalid slab name")
)

View file

@ -0,0 +1,180 @@
package slabstore
import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/google/uuid"
)
type Slab struct {
mtx sync.RWMutex
dir string
id SlabID
truncateSize uint64
readers int64
writers int64
commitedBytes uint64
truncated bool
purged bool
}
func (s *Slab) ID() SlabID {
return s.id
}
func (s *Slab) Write(data []byte) (OffsetLength, error) {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.purged {
return OffsetLength{}, ErrPurged
}
file, err := s.openFileWrite()
if err != nil {
return OffsetLength{}, err
}
n, err := file.WriteAt(data, int64(s.commitedBytes))
if err != nil {
closeErr := file.Close()
return OffsetLength{}, fmt.Errorf("write data: %w", errors.Join(err, closeErr))
}
if n != len(data) {
closeErr := file.Close()
return OffsetLength{}, fmt.Errorf("write data: %w", errors.Join(ErrIncompleteWrite, closeErr))
}
if err := file.Close(); err != nil {
return OffsetLength{}, fmt.Errorf("close file: %w", err)
}
nextCommitedBytes := s.commitedBytes + uint64(len(data))
if err := os.Rename(s.currentFilename(), s.filename(nextCommitedBytes)); err != nil {
return OffsetLength{}, fmt.Errorf("rename slab: %w", err)
}
result := OffsetLength{
Offset: s.commitedBytes,
Length: uint64(len(data)),
}
s.commitedBytes = nextCommitedBytes
return result, nil
}
func (s *Slab) commited() uint64 {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.commitedBytes
}
func (s *Slab) currentFilename() string {
return s.filename(s.commitedBytes)
}
func (s *Slab) filename(commitedBytes uint64) string {
return filepath.Join(s.dir, fmt.Sprintf("%s_%d.bin", s.ID().String(), commitedBytes))
}
func (s *Slab) openFileWrite() (*os.File, error) {
flags := os.O_CREATE | os.O_SYNC | os.O_WRONLY
f, err := os.OpenFile(
s.currentFilename(),
flags,
0o700,
)
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}
if !s.truncated {
if err := f.Truncate(int64(s.truncateSize)); err != nil {
closeErr := f.Close()
return nil, fmt.Errorf("truncate file: %w", errors.Join(err, closeErr))
}
s.truncated = true
}
return f, nil
}
func (s *Slab) Read(from OffsetLength) ([]byte, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
if s.purged {
return nil, ErrPurged
}
if from.Offset+from.Length > s.commitedBytes {
return nil, ErrInvalidOffsetLength
}
file, err := s.openFileRead()
if err != nil {
return nil, err
}
buf := make([]byte, from.Length)
n, err := file.ReadAt(buf, int64(from.Offset))
if err != nil {
closeErr := file.Close()
return nil, fmt.Errorf("read data: %w", errors.Join(err, closeErr))
}
if n != len(buf) {
closeErr := file.Close()
return nil, fmt.Errorf("read data: %w", errors.Join(ErrIncompleteRead, closeErr))
}
if err := file.Close(); err != nil {
return nil, fmt.Errorf("close file: %w", err)
}
return buf, nil
}
func (s *Slab) openFileRead() (*os.File, error) {
f, err := os.Open(s.currentFilename())
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}
return f, nil
}
func (s *Slab) PurgeData() error {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.purged {
return ErrPurged
}
if err := os.Remove(s.currentFilename()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("purge data: %w", err)
}
s.commitedBytes = 0
s.purged = true
return nil
}
func parseSlabName(filename string) (SlabID, uint64, error) {
splitted := strings.Split(filename, "_")
if len(splitted) != 2 {
return SlabID{}, 0, ErrInvalidSlabName
}
id, err := uuid.Parse(splitted[0])
if err != nil {
return SlabID{}, 0, fmt.Errorf("invalid slab name: parse ID: %w", err)
}
commited, err := strconv.ParseUint(strings.TrimRight(splitted[1], ".bin"), 10, 64)
if err != nil {
return SlabID{}, 0, fmt.Errorf("invalid slab name: parse commited: %w", err)
}
return SlabID(id), commited, nil
}

View file

@ -0,0 +1,3 @@
package slabstore
type stat struct{}

View file

@ -0,0 +1,248 @@
package slabstore
import (
"context"
"fmt"
"os"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
)
type Store struct {
config config
wg sync.WaitGroup
initialized atomic.Bool
writeReady chan *Slab
writeCompleted chan *Slab
readReq chan *readRequest
readCompleted chan *Slab
sealedReq chan *sealedRequest
dropReq chan *dropRequest
closed chan struct{}
active []*Slab
sealed []*Slab
all map[SlabID]*Slab
toDrop map[SlabID][]chan *dropResponse
}
func NewStore(path string, readOnly bool, opts ...Option) *Store {
config := defaultConfig
config.path = path
config.readOnly = readOnly
for _, o := range opts {
o(&config)
}
return &Store{
config: config,
writeReady: make(chan *Slab),
writeCompleted: make(chan *Slab),
readReq: make(chan *readRequest),
readCompleted: make(chan *Slab),
sealedReq: make(chan *sealedRequest),
dropReq: make(chan *dropRequest),
closed: make(chan struct{}),
active: make([]*Slab, 0, config.maxActiveCount),
sealed: make([]*Slab, 0),
all: make(map[SlabID]*Slab),
toDrop: make(map[SlabID][]chan *dropResponse),
}
}
func (s *Store) Init(ctx context.Context, reader SlabReader) error {
if s.initialized.Load() {
return nil
}
err := util.MkdirAllX(s.config.path, os.ModePerm)
if err != nil {
return err
}
entries, err := os.ReadDir(s.config.path)
if err != nil {
return fmt.Errorf("read existing slabs: %w", err)
}
for _, ent := range entries {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if ent.IsDir() {
continue
}
id, commited, err := parseSlabName(ent.Name())
if err != nil {
return err
}
slab := &Slab{
dir: s.config.path,
id: id,
truncateSize: s.config.truncateSize,
commitedBytes: commited,
}
if reader != nil {
if err := reader.Read(ctx, slab); err != nil {
return fmt.Errorf("read slab: %w", err)
}
}
s.all[slab.ID()] = slab
if s.slabSealed(slab) {
s.sealed = append(s.sealed, slab)
} else {
s.active = append(s.active, slab)
}
}
s.wg.Add(1)
go s.dispatch()
s.initialized.Store(true)
return nil
}
func (s *Store) Write(ctx context.Context) (WriteSlab, ReleaseFunc, error) {
if !s.initialized.Load() {
return nil, nil, ErrNotInitialized
}
if s.config.readOnly {
return nil, nil, ErrReadOnly
}
select {
case slab := <-s.writeReady:
return slab, func() {
select {
case s.writeCompleted <- slab:
case <-s.closed:
}
}, nil
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-s.closed:
return nil, nil, ErrClosed
}
}
func (s *Store) Read(ctx context.Context, id SlabID) (ReadSlab, ReleaseFunc, error) {
if !s.initialized.Load() {
return nil, nil, ErrNotInitialized
}
req := &readRequest{
id: id,
result: make(chan *readResponse),
}
select {
case s.readReq <- req:
select {
case resp := <-req.result:
if resp.err != nil {
return nil, nil, resp.err
}
return resp.slab, func() {
select {
case s.readCompleted <- resp.slab:
case <-s.closed:
}
}, nil
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-s.closed:
return nil, nil, ErrClosed
}
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-s.closed:
return nil, nil, ErrClosed
}
}
func (s *Store) Sealed(ctx context.Context) ([]ReadSlab, error) {
if !s.initialized.Load() {
return nil, ErrNotInitialized
}
req := &sealedRequest{
result: make(chan *sealedResponse),
}
select {
case s.sealedReq <- req:
select {
case resp := <-req.result:
if resp.err != nil {
return nil, resp.err
}
result := make([]ReadSlab, 0, len(resp.slabs))
for _, slab := range resp.slabs {
result = append(result, slab)
}
return result, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-s.closed:
return nil, ErrClosed
}
case <-ctx.Done():
return nil, ctx.Err()
case <-s.closed:
return nil, ErrClosed
}
}
func (s *Store) Purge(ctx context.Context, id SlabID) error {
slab, err := s.drop(ctx, id)
if err != nil {
return err
}
return slab.PurgeData()
}
func (s *Store) drop(ctx context.Context, id SlabID) (*Slab, error) {
if !s.initialized.Load() {
return nil, ErrNotInitialized
}
req := &dropRequest{
id: id,
result: make(chan *dropResponse),
}
select {
case s.dropReq <- req:
select {
case resp := <-req.result:
return resp.slab, resp.err
case <-ctx.Done():
return nil, ctx.Err()
case <-s.closed:
return nil, ErrClosed
}
case <-ctx.Done():
return nil, ctx.Err()
case <-s.closed:
return nil, ErrClosed
}
}
func (s *Store) Close() {
select {
case <-s.closed:
return
default:
close(s.closed)
s.wg.Wait()
s.initialized.Store(false)
}
}

View file

@ -0,0 +1,67 @@
package slabstore
import (
"context"
"github.com/google/uuid"
)
type SlabID uuid.UUID
func (id SlabID) String() string {
return uuid.UUID(id).String()
}
type OffsetLength struct {
Offset uint64
Length uint64
}
type ReleaseFunc func()
type SlabWithID interface {
ID() SlabID
}
type ReadSlab interface {
SlabWithID
Read(from OffsetLength) ([]byte, error)
}
type WriteSlab interface {
SlabWithID
Write(data []byte) (OffsetLength, error)
}
type readRequest struct {
id SlabID
result chan *readResponse
}
type readResponse struct {
slab *Slab
err error
}
type sealedRequest struct {
result chan *sealedResponse
}
type sealedResponse struct {
slabs []*Slab
err error
}
type dropRequest struct {
id SlabID
result chan *dropResponse
}
type dropResponse struct {
slab *Slab
err error
}
type SlabReader interface {
Read(ctx context.Context, slab *Slab) error
}