forked from TrueCloudLab/frostfs-node
uringstor: fixes
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
7c55823577
commit
884b77211c
5 changed files with 82 additions and 30 deletions
|
@ -1,6 +1,7 @@
|
|||
package uringstor
|
||||
|
||||
import (
|
||||
"math/bits"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
@ -20,11 +21,13 @@ type nameIDPair struct {
|
|||
func (s *Storage) Open(readOnly bool) error {
|
||||
s.readOnly = readOnly
|
||||
|
||||
lp, err := loop.New(s.loopSize, s.loopParams)
|
||||
if err != nil {
|
||||
return err
|
||||
if s.loopSize != 0 {
|
||||
lp, err := loop.New(s.loopSize, &s.loopParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.loop = lp
|
||||
}
|
||||
s.loop = lp
|
||||
|
||||
if !readOnly {
|
||||
if err := util.MkdirAllX(s.path, os.ModePerm); err != nil {
|
||||
|
@ -54,17 +57,14 @@ func (s *Storage) Open(readOnly bool) error {
|
|||
|
||||
maxID := -1
|
||||
for i := range ids {
|
||||
b := slab.NewUringBackend(filepath.Join(s.path, ids[i].name), s.loop)
|
||||
sl, err := slab.New(slab.Params{
|
||||
Backend: b,
|
||||
ReadOnly: s.readOnly,
|
||||
})
|
||||
slabPath := filepath.Join(s.path, ids[i].name)
|
||||
sl, err := s.openSlab(slabPath, 0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slabID := ids[i].id
|
||||
index := sl.ChunkSize() / uint32(s.minObjectSize)
|
||||
index := bits.TrailingZeros32(sl.ChunkSize()) - bits.TrailingZeros32(uint32(s.minObjectSize))
|
||||
s.slabIDs[index] = append(s.slabIDs[index], slabID)
|
||||
s.slabMap[slabID] = sl
|
||||
|
||||
|
@ -76,6 +76,21 @@ func (s *Storage) Open(readOnly bool) error {
|
|||
if maxID > 0 {
|
||||
s.nextID = uint16(maxID + 1)
|
||||
}
|
||||
for i := range s.slabIDs {
|
||||
if len(s.slabIDs[i]) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
slabPath := filepath.Join(s.path, strconv.FormatUint(uint64(s.nextID), idToStringBase))
|
||||
ss, err := s.openSlab(slabPath, uint32(s.minObjectSize<<i), s.slabCapacity)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.slabIDs[i] = append(s.slabIDs[i], s.nextID)
|
||||
s.slabMap[s.nextID] = ss
|
||||
s.nextID++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -96,7 +111,9 @@ func (s *Storage) Close() error {
|
|||
s.slabIDs[i] = nil
|
||||
}
|
||||
|
||||
s.loop.Close()
|
||||
s.loop = nil
|
||||
if s.loop != nil {
|
||||
s.loop.Close()
|
||||
s.loop = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"math/bits"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"syscall"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/uringstor/slab"
|
||||
|
@ -34,13 +33,15 @@ func (s *Storage) Put(prm common.PutPrm) (common.PutRes, error) {
|
|||
assert(ok, "slab ID is here, but the slab is missing: %d", slabID)
|
||||
s.slabMtx.RUnlock()
|
||||
|
||||
//start := time.Now()
|
||||
offset, err := ss.Put(prm.RawData)
|
||||
//fmt.Println(time.Since(start))
|
||||
if err == nil {
|
||||
storageID := marshalStorageID(slabID, offset)
|
||||
storagelog.Write(s.log,
|
||||
storagelog.OpField("PUT"),
|
||||
storagelog.StorageIDField(storageID[:]),
|
||||
storagelog.AddressField(prm.Address))
|
||||
// storagelog.Write(s.log,
|
||||
// storagelog.OpField("PUT"),
|
||||
// storagelog.StorageIDField(storageID[:]),
|
||||
// storagelog.AddressField(prm.Address))
|
||||
return common.PutRes{StorageID: storageID[:]}, nil
|
||||
}
|
||||
s.log.Error("can't put object to a slab", zap.Error(err))
|
||||
|
@ -58,21 +59,11 @@ func (s *Storage) Put(prm common.PutPrm) (common.PutRes, error) {
|
|||
slabID = s.slabIDs[bucket][ln]
|
||||
ss = s.slabMap[slabID]
|
||||
} else {
|
||||
slabPath := filepath.Join(s.path, strconv.FormatUint(uint64(s.nextID), idToStringBase))
|
||||
b := slab.NewUringBackend(slabPath, s.loop)
|
||||
|
||||
var err error
|
||||
ss, err = slab.New(slab.Params{
|
||||
Backend: b,
|
||||
ChunkSize: uint32(up2),
|
||||
Capacity: s.slabCapacity,
|
||||
ReadOnly: s.readOnly,
|
||||
})
|
||||
slabPath := filepath.Join(s.path, strconv.FormatUint(uint64(s.nextID), idToStringBase))
|
||||
ss, err = s.openSlab(slabPath, uint32(up2), s.slabCapacity)
|
||||
if err != nil {
|
||||
s.slabMtx.Unlock()
|
||||
if err == syscall.ENOSPC {
|
||||
err = common.ErrNoSpace
|
||||
}
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,11 @@ func (f *UringFile) Open(flags int) error {
|
|||
return err
|
||||
}
|
||||
|
||||
f.fd = f.File.file.Fd()
|
||||
fd := f.File.file.Fd()
|
||||
//if err := f.loop.RegisterFiles([]int32{int32(fd)}); err != nil {
|
||||
// return err
|
||||
//}
|
||||
f.fd = fd
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -104,6 +108,7 @@ func (f *UringFile) Close() error {
|
|||
func (f *UringFile) Fallocate(size int64) error {
|
||||
var sqe uring.SQEntry
|
||||
uring.Fallocate(&sqe, f.fd, _FALLOC_FL_ZERO_RANGE, 0, uint64(size))
|
||||
sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
|
||||
_, err := f.loop.Complete(sqe)
|
||||
return err
|
||||
|
@ -112,6 +117,7 @@ func (f *UringFile) Fallocate(size int64) error {
|
|||
func (f *UringFile) WriteAt(buf []byte, offset int64) error {
|
||||
var sqe uring.SQEntry
|
||||
uring.WriteAt(&sqe, f.fd, buf, uint64(offset))
|
||||
sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
|
||||
_, err := f.loop.Complete(sqe)
|
||||
return err
|
||||
|
@ -120,6 +126,7 @@ func (f *UringFile) WriteAt(buf []byte, offset int64) error {
|
|||
func (f *UringFile) ReadAt(buf []byte, offset int64) error {
|
||||
var sqe uring.SQEntry
|
||||
uring.ReadAt(&sqe, f.fd, buf, uint64(offset))
|
||||
sqe.SetFlags(uring.IOSQE_FIXED_FILE)
|
||||
|
||||
_, err := f.loop.Complete(sqe)
|
||||
return err
|
||||
|
|
|
@ -21,6 +21,7 @@ type Storage struct {
|
|||
path string
|
||||
maxObjectSize uint64
|
||||
minObjectSize uint64
|
||||
noSync bool
|
||||
readOnly bool
|
||||
loopSize int
|
||||
loopParams loop.Params
|
||||
|
@ -51,6 +52,7 @@ func New(opts ...Option) (*Storage, error) {
|
|||
s.maxObjectSize = defaultMaxObjectSize
|
||||
s.minObjectSize = defaultMinObjectSize
|
||||
s.loopSize = defaultUringLoopSize
|
||||
s.log = &logger.Logger{Logger: zap.NewNop()}
|
||||
for i := range opts {
|
||||
opts[i](&s)
|
||||
}
|
||||
|
@ -102,6 +104,12 @@ func WithLoopParams(size int, p loop.Params) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func WithOSBackend() Option {
|
||||
return func(s *Storage) {
|
||||
s.loopSize = 0
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxObjectSize(size int64) Option {
|
||||
return func(s *Storage) {
|
||||
s.maxObjectSize = uint64(size)
|
||||
|
@ -113,3 +121,9 @@ func WithLogger(l *logger.Logger) Option {
|
|||
s.log = &logger.Logger{Logger: l.With(zap.String("component", Type))}
|
||||
}
|
||||
}
|
||||
|
||||
func WithNoSync(noSync bool) Option {
|
||||
return func(s *Storage) {
|
||||
s.noSync = noSync
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/uringstor/slab"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
)
|
||||
|
@ -32,3 +33,25 @@ func assert(ok bool, msg string, args ...any) {
|
|||
panic(fmt.Sprintf(msg, args...))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Storage) openSlab(p string, chunkSize uint32, slabCapacity uint64) (*slab.Slab, error) {
|
||||
var b slab.Backend
|
||||
if s.loop == nil {
|
||||
b = slab.NewFileBackend(p)
|
||||
} else {
|
||||
b = slab.NewUringBackend(p, s.loop)
|
||||
}
|
||||
|
||||
ss, err := slab.New(slab.Params{
|
||||
Backend: b,
|
||||
ChunkSize: chunkSize,
|
||||
Capacity: slabCapacity,
|
||||
ReadOnly: s.readOnly,
|
||||
NoSync: s.noSync,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ss, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue