[#81] node: Add basic read/write benchmarks for substorages

Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
Alejandro Lopez 2023-03-03 12:33:08 +03:00 committed by Gitea
parent b1c165a93b
commit 724debfdcd
7 changed files with 697 additions and 1 deletions

3
go.mod
View file

@ -39,6 +39,8 @@ require (
gopkg.in/yaml.v3 v3.0.1
)
require golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
@ -91,7 +93,6 @@ require (
github.com/urfave/cli v1.22.5 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221227203929-1b447090c38c // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.3.0 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -0,0 +1,15 @@
package memstore
import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
func (s *memstoreImpl) Open(readOnly bool) error {
s.readOnly = readOnly
return nil
}
func (s *memstoreImpl) Init() error { return nil }
func (s *memstoreImpl) Close() error { return nil }
func (s *memstoreImpl) Type() string { return Type }
func (s *memstoreImpl) Path() string { return s.rootPath }
func (s *memstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
func (s *memstoreImpl) SetReportErrorFunc(f func(string, error)) { s.reportError = f }

View file

@ -0,0 +1,168 @@
// Package memstore implements a memory-backed common.Storage for testing purposes.
package memstore
import (
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
const Type = "memstore"
type memstoreImpl struct {
*cfg
mu sync.RWMutex
objs map[string][]byte
}
func New(opts ...Option) common.Storage {
st := &memstoreImpl{
cfg: defaultConfig(),
objs: map[string][]byte{},
}
for _, opt := range opts {
opt(st.cfg)
}
return st
}
func (s *memstoreImpl) Get(req common.GetPrm) (common.GetRes, error) {
key := req.Address.EncodeToString()
s.mu.RLock()
data, exists := s.objs[key]
s.mu.RUnlock()
if !exists {
return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
}
// Decompress the data.
var err error
if data, err = s.compression.Decompress(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
// Unmarshal the SDK object.
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
return common.GetRes{Object: obj, RawData: data}, nil
}
func (s *memstoreImpl) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
getResp, err := s.Get(common.GetPrm{
Address: req.Address,
StorageID: req.StorageID,
})
if err != nil {
return common.GetRangeRes{}, err
}
payload := getResp.Object.Payload()
from := req.Range.GetOffset()
to := from + req.Range.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{})
}
return common.GetRangeRes{
Data: payload[from:to],
}, nil
}
func (s *memstoreImpl) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
key := req.Address.EncodeToString()
s.mu.RLock()
defer s.mu.RUnlock()
_, exists := s.objs[key]
return common.ExistsRes{Exists: exists}, nil
}
func (s *memstoreImpl) Put(req common.PutPrm) (common.PutRes, error) {
if s.readOnly {
return common.PutRes{}, common.ErrReadOnly
}
if !req.DontCompress {
req.RawData = s.compression.Compress(req.RawData)
}
key := req.Address.EncodeToString()
s.mu.Lock()
defer s.mu.Unlock()
s.objs[key] = req.RawData
return common.PutRes{StorageID: []byte(s.rootPath)}, nil
}
func (s *memstoreImpl) Delete(req common.DeletePrm) (common.DeleteRes, error) {
if s.readOnly {
return common.DeleteRes{}, common.ErrReadOnly
}
key := req.Address.EncodeToString()
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.objs[key]; exists {
delete(s.objs, key)
return common.DeleteRes{}, nil
}
return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
}
func (s *memstoreImpl) Iterate(req common.IteratePrm) (common.IterateRes, error) {
s.mu.RLock()
defer s.mu.RUnlock()
for k, v := range s.objs {
elem := common.IterationElement{
ObjectData: v,
}
if err := elem.Address.DecodeString(string(k)); err != nil {
if req.IgnoreErrors {
continue
}
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, string(k), err))
}
var err error
if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {
if req.IgnoreErrors {
if req.ErrorHandler != nil {
return common.IterateRes{}, req.ErrorHandler(elem.Address, err)
}
continue
}
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decompressing data for address %q: %v", s, elem.Address.String(), err))
}
switch {
case req.Handler != nil:
if err := req.Handler(elem); err != nil {
return common.IterateRes{}, err
}
case req.LazyHandler != nil:
if err := req.LazyHandler(elem.Address, func() ([]byte, error) { return elem.ObjectData, nil }); err != nil {
return common.IterateRes{}, err
}
default:
if !req.IgnoreErrors {
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) no Handler or LazyHandler set for IteratePrm", s))
}
}
}
return common.IterateRes{}, nil
}

View file

@ -0,0 +1,68 @@
package memstore
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestSimpleLifecycle(t *testing.T) {
s := New(
WithRootPath("memstore"),
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
)
t.Cleanup(func() { _ = s.Close() })
require.NoError(t, s.Open(false))
require.NoError(t, s.Init())
obj := blobstortest.NewObject(1024)
addr := object.AddressOf(obj)
d, err := obj.Marshal()
require.NoError(t, err)
{
_, err := s.Put(common.PutPrm{Address: addr, RawData: d, DontCompress: true})
require.NoError(t, err)
}
{
resp, err := s.Exists(common.ExistsPrm{Address: addr})
require.NoError(t, err)
require.True(t, resp.Exists)
}
{
resp, err := s.Get(common.GetPrm{Address: addr})
require.NoError(t, err)
require.Equal(t, obj.Payload(), resp.Object.Payload())
}
{
var objRange objectSDK.Range
objRange.SetOffset(256)
objRange.SetLength(512)
resp, err := s.GetRange(common.GetRangePrm{
Address: addr,
Range: objRange,
})
require.NoError(t, err)
require.Equal(t, obj.Payload()[objRange.GetOffset():objRange.GetOffset()+objRange.GetLength()], resp.Data)
}
{
_, err := s.Delete(common.DeletePrm{Address: addr})
require.NoError(t, err)
}
{
resp, err := s.Exists(common.ExistsPrm{Address: addr})
require.NoError(t, err)
require.False(t, resp.Exists)
}
}

View file

@ -0,0 +1,42 @@
package memstore
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
type cfg struct {
log *logger.Logger
rootPath string
readOnly bool
compression *compression.Config
reportError func(string, error)
}
func defaultConfig() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
reportError: func(string, error) {},
}
}
type Option func(*cfg)
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithRootPath(p string) Option {
return func(c *cfg) {
c.rootPath = p
}
}
func WithReadOnly(ro bool) Option {
return func(c *cfg) {
c.readOnly = ro
}
}

View file

@ -0,0 +1,402 @@
package blobstor
import (
"encoding/binary"
"fmt"
"os"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/exp/rand"
"golang.org/x/exp/slices"
)
// The storages to benchmark. Each storage has a description and a function which returns the actual
// storage along with a cleanup function.
var storages = []struct {
desc string
create func(*testing.B) (common.Storage, func())
}{
{
desc: "memstore",
create: func(*testing.B) (common.Storage, func()) {
return memstore.New(), func() {}
},
},
{
desc: "fstree_nosync",
create: func(b *testing.B) (common.Storage, func()) {
dir, err := os.MkdirTemp(os.TempDir(), "fstree_nosync")
if err != nil {
b.Fatalf("creating fstree_nosync root path: %v", err)
}
cleanup := func() { os.RemoveAll(dir) }
return fstree.New(
fstree.WithPath(dir),
fstree.WithDepth(2),
fstree.WithDirNameLen(2),
fstree.WithNoSync(true),
), cleanup
},
},
{
desc: "fstree",
create: func(b *testing.B) (common.Storage, func()) {
dir, err := os.MkdirTemp(os.TempDir(), "fstree")
if err != nil {
b.Fatalf("creating fstree root path: %v", err)
}
cleanup := func() { os.RemoveAll(dir) }
return fstree.New(
fstree.WithPath(dir),
fstree.WithDepth(2),
fstree.WithDirNameLen(2),
), cleanup
},
},
{
desc: "blobovniczatree",
create: func(b *testing.B) (common.Storage, func()) {
dir, err := os.MkdirTemp(os.TempDir(), "blobovniczatree")
if err != nil {
b.Fatalf("creating blobovniczatree root path: %v", err)
}
cleanup := func() { os.RemoveAll(dir) }
return blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(dir),
), cleanup
},
},
}
func BenchmarkSubstorageReadPerf(b *testing.B) {
readTests := []struct {
desc string
size int
objGen func() objectGenerator
addrGen func() addressGenerator
}{
{
desc: "seq100",
size: 10000,
objGen: func() objectGenerator { return &seqObjGenerator{objSize: 100} },
addrGen: func() addressGenerator { return &seqAddrGenerator{maxID: 100} },
},
{
desc: "rand100",
size: 10000,
objGen: func() objectGenerator { return &seqObjGenerator{objSize: 100} },
addrGen: func() addressGenerator { return randAddrGenerator(10000) },
},
}
for _, tt := range readTests {
for _, stEntry := range storages {
b.Run(fmt.Sprintf("%s-%s", stEntry.desc, tt.desc), func(b *testing.B) {
objGen := tt.objGen()
st, cleanup := stEntry.create(b)
require.NoError(b, st.Open(false))
require.NoError(b, st.Init())
// Fill database
for i := 0; i < tt.size; i++ {
obj := objGen.Next()
addr := addressFromObject(obj)
raw, err := obj.Marshal()
require.NoError(b, err)
if _, err := st.Put(common.PutPrm{
Address: addr,
RawData: raw,
}); err != nil {
b.Fatalf("writing entry: %v", err)
}
}
// Benchmark reading
addrGen := tt.addrGen()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := st.Get(common.GetPrm{Address: addrGen.Next()})
require.NoError(b, err)
}
})
require.NoError(b, st.Close())
cleanup()
})
}
}
}
func BenchmarkSubstorageWritePerf(b *testing.B) {
generators := []struct {
desc string
create func() objectGenerator
}{
{desc: "rand10", create: func() objectGenerator { return &randObjGenerator{objSize: 10} }},
{desc: "rand100", create: func() objectGenerator { return &randObjGenerator{objSize: 100} }},
{desc: "rand1000", create: func() objectGenerator { return &randObjGenerator{objSize: 1000} }},
{desc: "overwrite10", create: func() objectGenerator { return &overwriteObjGenerator{objSize: 10, maxObjects: 100} }},
{desc: "overwrite100", create: func() objectGenerator { return &overwriteObjGenerator{objSize: 100, maxObjects: 100} }},
{desc: "overwrite1000", create: func() objectGenerator { return &overwriteObjGenerator{objSize: 1000, maxObjects: 100} }},
}
for _, genEntry := range generators {
for _, stEntry := range storages {
b.Run(fmt.Sprintf("%s-%s", stEntry.desc, genEntry.desc), func(b *testing.B) {
gen := genEntry.create()
st, cleanup := stEntry.create(b)
require.NoError(b, st.Open(false))
require.NoError(b, st.Init())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
obj := gen.Next()
addr := addressFromObject(obj)
raw, err := obj.Marshal()
require.NoError(b, err)
if _, err := st.Put(common.PutPrm{
Address: addr,
RawData: raw,
}); err != nil {
b.Fatalf("writing entry: %v", err)
}
}
})
require.NoError(b, st.Close())
cleanup()
})
}
}
}
func BenchmarkSubstorageIteratePerf(b *testing.B) {
iterateTests := []struct {
desc string
size int
objGen func() objectGenerator
}{
{
desc: "rand100",
size: 10000,
objGen: func() objectGenerator { return &randObjGenerator{objSize: 100} },
},
}
for _, tt := range iterateTests {
for _, stEntry := range storages {
b.Run(fmt.Sprintf("%s-%s", stEntry.desc, tt.desc), func(b *testing.B) {
objGen := tt.objGen()
st, cleanup := stEntry.create(b)
require.NoError(b, st.Open(false))
require.NoError(b, st.Init())
// Fill database
for i := 0; i < tt.size; i++ {
obj := objGen.Next()
addr := addressFromObject(obj)
raw, err := obj.Marshal()
require.NoError(b, err)
if _, err := st.Put(common.PutPrm{
Address: addr,
RawData: raw,
}); err != nil {
b.Fatalf("writing entry: %v", err)
}
}
// Benchmark iterate
cnt := 0
b.ResetTimer()
_, err := st.Iterate(common.IteratePrm{
Handler: func(elem common.IterationElement) error {
cnt++
return nil
},
})
require.NoError(b, err)
require.Equal(b, tt.size, cnt)
b.StopTimer()
require.NoError(b, st.Close())
cleanup()
})
}
}
}
func addressFromObject(obj *objectSDK.Object) oid.Address {
var addr oid.Address
if id, isSet := obj.ID(); isSet {
addr.SetObject(id)
} else {
panic("object ID is not set")
}
if cid, isSet := obj.ContainerID(); isSet {
addr.SetContainer(cid)
} else {
panic("container ID is not set")
}
return addr
}
// addressGenerator is the interface of types that generate object addresses.
type addressGenerator interface {
Next() oid.Address
}
// seqAddrGenerator is an addressGenerator that generates addresses sequentially and wraps around the given max ID.
type seqAddrGenerator struct {
cnt atomic.Uint64
maxID uint64
}
func (g *seqAddrGenerator) Next() oid.Address {
var id oid.ID
binary.LittleEndian.PutUint64(id[:], ((g.cnt.Inc()-1)%g.maxID)+1)
var addr oid.Address
addr.SetContainer(cid.ID{})
addr.SetObject(id)
return addr
}
func TestSeqAddrGenerator(t *testing.T) {
gen := &seqAddrGenerator{maxID: 10}
for i := 1; i <= 20; i++ {
addr := gen.Next()
id := addr.Object()
require.Equal(t, uint64((i-1)%int(gen.maxID)+1), binary.LittleEndian.Uint64(id[:]))
}
}
// randAddrGenerator is an addressGenerator that generates random addresses in the given range.
type randAddrGenerator uint64
func (g randAddrGenerator) Next() oid.Address {
var id oid.ID
binary.LittleEndian.PutUint64(id[:], uint64(1+int(rand.Int63n(int64(g)))))
var addr oid.Address
addr.SetContainer(cid.ID{})
addr.SetObject(id)
return addr
}
func TestRandAddrGenerator(t *testing.T) {
gen := randAddrGenerator(5)
for i := 0; i < 50; i++ {
addr := gen.Next()
id := addr.Object()
k := binary.LittleEndian.Uint64(id[:])
require.True(t, 1 <= k && k <= uint64(gen))
}
}
// objectGenerator is the interface of types that generate object entries.
type objectGenerator interface {
Next() *objectSDK.Object
}
// seqObjGenerator is an objectGenerator that generates entries with random payloads of size objSize and sequential IDs.
type seqObjGenerator struct {
cnt atomic.Uint64
objSize uint64
}
func (g *seqObjGenerator) Next() *objectSDK.Object {
var id oid.ID
binary.LittleEndian.PutUint64(id[:], g.cnt.Inc())
return genObject(id, cid.ID{}, g.objSize)
}
func TestSeqObjGenerator(t *testing.T) {
gen := &seqObjGenerator{objSize: 10}
var addrs []string
for i := 1; i <= 10; i++ {
obj := gen.Next()
id, isSet := obj.ID()
addrs = append(addrs, addressFromObject(obj).EncodeToString())
require.True(t, isSet)
require.Equal(t, gen.objSize, uint64(len(obj.Payload())))
require.Equal(t, uint64(i), binary.LittleEndian.Uint64(id[:]))
}
require.True(t, slices.IsSorted(addrs))
}
// randObjGenerator is an objectGenerator that generates entries with random IDs and payloads of size objSize.
type randObjGenerator struct {
objSize uint64
}
func (g *randObjGenerator) Next() *objectSDK.Object {
return genObject(oidtest.ID(), cidtest.ID(), g.objSize)
}
func TestRandObjGenerator(t *testing.T) {
gen := &randObjGenerator{objSize: 10}
for i := 0; i < 10; i++ {
obj := gen.Next()
require.Equal(t, gen.objSize, uint64(len(obj.Payload())))
}
}
// overwriteObjGenerator is an objectGenerator that generates entries with random payloads of size objSize and at most maxObjects distinct IDs.
type overwriteObjGenerator struct {
objSize uint64
maxObjects uint64
}
func (g *overwriteObjGenerator) Next() *objectSDK.Object {
var id oid.ID
binary.LittleEndian.PutUint64(id[:], uint64(1+rand.Int63n(int64(g.maxObjects))))
return genObject(id, cid.ID{}, g.objSize)
}
func TestOverwriteObjGenerator(t *testing.T) {
gen := &overwriteObjGenerator{
objSize: 10,
maxObjects: 4,
}
for i := 0; i < 40; i++ {
obj := gen.Next()
id, isSet := obj.ID()
i := binary.LittleEndian.Uint64(id[:])
require.True(t, isSet)
require.Equal(t, gen.objSize, uint64(len(obj.Payload())))
require.True(t, 1 <= i && i <= gen.maxObjects)
}
}
// Generates an object with random payload and the given address and size.
// TODO(#86): there's some testing-related dupes in many places. Probably worth
// spending some time cleaning up a bit.
func genObject(id oid.ID, cid cid.ID, sz uint64) *objectSDK.Object {
raw := objectSDK.New()
raw.SetID(id)
raw.SetContainerID(cid)
payload := make([]byte, sz)
rand.Read(payload)
raw.SetPayload(payload)
return raw
}