Compare commits
2 commits
master
...
feat/write
Author | SHA1 | Date | |
---|---|---|---|
58524d2e76 | |||
4075072954 |
8 changed files with 776 additions and 0 deletions
|
@ -2,7 +2,10 @@ package benchmark
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -14,6 +17,78 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func BenchmarkWriteAllocatedSeq(b *testing.B) {
|
||||||
|
const objectCount = 100
|
||||||
|
sizes := []uint64{64 << 10}
|
||||||
|
for _, size := range sizes {
|
||||||
|
objGen := testutil.RandObjGenerator{ObjSize: size}
|
||||||
|
b.Run(fmt.Sprintf("ftruncate %d", size), func(b *testing.B) {
|
||||||
|
b.ResetTimer()
|
||||||
|
for run := 0; run < b.N; run++ {
|
||||||
|
b.StopTimer()
|
||||||
|
var dataOffset uint32
|
||||||
|
dir := b.TempDir()
|
||||||
|
filePrefix := fmt.Sprintf("writecache_%d_%d", run, size)
|
||||||
|
data, err := os.OpenFile(
|
||||||
|
filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset)),
|
||||||
|
os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_SYNC,
|
||||||
|
0o700,
|
||||||
|
)
|
||||||
|
require.NoError(b, err, "open writecache file")
|
||||||
|
|
||||||
|
obj := objGen.Next()
|
||||||
|
rawData, err := obj.Marshal()
|
||||||
|
require.NoError(b, err, "marshaling object")
|
||||||
|
require.NoError(b, data.Truncate(int64((len(rawData)+4)*objectCount)))
|
||||||
|
b.StartTimer()
|
||||||
|
|
||||||
|
for range objectCount {
|
||||||
|
obj := objGen.Next()
|
||||||
|
rawData, err := obj.Marshal()
|
||||||
|
require.NoError(b, err, "marshaling object")
|
||||||
|
|
||||||
|
objectStruct := make([]byte, 4+len(rawData))
|
||||||
|
binary.LittleEndian.PutUint32(objectStruct, uint32(len(rawData)))
|
||||||
|
copy(objectStruct[4:], rawData)
|
||||||
|
c, err := data.WriteAt(objectStruct, int64(dataOffset))
|
||||||
|
require.NoError(b, err)
|
||||||
|
require.True(b, c == len(objectStruct))
|
||||||
|
require.NoError(b, os.Rename(
|
||||||
|
filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset)),
|
||||||
|
filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset+uint32(len(objectStruct)))),
|
||||||
|
))
|
||||||
|
dataOffset += uint32(len(objectStruct))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(b, data.Close())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
b.Run(fmt.Sprintf("fstree %d", size), func(b *testing.B) {
|
||||||
|
cache := newCache(b)
|
||||||
|
benchmarkPutPrepare(b, cache)
|
||||||
|
defer func() { require.NoError(b, cache.Close(context.Background())) }()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for range b.N {
|
||||||
|
for range objectCount {
|
||||||
|
obj := objGen.Next()
|
||||||
|
rawData, err := obj.Marshal()
|
||||||
|
require.NoError(b, err, "marshaling object")
|
||||||
|
prm := common.PutPrm{
|
||||||
|
Address: testutil.AddressFromObject(b, obj),
|
||||||
|
Object: obj,
|
||||||
|
RawData: rawData,
|
||||||
|
}
|
||||||
|
if _, err := cache.Put(context.Background(), prm); err != nil {
|
||||||
|
b.Fatalf("putting: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkWritecacheSeq(b *testing.B) {
|
func BenchmarkWritecacheSeq(b *testing.B) {
|
||||||
const payloadSize = 8 << 10
|
const payloadSize = 8 << 10
|
||||||
b.Run("bbolt_seq", func(b *testing.B) {
|
b.Run("bbolt_seq", func(b *testing.B) {
|
||||||
|
|
35
pkg/local_object_storage/writecache/slabstore/config.go
Normal file
35
pkg/local_object_storage/writecache/slabstore/config.go
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
package slabstore
|
||||||
|
|
||||||
|
var defaultConfig config = config{
|
||||||
|
sealSize: 4 << 20,
|
||||||
|
truncateSize: 5 << 20,
|
||||||
|
maxActiveCount: 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
type config struct {
|
||||||
|
path string
|
||||||
|
readOnly bool
|
||||||
|
sealSize uint64
|
||||||
|
truncateSize uint64
|
||||||
|
maxActiveCount uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
type Option func(c *config)
|
||||||
|
|
||||||
|
func WithSealSize(size uint64) Option {
|
||||||
|
return func(c *config) {
|
||||||
|
c.sealSize = size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithTruncateSize(size uint64) Option {
|
||||||
|
return func(c *config) {
|
||||||
|
c.truncateSize = size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithMaxActiveSlabsCount(count uint32) Option {
|
||||||
|
return func(c *config) {
|
||||||
|
c.maxActiveCount = count
|
||||||
|
}
|
||||||
|
}
|
153
pkg/local_object_storage/writecache/slabstore/dispatch.go
Normal file
153
pkg/local_object_storage/writecache/slabstore/dispatch.go
Normal file
|
@ -0,0 +1,153 @@
|
||||||
|
package slabstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"slices"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Store) dispatch() {
|
||||||
|
s.wg.Done()
|
||||||
|
for {
|
||||||
|
slab := s.nextActive()
|
||||||
|
var writeReady chan *Slab
|
||||||
|
if slab != nil {
|
||||||
|
writeReady = s.writeReady
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-s.closed:
|
||||||
|
return
|
||||||
|
case slab := <-s.writeCompleted:
|
||||||
|
assert.True(slab.writers == 1, "slab writers count must be equal 1")
|
||||||
|
slab.writers--
|
||||||
|
if s.slabSealed(slab) {
|
||||||
|
s.sealed = append(s.sealed, slab)
|
||||||
|
} else {
|
||||||
|
s.active = append(s.active, slab)
|
||||||
|
}
|
||||||
|
if stop := s.dropIfNeeded(slab); stop {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case req := <-s.readReq:
|
||||||
|
var resp readResponse
|
||||||
|
if slab, ok := s.all[req.id]; ok {
|
||||||
|
slab.readers++
|
||||||
|
resp.slab = slab
|
||||||
|
} else {
|
||||||
|
resp.err = ErrSlabNotFound
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case req.result <- &resp:
|
||||||
|
case <-s.closed:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case slab := <-s.readCompleted:
|
||||||
|
assert.True(slab.readers > 0, "slab readers count must be greater than zero")
|
||||||
|
slab.readers--
|
||||||
|
if stop := s.dropIfNeeded(slab); stop {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case req := <-s.sealedReq:
|
||||||
|
s.active = slices.DeleteFunc(s.active, func(slab *Slab) bool {
|
||||||
|
if s.slabSealed(slab) {
|
||||||
|
s.sealed = append(s.sealed, slab)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
select {
|
||||||
|
case req.result <- &sealedResponse{
|
||||||
|
slabs: s.sealed,
|
||||||
|
}:
|
||||||
|
case <-s.closed:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case req := <-s.dropReq:
|
||||||
|
slab, ok := s.all[req.id]
|
||||||
|
if !ok {
|
||||||
|
select {
|
||||||
|
case req.result <- &dropResponse{err: ErrSlabNotFound}:
|
||||||
|
continue
|
||||||
|
case <-s.closed:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.active = slices.DeleteFunc(s.active, func(slab *Slab) bool {
|
||||||
|
return slab.ID() == req.id
|
||||||
|
})
|
||||||
|
s.sealed = slices.DeleteFunc(s.sealed, func(slab *Slab) bool {
|
||||||
|
return slab.ID() == req.id
|
||||||
|
})
|
||||||
|
if slab.readers == 0 && slab.writers == 0 {
|
||||||
|
delete(s.all, req.id)
|
||||||
|
select {
|
||||||
|
case req.result <- &dropResponse{slab: slab}:
|
||||||
|
continue
|
||||||
|
case <-s.closed:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.toDrop[req.id] = append(s.toDrop[req.id], req.result)
|
||||||
|
case writeReady <- slab:
|
||||||
|
slab.writers++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) dropIfNeeded(slab *Slab) bool {
|
||||||
|
if slab.readers > 0 || slab.writers > 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
waiters, ok := s.toDrop[slab.ID()]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, w := range waiters {
|
||||||
|
select {
|
||||||
|
case w <- &dropResponse{slab: slab}:
|
||||||
|
case <-s.closed:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(s.toDrop, slab.ID())
|
||||||
|
s.active = slices.DeleteFunc(s.active, func(sl *Slab) bool {
|
||||||
|
return slab.ID() == sl.ID()
|
||||||
|
})
|
||||||
|
s.sealed = slices.DeleteFunc(s.sealed, func(sl *Slab) bool {
|
||||||
|
return slab.ID() == sl.ID()
|
||||||
|
})
|
||||||
|
delete(s.all, slab.ID())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) nextActive() *Slab {
|
||||||
|
if len(s.active) > 0 {
|
||||||
|
result := s.active[len(s.active)-1]
|
||||||
|
s.active = s.active[:len(s.active)-1]
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
if uint32(len(s.all))-uint32(len(s.sealed)) < s.config.maxActiveCount {
|
||||||
|
return s.createNewSlab()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) createNewSlab() *Slab {
|
||||||
|
slab := &Slab{}
|
||||||
|
id, err := uuid.NewRandom()
|
||||||
|
if err != nil {
|
||||||
|
// TODO: log
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
slab.id = SlabID(id)
|
||||||
|
slab.dir = s.config.path
|
||||||
|
slab.truncateSize = s.config.truncateSize
|
||||||
|
s.all[slab.id] = slab
|
||||||
|
return slab
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) slabSealed(slab *Slab) bool {
|
||||||
|
return slab.commited() >= s.config.sealSize
|
||||||
|
}
|
15
pkg/local_object_storage/writecache/slabstore/errors.go
Normal file
15
pkg/local_object_storage/writecache/slabstore/errors.go
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package slabstore
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrClosed = errors.New("slabstore is closed")
|
||||||
|
ErrNotInitialized = errors.New("slabstore is not initialized")
|
||||||
|
ErrReadOnly = errors.New("slabstore is in read-only mode")
|
||||||
|
ErrSlabNotFound = errors.New("slab not found")
|
||||||
|
ErrInvalidOffsetLength = errors.New("invalid offset/length for slab")
|
||||||
|
ErrIncompleteWrite = errors.New("incomplete write to slab")
|
||||||
|
ErrIncompleteRead = errors.New("incomplete read from slab")
|
||||||
|
ErrPurged = errors.New("slab already purged")
|
||||||
|
ErrInvalidSlabName = errors.New("invalid slab name")
|
||||||
|
)
|
180
pkg/local_object_storage/writecache/slabstore/slab.go
Normal file
180
pkg/local_object_storage/writecache/slabstore/slab.go
Normal file
|
@ -0,0 +1,180 @@
|
||||||
|
package slabstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Slab struct {
|
||||||
|
mtx sync.RWMutex
|
||||||
|
dir string
|
||||||
|
id SlabID
|
||||||
|
truncateSize uint64
|
||||||
|
|
||||||
|
readers int64
|
||||||
|
writers int64
|
||||||
|
|
||||||
|
commitedBytes uint64
|
||||||
|
truncated bool
|
||||||
|
purged bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) ID() SlabID {
|
||||||
|
return s.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) Write(data []byte) (OffsetLength, error) {
|
||||||
|
s.mtx.Lock()
|
||||||
|
defer s.mtx.Unlock()
|
||||||
|
|
||||||
|
if s.purged {
|
||||||
|
return OffsetLength{}, ErrPurged
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := s.openFileWrite()
|
||||||
|
if err != nil {
|
||||||
|
return OffsetLength{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := file.WriteAt(data, int64(s.commitedBytes))
|
||||||
|
if err != nil {
|
||||||
|
closeErr := file.Close()
|
||||||
|
return OffsetLength{}, fmt.Errorf("write data: %w", errors.Join(err, closeErr))
|
||||||
|
}
|
||||||
|
if n != len(data) {
|
||||||
|
closeErr := file.Close()
|
||||||
|
return OffsetLength{}, fmt.Errorf("write data: %w", errors.Join(ErrIncompleteWrite, closeErr))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
return OffsetLength{}, fmt.Errorf("close file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
nextCommitedBytes := s.commitedBytes + uint64(len(data))
|
||||||
|
|
||||||
|
if err := os.Rename(s.currentFilename(), s.filename(nextCommitedBytes)); err != nil {
|
||||||
|
return OffsetLength{}, fmt.Errorf("rename slab: %w", err)
|
||||||
|
}
|
||||||
|
result := OffsetLength{
|
||||||
|
Offset: s.commitedBytes,
|
||||||
|
Length: uint64(len(data)),
|
||||||
|
}
|
||||||
|
s.commitedBytes = nextCommitedBytes
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) commited() uint64 {
|
||||||
|
s.mtx.RLock()
|
||||||
|
defer s.mtx.RUnlock()
|
||||||
|
|
||||||
|
return s.commitedBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) currentFilename() string {
|
||||||
|
return s.filename(s.commitedBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) filename(commitedBytes uint64) string {
|
||||||
|
return filepath.Join(s.dir, fmt.Sprintf("%s_%d.bin", s.ID().String(), commitedBytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) openFileWrite() (*os.File, error) {
|
||||||
|
flags := os.O_CREATE | os.O_SYNC | os.O_WRONLY
|
||||||
|
f, err := os.OpenFile(
|
||||||
|
s.currentFilename(),
|
||||||
|
flags,
|
||||||
|
0o700,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("open file: %w", err)
|
||||||
|
}
|
||||||
|
if !s.truncated {
|
||||||
|
if err := f.Truncate(int64(s.truncateSize)); err != nil {
|
||||||
|
closeErr := f.Close()
|
||||||
|
return nil, fmt.Errorf("truncate file: %w", errors.Join(err, closeErr))
|
||||||
|
}
|
||||||
|
s.truncated = true
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) Read(from OffsetLength) ([]byte, error) {
|
||||||
|
s.mtx.RLock()
|
||||||
|
defer s.mtx.RUnlock()
|
||||||
|
|
||||||
|
if s.purged {
|
||||||
|
return nil, ErrPurged
|
||||||
|
}
|
||||||
|
|
||||||
|
if from.Offset+from.Length > s.commitedBytes {
|
||||||
|
return nil, ErrInvalidOffsetLength
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := s.openFileRead()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, from.Length)
|
||||||
|
n, err := file.ReadAt(buf, int64(from.Offset))
|
||||||
|
if err != nil {
|
||||||
|
closeErr := file.Close()
|
||||||
|
return nil, fmt.Errorf("read data: %w", errors.Join(err, closeErr))
|
||||||
|
}
|
||||||
|
if n != len(buf) {
|
||||||
|
closeErr := file.Close()
|
||||||
|
return nil, fmt.Errorf("read data: %w", errors.Join(ErrIncompleteRead, closeErr))
|
||||||
|
}
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
return nil, fmt.Errorf("close file: %w", err)
|
||||||
|
}
|
||||||
|
return buf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) openFileRead() (*os.File, error) {
|
||||||
|
f, err := os.Open(s.currentFilename())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("open file: %w", err)
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Slab) PurgeData() error {
|
||||||
|
s.mtx.Lock()
|
||||||
|
defer s.mtx.Unlock()
|
||||||
|
|
||||||
|
if s.purged {
|
||||||
|
return ErrPurged
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Remove(s.currentFilename()); err != nil && !os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("purge data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.commitedBytes = 0
|
||||||
|
s.purged = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseSlabName(filename string) (SlabID, uint64, error) {
|
||||||
|
splitted := strings.Split(filename, "_")
|
||||||
|
if len(splitted) != 2 {
|
||||||
|
return SlabID{}, 0, ErrInvalidSlabName
|
||||||
|
}
|
||||||
|
id, err := uuid.Parse(splitted[0])
|
||||||
|
if err != nil {
|
||||||
|
return SlabID{}, 0, fmt.Errorf("invalid slab name: parse ID: %w", err)
|
||||||
|
}
|
||||||
|
commited, err := strconv.ParseUint(strings.TrimRight(splitted[1], ".bin"), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return SlabID{}, 0, fmt.Errorf("invalid slab name: parse commited: %w", err)
|
||||||
|
}
|
||||||
|
return SlabID(id), commited, nil
|
||||||
|
}
|
3
pkg/local_object_storage/writecache/slabstore/stat.go
Normal file
3
pkg/local_object_storage/writecache/slabstore/stat.go
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
package slabstore
|
||||||
|
|
||||||
|
type stat struct{}
|
248
pkg/local_object_storage/writecache/slabstore/store.go
Normal file
248
pkg/local_object_storage/writecache/slabstore/store.go
Normal file
|
@ -0,0 +1,248 @@
|
||||||
|
package slabstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Store struct {
|
||||||
|
config config
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
initialized atomic.Bool
|
||||||
|
|
||||||
|
writeReady chan *Slab
|
||||||
|
writeCompleted chan *Slab
|
||||||
|
readReq chan *readRequest
|
||||||
|
readCompleted chan *Slab
|
||||||
|
sealedReq chan *sealedRequest
|
||||||
|
dropReq chan *dropRequest
|
||||||
|
closed chan struct{}
|
||||||
|
|
||||||
|
active []*Slab
|
||||||
|
sealed []*Slab
|
||||||
|
all map[SlabID]*Slab
|
||||||
|
toDrop map[SlabID][]chan *dropResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStore(path string, readOnly bool, opts ...Option) *Store {
|
||||||
|
config := defaultConfig
|
||||||
|
config.path = path
|
||||||
|
config.readOnly = readOnly
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&config)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Store{
|
||||||
|
config: config,
|
||||||
|
writeReady: make(chan *Slab),
|
||||||
|
writeCompleted: make(chan *Slab),
|
||||||
|
readReq: make(chan *readRequest),
|
||||||
|
readCompleted: make(chan *Slab),
|
||||||
|
sealedReq: make(chan *sealedRequest),
|
||||||
|
dropReq: make(chan *dropRequest),
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
active: make([]*Slab, 0, config.maxActiveCount),
|
||||||
|
sealed: make([]*Slab, 0),
|
||||||
|
all: make(map[SlabID]*Slab),
|
||||||
|
toDrop: make(map[SlabID][]chan *dropResponse),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Init(ctx context.Context, reader SlabReader) error {
|
||||||
|
if s.initialized.Load() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err := util.MkdirAllX(s.config.path, os.ModePerm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
entries, err := os.ReadDir(s.config.path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("read existing slabs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ent := range entries {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if ent.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
id, commited, err := parseSlabName(ent.Name())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
slab := &Slab{
|
||||||
|
dir: s.config.path,
|
||||||
|
id: id,
|
||||||
|
truncateSize: s.config.truncateSize,
|
||||||
|
commitedBytes: commited,
|
||||||
|
}
|
||||||
|
|
||||||
|
if reader != nil {
|
||||||
|
if err := reader.Read(ctx, slab); err != nil {
|
||||||
|
return fmt.Errorf("read slab: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.all[slab.ID()] = slab
|
||||||
|
|
||||||
|
if s.slabSealed(slab) {
|
||||||
|
s.sealed = append(s.sealed, slab)
|
||||||
|
} else {
|
||||||
|
s.active = append(s.active, slab)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.wg.Add(1)
|
||||||
|
go s.dispatch()
|
||||||
|
s.initialized.Store(true)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Write(ctx context.Context) (WriteSlab, ReleaseFunc, error) {
|
||||||
|
if !s.initialized.Load() {
|
||||||
|
return nil, nil, ErrNotInitialized
|
||||||
|
}
|
||||||
|
if s.config.readOnly {
|
||||||
|
return nil, nil, ErrReadOnly
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case slab := <-s.writeReady:
|
||||||
|
return slab, func() {
|
||||||
|
select {
|
||||||
|
case s.writeCompleted <- slab:
|
||||||
|
case <-s.closed:
|
||||||
|
}
|
||||||
|
}, nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, nil, ctx.Err()
|
||||||
|
case <-s.closed:
|
||||||
|
return nil, nil, ErrClosed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Read(ctx context.Context, id SlabID) (ReadSlab, ReleaseFunc, error) {
|
||||||
|
if !s.initialized.Load() {
|
||||||
|
return nil, nil, ErrNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
req := &readRequest{
|
||||||
|
id: id,
|
||||||
|
result: make(chan *readResponse),
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case s.readReq <- req:
|
||||||
|
select {
|
||||||
|
case resp := <-req.result:
|
||||||
|
if resp.err != nil {
|
||||||
|
return nil, nil, resp.err
|
||||||
|
}
|
||||||
|
return resp.slab, func() {
|
||||||
|
select {
|
||||||
|
case s.readCompleted <- resp.slab:
|
||||||
|
case <-s.closed:
|
||||||
|
}
|
||||||
|
}, nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, nil, ctx.Err()
|
||||||
|
case <-s.closed:
|
||||||
|
return nil, nil, ErrClosed
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, nil, ctx.Err()
|
||||||
|
case <-s.closed:
|
||||||
|
return nil, nil, ErrClosed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Sealed(ctx context.Context) ([]ReadSlab, error) {
|
||||||
|
if !s.initialized.Load() {
|
||||||
|
return nil, ErrNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
req := &sealedRequest{
|
||||||
|
result: make(chan *sealedResponse),
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case s.sealedReq <- req:
|
||||||
|
select {
|
||||||
|
case resp := <-req.result:
|
||||||
|
if resp.err != nil {
|
||||||
|
return nil, resp.err
|
||||||
|
}
|
||||||
|
result := make([]ReadSlab, 0, len(resp.slabs))
|
||||||
|
for _, slab := range resp.slabs {
|
||||||
|
result = append(result, slab)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-s.closed:
|
||||||
|
return nil, ErrClosed
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-s.closed:
|
||||||
|
return nil, ErrClosed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Purge(ctx context.Context, id SlabID) error {
|
||||||
|
slab, err := s.drop(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return slab.PurgeData()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) drop(ctx context.Context, id SlabID) (*Slab, error) {
|
||||||
|
if !s.initialized.Load() {
|
||||||
|
return nil, ErrNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
req := &dropRequest{
|
||||||
|
id: id,
|
||||||
|
result: make(chan *dropResponse),
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case s.dropReq <- req:
|
||||||
|
select {
|
||||||
|
case resp := <-req.result:
|
||||||
|
return resp.slab, resp.err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-s.closed:
|
||||||
|
return nil, ErrClosed
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-s.closed:
|
||||||
|
return nil, ErrClosed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Close() {
|
||||||
|
select {
|
||||||
|
case <-s.closed:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
close(s.closed)
|
||||||
|
s.wg.Wait()
|
||||||
|
s.initialized.Store(false)
|
||||||
|
}
|
||||||
|
}
|
67
pkg/local_object_storage/writecache/slabstore/types.go
Normal file
67
pkg/local_object_storage/writecache/slabstore/types.go
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
package slabstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SlabID uuid.UUID
|
||||||
|
|
||||||
|
func (id SlabID) String() string {
|
||||||
|
return uuid.UUID(id).String()
|
||||||
|
}
|
||||||
|
|
||||||
|
type OffsetLength struct {
|
||||||
|
Offset uint64
|
||||||
|
Length uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReleaseFunc func()
|
||||||
|
|
||||||
|
type SlabWithID interface {
|
||||||
|
ID() SlabID
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReadSlab interface {
|
||||||
|
SlabWithID
|
||||||
|
Read(from OffsetLength) ([]byte, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteSlab interface {
|
||||||
|
SlabWithID
|
||||||
|
Write(data []byte) (OffsetLength, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type readRequest struct {
|
||||||
|
id SlabID
|
||||||
|
result chan *readResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
type readResponse struct {
|
||||||
|
slab *Slab
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type sealedRequest struct {
|
||||||
|
result chan *sealedResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
type sealedResponse struct {
|
||||||
|
slabs []*Slab
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type dropRequest struct {
|
||||||
|
id SlabID
|
||||||
|
result chan *dropResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
type dropResponse struct {
|
||||||
|
slab *Slab
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type SlabReader interface {
|
||||||
|
Read(ctx context.Context, slab *Slab) error
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue