Compare commits
2 commits
master...feat/write
| Author | SHA1 | Date |
|---|---|---|
| | 58524d2e76 | |
| | 4075072954 | |
8 changed files with 776 additions and 0 deletions
@ -2,7 +2,10 @@ package benchmark
import (
	"context"
	"encoding/binary"
	"fmt"
	"os"
	"path/filepath"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"

@ -14,6 +17,78 @@ import (
	"github.com/stretchr/testify/require"
)

func BenchmarkWriteAllocatedSeq(b *testing.B) {
	const objectCount = 100
	sizes := []uint64{64 << 10}
	for _, size := range sizes {
		objGen := testutil.RandObjGenerator{ObjSize: size}
		b.Run(fmt.Sprintf("ftruncate %d", size), func(b *testing.B) {
			b.ResetTimer()
			for run := 0; run < b.N; run++ {
				b.StopTimer()
				var dataOffset uint32
				dir := b.TempDir()
				filePrefix := fmt.Sprintf("writecache_%d_%d", run, size)
				data, err := os.OpenFile(
					filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset)),
					os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_SYNC,
					0o700,
				)
				require.NoError(b, err, "open writecache file")

				obj := objGen.Next()
				rawData, err := obj.Marshal()
				require.NoError(b, err, "marshaling object")
				require.NoError(b, data.Truncate(int64((len(rawData)+4)*objectCount)))
				b.StartTimer()

				for range objectCount {
					obj := objGen.Next()
					rawData, err := obj.Marshal()
					require.NoError(b, err, "marshaling object")

					objectStruct := make([]byte, 4+len(rawData))
					binary.LittleEndian.PutUint32(objectStruct, uint32(len(rawData)))
					copy(objectStruct[4:], rawData)
					c, err := data.WriteAt(objectStruct, int64(dataOffset))
					require.NoError(b, err)
					require.True(b, c == len(objectStruct))
					require.NoError(b, os.Rename(
						filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset)),
						filepath.Join(dir, fmt.Sprintf("%s_%d", filePrefix, dataOffset+uint32(len(objectStruct)))),
					))
					dataOffset += uint32(len(objectStruct))
				}

				require.NoError(b, data.Close())
			}
		})

		b.Run(fmt.Sprintf("fstree %d", size), func(b *testing.B) {
			cache := newCache(b)
			benchmarkPutPrepare(b, cache)
			defer func() { require.NoError(b, cache.Close(context.Background())) }()

			b.ResetTimer()
			for range b.N {
				for range objectCount {
					obj := objGen.Next()
					rawData, err := obj.Marshal()
					require.NoError(b, err, "marshaling object")
					prm := common.PutPrm{
						Address: testutil.AddressFromObject(b, obj),
						Object:  obj,
						RawData: rawData,
					}
					if _, err := cache.Put(context.Background(), prm); err != nil {
						b.Fatalf("putting: %v", err)
					}
				}
			}
		})
	}
}

func BenchmarkWritecacheSeq(b *testing.B) {
	const payloadSize = 8 << 10
	b.Run("bbolt_seq", func(b *testing.B) {
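For reference, the ftruncate benchmark above writes each record as a 4-byte little-endian length prefix followed by the marshaled object, and tracks progress by renaming the file so its name carries the committed offset. A minimal decoding sketch for that layout; readRecord is a hypothetical helper, not part of this change, and assumes the encoding/binary and os imports:

// Hypothetical helper: decode one length-prefixed record at the given offset,
// mirroring the layout written by BenchmarkWriteAllocatedSeq.
func readRecord(f *os.File, offset int64) ([]byte, int64, error) {
	var lenBuf [4]byte
	if _, err := f.ReadAt(lenBuf[:], offset); err != nil {
		return nil, 0, err
	}
	size := binary.LittleEndian.Uint32(lenBuf[:])
	payload := make([]byte, size)
	if _, err := f.ReadAt(payload, offset+4); err != nil {
		return nil, 0, err
	}
	// The next record starts right after the prefix and payload.
	return payload, offset + 4 + int64(size), nil
}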
pkg/local_object_storage/writecache/slabstore/config.go (new file, 35 lines)

@ -0,0 +1,35 @@
package slabstore

var defaultConfig config = config{
	sealSize:       4 << 20,
	truncateSize:   5 << 20,
	maxActiveCount: 10,
}

type config struct {
	path           string
	readOnly       bool
	sealSize       uint64
	truncateSize   uint64
	maxActiveCount uint32
}

type Option func(c *config)

func WithSealSize(size uint64) Option {
	return func(c *config) {
		c.sealSize = size
	}
}

func WithTruncateSize(size uint64) Option {
	return func(c *config) {
		c.truncateSize = size
	}
}

func WithMaxActiveSlabsCount(count uint32) Option {
	return func(c *config) {
		c.maxActiveCount = count
	}
}
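These options feed NewStore in store.go further below. A short usage sketch of how a caller could override the defaults; the path and the chosen sizes are illustrative only:

store := slabstore.NewStore(
	"/path/to/writecache/slabs", // illustrative path
	false,                       // read-write mode
	slabstore.WithSealSize(8<<20),        // seal slabs at 8 MiB instead of the default 4 MiB
	slabstore.WithTruncateSize(9<<20),    // preallocate 9 MiB per slab file
	slabstore.WithMaxActiveSlabsCount(4), // keep at most 4 unsealed slabs
)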
pkg/local_object_storage/writecache/slabstore/dispatch.go (new file, 153 lines)

@ -0,0 +1,153 @@
package slabstore

import (
	"slices"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
	"github.com/google/uuid"
)

func (s *Store) dispatch() {
	defer s.wg.Done() // deferred so that Close's Wait blocks until the loop exits
	for {
		slab := s.nextActive()
		var writeReady chan *Slab
		if slab != nil {
			writeReady = s.writeReady
		}

		select {
		case <-s.closed:
			return
		case slab := <-s.writeCompleted:
			assert.True(slab.writers == 1, "slab writers count must be equal to 1")
			slab.writers--
			if s.slabSealed(slab) {
				s.sealed = append(s.sealed, slab)
			} else {
				s.active = append(s.active, slab)
			}
			if stop := s.dropIfNeeded(slab); stop {
				return
			}
		case req := <-s.readReq:
			var resp readResponse
			if slab, ok := s.all[req.id]; ok {
				slab.readers++
				resp.slab = slab
			} else {
				resp.err = ErrSlabNotFound
			}
			select {
			case req.result <- &resp:
			case <-s.closed:
				return
			}
		case slab := <-s.readCompleted:
			assert.True(slab.readers > 0, "slab readers count must be greater than zero")
			slab.readers--
			if stop := s.dropIfNeeded(slab); stop {
				return
			}
		case req := <-s.sealedReq:
			s.active = slices.DeleteFunc(s.active, func(slab *Slab) bool {
				if s.slabSealed(slab) {
					s.sealed = append(s.sealed, slab)
					return true
				}
				return false
			})
			select {
			case req.result <- &sealedResponse{
				slabs: s.sealed,
			}:
			case <-s.closed:
				return
			}
		case req := <-s.dropReq:
			slab, ok := s.all[req.id]
			if !ok {
				select {
				case req.result <- &dropResponse{err: ErrSlabNotFound}:
					continue
				case <-s.closed:
					return
				}
			}
			s.active = slices.DeleteFunc(s.active, func(slab *Slab) bool {
				return slab.ID() == req.id
			})
			s.sealed = slices.DeleteFunc(s.sealed, func(slab *Slab) bool {
				return slab.ID() == req.id
			})
			if slab.readers == 0 && slab.writers == 0 {
				delete(s.all, req.id)
				select {
				case req.result <- &dropResponse{slab: slab}:
					continue
				case <-s.closed:
					return
				}
			}
			s.toDrop[req.id] = append(s.toDrop[req.id], req.result)
		case writeReady <- slab:
			slab.writers++
		}
	}
}

func (s *Store) dropIfNeeded(slab *Slab) bool {
	if slab.readers > 0 || slab.writers > 0 {
		return false
	}
	waiters, ok := s.toDrop[slab.ID()]
	if !ok {
		return false
	}
	for _, w := range waiters {
		select {
		case w <- &dropResponse{slab: slab}:
		case <-s.closed:
			return true
		}
	}
	delete(s.toDrop, slab.ID())
	s.active = slices.DeleteFunc(s.active, func(sl *Slab) bool {
		return slab.ID() == sl.ID()
	})
	s.sealed = slices.DeleteFunc(s.sealed, func(sl *Slab) bool {
		return slab.ID() == sl.ID()
	})
	delete(s.all, slab.ID())
	return false
}

func (s *Store) nextActive() *Slab {
	if len(s.active) > 0 {
		result := s.active[len(s.active)-1]
		s.active = s.active[:len(s.active)-1]
		return result
	}
	if uint32(len(s.all))-uint32(len(s.sealed)) < s.config.maxActiveCount {
		return s.createNewSlab()
	}
	return nil
}

func (s *Store) createNewSlab() *Slab {
	slab := &Slab{}
	id, err := uuid.NewRandom()
	if err != nil {
		// TODO: log
		return nil
	}
	slab.id = SlabID(id)
	slab.dir = s.config.path
	slab.truncateSize = s.config.truncateSize
	s.all[slab.id] = slab
	return slab
}

func (s *Store) slabSealed(slab *Slab) bool {
	return slab.commited() >= s.config.sealSize
}
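The writeReady/writeCompleted pair above is what a Store.Write caller drives: the dispatcher hands out a free slab, marks it busy, and only re-queues (or seals) it when the caller invokes the returned release function. A minimal caller-side sketch, assuming store, ctx and payload are already in scope and omitting most error handling:

slab, release, err := store.Write(ctx)
if err != nil {
	return err
}
// The dispatcher has incremented writers for this slab; no other writer sees it.
off, err := slab.Write(payload)
// Hand the slab back so the dispatcher can re-activate or seal it.
release()
if err != nil {
	return err
}
_ = off // keep the returned OffsetLength to read the payload back later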
pkg/local_object_storage/writecache/slabstore/errors.go (new file, 15 lines)

@ -0,0 +1,15 @@
package slabstore

import "errors"

var (
	ErrClosed              = errors.New("slabstore is closed")
	ErrNotInitialized      = errors.New("slabstore is not initialized")
	ErrReadOnly            = errors.New("slabstore is in read-only mode")
	ErrSlabNotFound        = errors.New("slab not found")
	ErrInvalidOffsetLength = errors.New("invalid offset/length for slab")
	ErrIncompleteWrite     = errors.New("incomplete write to slab")
	ErrIncompleteRead      = errors.New("incomplete read from slab")
	ErrPurged              = errors.New("slab already purged")
	ErrInvalidSlabName     = errors.New("invalid slab name")
)
pkg/local_object_storage/writecache/slabstore/slab.go (new file, 180 lines)

@ -0,0 +1,180 @@
package slabstore

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"

	"github.com/google/uuid"
)

type Slab struct {
	mtx          sync.RWMutex
	dir          string
	id           SlabID
	truncateSize uint64

	readers int64
	writers int64

	commitedBytes uint64
	truncated     bool
	purged        bool
}

func (s *Slab) ID() SlabID {
	return s.id
}

func (s *Slab) Write(data []byte) (OffsetLength, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.purged {
		return OffsetLength{}, ErrPurged
	}

	file, err := s.openFileWrite()
	if err != nil {
		return OffsetLength{}, err
	}

	n, err := file.WriteAt(data, int64(s.commitedBytes))
	if err != nil {
		closeErr := file.Close()
		return OffsetLength{}, fmt.Errorf("write data: %w", errors.Join(err, closeErr))
	}
	if n != len(data) {
		closeErr := file.Close()
		return OffsetLength{}, fmt.Errorf("write data: %w", errors.Join(ErrIncompleteWrite, closeErr))
	}

	if err := file.Close(); err != nil {
		return OffsetLength{}, fmt.Errorf("close file: %w", err)
	}

	nextCommitedBytes := s.commitedBytes + uint64(len(data))

	if err := os.Rename(s.currentFilename(), s.filename(nextCommitedBytes)); err != nil {
		return OffsetLength{}, fmt.Errorf("rename slab: %w", err)
	}
	result := OffsetLength{
		Offset: s.commitedBytes,
		Length: uint64(len(data)),
	}
	s.commitedBytes = nextCommitedBytes
	return result, nil
}

func (s *Slab) commited() uint64 {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	return s.commitedBytes
}

func (s *Slab) currentFilename() string {
	return s.filename(s.commitedBytes)
}

func (s *Slab) filename(commitedBytes uint64) string {
	return filepath.Join(s.dir, fmt.Sprintf("%s_%d.bin", s.ID().String(), commitedBytes))
}

func (s *Slab) openFileWrite() (*os.File, error) {
	flags := os.O_CREATE | os.O_SYNC | os.O_WRONLY
	f, err := os.OpenFile(
		s.currentFilename(),
		flags,
		0o700,
	)
	if err != nil {
		return nil, fmt.Errorf("open file: %w", err)
	}
	if !s.truncated {
		if err := f.Truncate(int64(s.truncateSize)); err != nil {
			closeErr := f.Close()
			return nil, fmt.Errorf("truncate file: %w", errors.Join(err, closeErr))
		}
		s.truncated = true
	}
	return f, nil
}

func (s *Slab) Read(from OffsetLength) ([]byte, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	if s.purged {
		return nil, ErrPurged
	}

	if from.Offset+from.Length > s.commitedBytes {
		return nil, ErrInvalidOffsetLength
	}

	file, err := s.openFileRead()
	if err != nil {
		return nil, err
	}

	buf := make([]byte, from.Length)
	n, err := file.ReadAt(buf, int64(from.Offset))
	if err != nil {
		closeErr := file.Close()
		return nil, fmt.Errorf("read data: %w", errors.Join(err, closeErr))
	}
	if n != len(buf) {
		closeErr := file.Close()
		return nil, fmt.Errorf("read data: %w", errors.Join(ErrIncompleteRead, closeErr))
	}
	if err := file.Close(); err != nil {
		return nil, fmt.Errorf("close file: %w", err)
	}
	return buf, nil
}

func (s *Slab) openFileRead() (*os.File, error) {
	f, err := os.Open(s.currentFilename())
	if err != nil {
		return nil, fmt.Errorf("open file: %w", err)
	}
	return f, nil
}

func (s *Slab) PurgeData() error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.purged {
		return ErrPurged
	}

	if err := os.Remove(s.currentFilename()); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("purge data: %w", err)
	}

	s.commitedBytes = 0
	s.purged = true
	return nil
}

func parseSlabName(filename string) (SlabID, uint64, error) {
	splitted := strings.Split(filename, "_")
	if len(splitted) != 2 {
		return SlabID{}, 0, ErrInvalidSlabName
	}
	id, err := uuid.Parse(splitted[0])
	if err != nil {
		return SlabID{}, 0, fmt.Errorf("invalid slab name: parse ID: %w", err)
	}
	commited, err := strconv.ParseUint(strings.TrimRight(splitted[1], ".bin"), 10, 64)
	if err != nil {
		return SlabID{}, 0, fmt.Errorf("invalid slab name: parse commited: %w", err)
	}
	return SlabID(id), commited, nil
}
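Slab files are named <uuid>_<committed-bytes>.bin, and every successful Write renames the file so the committed size survives a restart; parseSlabName recovers both parts during Init. A small, package-internal round-trip sketch (parseSlabName is unexported); the file name below is illustrative:

name := "2f9d9c1e-6a6f-4b0c-8f58-3c1f2a9b7d44_1048576.bin" // illustrative recovered file name
id, commited, err := parseSlabName(name)
if err != nil {
	// a file that does not match the <uuid>_<bytes>.bin pattern aborts Init
	return err
}
fmt.Printf("slab %s has %d committed bytes\n", id.String(), commited)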
pkg/local_object_storage/writecache/slabstore/stat.go (new file, 3 lines)

@ -0,0 +1,3 @@
package slabstore

type stat struct{}
pkg/local_object_storage/writecache/slabstore/store.go (new file, 248 lines)

@ -0,0 +1,248 @@
package slabstore

import (
	"context"
	"fmt"
	"os"
	"sync"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
)

type Store struct {
	config config

	wg sync.WaitGroup

	initialized atomic.Bool

	writeReady     chan *Slab
	writeCompleted chan *Slab
	readReq        chan *readRequest
	readCompleted  chan *Slab
	sealedReq      chan *sealedRequest
	dropReq        chan *dropRequest
	closed         chan struct{}

	active []*Slab
	sealed []*Slab
	all    map[SlabID]*Slab
	toDrop map[SlabID][]chan *dropResponse
}

func NewStore(path string, readOnly bool, opts ...Option) *Store {
	config := defaultConfig
	config.path = path
	config.readOnly = readOnly
	for _, o := range opts {
		o(&config)
	}

	return &Store{
		config:         config,
		writeReady:     make(chan *Slab),
		writeCompleted: make(chan *Slab),
		readReq:        make(chan *readRequest),
		readCompleted:  make(chan *Slab),
		sealedReq:      make(chan *sealedRequest),
		dropReq:        make(chan *dropRequest),
		closed:         make(chan struct{}),
		active:         make([]*Slab, 0, config.maxActiveCount),
		sealed:         make([]*Slab, 0),
		all:            make(map[SlabID]*Slab),
		toDrop:         make(map[SlabID][]chan *dropResponse),
	}
}

func (s *Store) Init(ctx context.Context, reader SlabReader) error {
	if s.initialized.Load() {
		return nil
	}

	err := util.MkdirAllX(s.config.path, os.ModePerm)
	if err != nil {
		return err
	}

	entries, err := os.ReadDir(s.config.path)
	if err != nil {
		return fmt.Errorf("read existing slabs: %w", err)
	}

	for _, ent := range entries {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if ent.IsDir() {
			continue
		}

		id, commited, err := parseSlabName(ent.Name())
		if err != nil {
			return err
		}

		slab := &Slab{
			dir:           s.config.path,
			id:            id,
			truncateSize:  s.config.truncateSize,
			commitedBytes: commited,
		}

		if reader != nil {
			if err := reader.Read(ctx, slab); err != nil {
				return fmt.Errorf("read slab: %w", err)
			}
		}

		s.all[slab.ID()] = slab

		if s.slabSealed(slab) {
			s.sealed = append(s.sealed, slab)
		} else {
			s.active = append(s.active, slab)
		}
	}
	s.wg.Add(1)
	go s.dispatch()
	s.initialized.Store(true)
	return nil
}

func (s *Store) Write(ctx context.Context) (WriteSlab, ReleaseFunc, error) {
	if !s.initialized.Load() {
		return nil, nil, ErrNotInitialized
	}
	if s.config.readOnly {
		return nil, nil, ErrReadOnly
	}

	select {
	case slab := <-s.writeReady:
		return slab, func() {
			select {
			case s.writeCompleted <- slab:
			case <-s.closed:
			}
		}, nil
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case <-s.closed:
		return nil, nil, ErrClosed
	}
}

func (s *Store) Read(ctx context.Context, id SlabID) (ReadSlab, ReleaseFunc, error) {
	if !s.initialized.Load() {
		return nil, nil, ErrNotInitialized
	}

	req := &readRequest{
		id:     id,
		result: make(chan *readResponse),
	}
	select {
	case s.readReq <- req:
		select {
		case resp := <-req.result:
			if resp.err != nil {
				return nil, nil, resp.err
			}
			return resp.slab, func() {
				select {
				case s.readCompleted <- resp.slab:
				case <-s.closed:
				}
			}, nil
		case <-ctx.Done():
			return nil, nil, ctx.Err()
		case <-s.closed:
			return nil, nil, ErrClosed
		}
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case <-s.closed:
		return nil, nil, ErrClosed
	}
}

func (s *Store) Sealed(ctx context.Context) ([]ReadSlab, error) {
	if !s.initialized.Load() {
		return nil, ErrNotInitialized
	}

	req := &sealedRequest{
		result: make(chan *sealedResponse),
	}
	select {
	case s.sealedReq <- req:
		select {
		case resp := <-req.result:
			if resp.err != nil {
				return nil, resp.err
			}
			result := make([]ReadSlab, 0, len(resp.slabs))
			for _, slab := range resp.slabs {
				result = append(result, slab)
			}
			return result, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-s.closed:
			return nil, ErrClosed
		}
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-s.closed:
		return nil, ErrClosed
	}
}

func (s *Store) Purge(ctx context.Context, id SlabID) error {
	slab, err := s.drop(ctx, id)
	if err != nil {
		return err
	}
	return slab.PurgeData()
}

func (s *Store) drop(ctx context.Context, id SlabID) (*Slab, error) {
	if !s.initialized.Load() {
		return nil, ErrNotInitialized
	}

	req := &dropRequest{
		id:     id,
		result: make(chan *dropResponse),
	}
	select {
	case s.dropReq <- req:
		select {
		case resp := <-req.result:
			return resp.slab, resp.err
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-s.closed:
			return nil, ErrClosed
		}
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-s.closed:
		return nil, ErrClosed
	}
}

func (s *Store) Close() {
	select {
	case <-s.closed:
		return
	default:
		close(s.closed)
		s.wg.Wait()
		s.initialized.Store(false)
	}
}
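Taken together, the Store lifecycle is: construct, Init (which recovers existing slab files and starts the dispatcher), use, Close. A minimal sketch assuming a nil SlabReader, an in-scope ctx, and an illustrative path:

store := slabstore.NewStore("/path/to/slabs", false) // read-write mode
if err := store.Init(ctx, nil); err != nil {         // nil reader: skip replaying slab contents
	return err
}
defer store.Close()

sealed, err := store.Sealed(ctx) // slabs that reached sealSize and can be flushed
if err != nil {
	return err
}
for _, slab := range sealed {
	// ... flush the slab's contents to the main storage ...
	if err := store.Purge(ctx, slab.ID()); err != nil { // drop the slab and delete its file
		return err
	}
}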
67
pkg/local_object_storage/writecache/slabstore/types.go
Normal file
67
pkg/local_object_storage/writecache/slabstore/types.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package slabstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type SlabID uuid.UUID
|
||||
|
||||
func (id SlabID) String() string {
|
||||
return uuid.UUID(id).String()
|
||||
}
|
||||
|
||||
type OffsetLength struct {
|
||||
Offset uint64
|
||||
Length uint64
|
||||
}
|
||||
|
||||
type ReleaseFunc func()
|
||||
|
||||
type SlabWithID interface {
|
||||
ID() SlabID
|
||||
}
|
||||
|
||||
type ReadSlab interface {
|
||||
SlabWithID
|
||||
Read(from OffsetLength) ([]byte, error)
|
||||
}
|
||||
|
||||
type WriteSlab interface {
|
||||
SlabWithID
|
||||
Write(data []byte) (OffsetLength, error)
|
||||
}
|
||||
|
||||
type readRequest struct {
|
||||
id SlabID
|
||||
result chan *readResponse
|
||||
}
|
||||
|
||||
type readResponse struct {
|
||||
slab *Slab
|
||||
err error
|
||||
}
|
||||
|
||||
type sealedRequest struct {
|
||||
result chan *sealedResponse
|
||||
}
|
||||
|
||||
type sealedResponse struct {
|
||||
slabs []*Slab
|
||||
err error
|
||||
}
|
||||
|
||||
type dropRequest struct {
|
||||
id SlabID
|
||||
result chan *dropResponse
|
||||
}
|
||||
|
||||
type dropResponse struct {
|
||||
slab *Slab
|
||||
err error
|
||||
}
|
||||
|
||||
type SlabReader interface {
|
||||
Read(ctx context.Context, slab *Slab) error
|
||||
}
|
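Init accepts an optional SlabReader so the caller can process each recovered slab before it is registered (for example, to rebuild an in-memory index). A minimal implementation sketch; noopReader is hypothetical and not part of this change:

// noopReader is a hypothetical SlabReader that accepts every recovered slab as-is.
type noopReader struct{}

func (noopReader) Read(ctx context.Context, slab *slabstore.Slab) error {
	// A real implementation would read records back via slab.Read(OffsetLength{...})
	// and rebuild whatever index the write-cache keeps; nothing to do here.
	return nil
}

// Usage: err := store.Init(ctx, noopReader{})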