[#1223] scripts: Add script to populate metabase

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2024-07-15 14:07:32 +03:00
parent 98fe24cdb7
commit d1d2f1a2e5
3 changed files with 534 additions and 0 deletions

View file

@ -0,0 +1,132 @@
package internal
import (
"crypto/sha256"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"golang.org/x/exp/rand"
)
func GeneratePayloadPool(count uint, size uint) [][]byte {
pool := [][]byte{}
for i := uint(0); i < count; i++ {
payload := make([]byte, size)
_, _ = rand.Read(payload)
pool = append(pool, payload)
}
return pool
}
func GenerateAttributePool(count uint) []objectSDK.Attribute {
pool := []objectSDK.Attribute{}
for i := uint(0); i < count; i++ {
for j := uint(0); j < count; j++ {
attr := *objectSDK.NewAttribute()
attr.SetKey(fmt.Sprintf("key%d", i))
attr.SetValue(fmt.Sprintf("value%d", j))
pool = append(pool, attr)
}
}
return pool
}
func GenerateOwnerPool(count uint) []user.ID {
pool := []user.ID{}
for i := uint(0); i < count; i++ {
pool = append(pool, usertest.ID())
}
return pool
}
type ObjectOption func(obj *objectSDK.Object)
func GenerateObject(options ...ObjectOption) *objectSDK.Object {
var ver version.Version
ver.SetMajor(2)
ver.SetMinor(1)
payload := make([]byte, 0)
var csum checksum.Checksum
csum.SetSHA256(sha256.Sum256(payload))
var csumTZ checksum.Checksum
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
obj := objectSDK.New()
obj.SetID(oidtest.ID())
obj.SetOwnerID(usertest.ID())
obj.SetContainerID(cidtest.ID())
header := objecttest.Object().GetECHeader()
header.SetParent(oidtest.ID())
obj.SetECHeader(header)
obj.SetVersion(&ver)
obj.SetPayload(payload)
obj.SetPayloadSize(uint64(len(payload)))
obj.SetPayloadChecksum(csum)
obj.SetPayloadHomomorphicHash(csumTZ)
for _, option := range options {
option(obj)
}
return obj
}
func WithContainerID(cid cid.ID) ObjectOption {
return func(obj *objectSDK.Object) {
obj.SetContainerID(cid)
}
}
func WithType(typ objectSDK.Type) ObjectOption {
return func(obj *objectSDK.Object) {
obj.SetType(typ)
}
}
func WithPayloadFromPool(pool [][]byte) ObjectOption {
payload := pool[rand.Intn(len(pool))]
var csum checksum.Checksum
csum.SetSHA256(sha256.Sum256(payload))
var csumTZ checksum.Checksum
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
return func(obj *objectSDK.Object) {
obj.SetPayload(payload)
obj.SetPayloadSize(uint64(len(payload)))
obj.SetPayloadChecksum(csum)
obj.SetPayloadHomomorphicHash(csumTZ)
}
}
func WithAttributesFromPool(pool []objectSDK.Attribute, count uint) ObjectOption {
return func(obj *objectSDK.Object) {
attrs := []objectSDK.Attribute{}
for i := uint(0); i < count; i++ {
attrs = append(attrs, pool[rand.Intn(len(pool))])
}
obj.SetAttributes(attrs...)
}
}
func WithOwnerIDFromPool(pool []user.ID) ObjectOption {
return func(obj *objectSDK.Object) {
obj.SetOwnerID(pool[rand.Intn(len(pool))])
}
}

View file

@ -0,0 +1,252 @@
package internal
import (
"context"
"fmt"
"math/rand"
"sync"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"golang.org/x/sync/errgroup"
)
type EpochState struct{}
func (s EpochState) CurrentEpoch() uint64 {
return 0
}
func PopulateWithObjects(
ctx context.Context,
db *meta.DB,
group *errgroup.Group,
count uint,
factory func() *objectSDK.Object,
) {
digits := "0123456789"
for i := uint(0); i < count; i++ {
obj := factory()
id := []byte(fmt.Sprintf(
"%c/%c/%c",
digits[rand.Int()%len(digits)],
digits[rand.Int()%len(digits)],
digits[rand.Int()%len(digits)],
))
prm := meta.PutPrm{}
prm.SetObject(obj)
prm.SetStorageID(id)
group.Go(func() error {
_, err := db.Put(ctx, prm)
return err
})
}
}
func PopulateWithBigObjects(
ctx context.Context,
db *meta.DB,
group *errgroup.Group,
count uint,
factory func() *objectSDK.Object,
) {
for i := uint(0); i < count; i++ {
group.Go(func() error {
return populateWithBigObject(ctx, db, factory)
})
}
}
func populateWithBigObject(
ctx context.Context,
db *meta.DB,
factory func() *objectSDK.Object,
) error {
t := &target{db: db}
pk, _ := keys.NewPrivateKey()
p := transformer.NewPayloadSizeLimiter(transformer.Params{
Key: &pk.PrivateKey,
NextTargetInit: func() transformer.ObjectWriter { return t },
NetworkState: EpochState{},
MaxSize: 10,
})
obj := factory()
payload := make([]byte, 30)
err := p.WriteHeader(ctx, obj)
if err != nil {
return err
}
_, err = p.Write(ctx, payload)
if err != nil {
return err
}
_, err = p.Close(ctx)
if err != nil {
return err
}
return nil
}
type target struct {
db *meta.DB
}
func (t *target) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
prm := meta.PutPrm{}
prm.SetObject(obj)
_, err := t.db.Put(ctx, prm)
return err
}
func PopulateGraveyard(
ctx context.Context,
db *meta.DB,
group *errgroup.Group,
workBufferSize int,
count uint,
factory func() *objectSDK.Object,
) {
ts := factory()
ts.SetType(objectSDK.TypeTombstone)
prm := meta.PutPrm{}
prm.SetObject(ts)
group.Go(func() error {
_, err := db.Put(ctx, prm)
return err
})
cID, _ := ts.ContainerID()
oID, _ := ts.ID()
var tsAddr oid.Address
tsAddr.SetContainer(cID)
tsAddr.SetObject(oID)
addrs := make(chan oid.Address, workBufferSize)
go func() {
defer close(addrs)
wg := &sync.WaitGroup{}
wg.Add(int(count))
for i := uint(0); i < count; i++ {
obj := factory()
prm := meta.PutPrm{}
prm.SetObject(obj)
group.Go(func() error {
defer wg.Done()
_, err := db.Put(ctx, prm)
if err != nil {
return err
}
cID, _ := obj.ContainerID()
oID, _ := obj.ID()
var addr oid.Address
addr.SetContainer(cID)
addr.SetObject(oID)
addrs <- addr
return nil
})
}
wg.Wait()
}()
go func() {
for addr := range addrs {
prm := meta.InhumePrm{}
prm.SetAddresses(addr)
prm.SetTombstoneAddress(tsAddr)
group.Go(func() error {
_, err := db.Inhume(ctx, prm)
return err
})
}
}()
}
func PopulateLocked(
ctx context.Context,
db *meta.DB,
group *errgroup.Group,
workBufferSize int,
count uint,
factory func() *objectSDK.Object,
) {
locker := factory()
locker.SetType(objectSDK.TypeLock)
prm := meta.PutPrm{}
prm.SetObject(locker)
group.Go(func() error {
_, err := db.Put(ctx, prm)
return err
})
ids := make(chan oid.ID, workBufferSize)
go func() {
defer close(ids)
wg := &sync.WaitGroup{}
wg.Add(int(count))
for i := uint(0); i < count; i++ {
defer wg.Done()
obj := factory()
prm := meta.PutPrm{}
prm.SetObject(obj)
group.Go(func() error {
_, err := db.Put(ctx, prm)
if err != nil {
return err
}
id, _ := obj.ID()
ids <- id
return nil
})
}
wg.Wait()
}()
go func() {
for id := range ids {
lockerCID, _ := locker.ContainerID()
lockerOID, _ := locker.ID()
group.Go(func() error {
err := db.Lock(ctx, lockerCID, lockerOID, []oid.ID{id})
return err
})
}
}()
}

View file

@ -0,0 +1,150 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/scripts/populate-metabase/internal"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"golang.org/x/sync/errgroup"
)
var (
path = flag.String("path", "", "Path to metabase")
force = flag.Bool("force", false, "Rewrite existing database")
jobs = flag.Uint("j", 10000, "Number of jobs to run")
numContainers = flag.Uint("containers", 0, "Number of containers to be created")
numObjects = flag.Uint("objects", 0, "Number of objects per container")
numAttributesPerObj = flag.Uint("attributes", 0, "Number of attributes per object")
// Need to populate indexes.
numOwners = flag.Uint("distinct-owners", 10, "Number of distinct owners to be used")
numPayloads = flag.Uint("distinct-payloads", 10, "Number of distinct payloads to be used")
numAttributes = flag.Uint("distinct-attributes", 10, "Number of distinct attributes to be used")
)
func main() {
flag.Parse()
exitIfTrue("must have payloads", *numPayloads == 0)
exitIfTrue("must have attributes", *numAttributes == 0)
exitIfTrue("must have owners", *numOwners == 0)
exitIfTrue("path to metabase not specified", *path == "")
exitIfTrue(
"object can't have more attributes than available",
*numAttributesPerObj > *numAttributes,
)
info, err := os.Stat(*path)
exitIfTrue("can't get path info", err != nil && !errors.Is(err, os.ErrNotExist))
// Path exits.
if err == nil {
exitIfTrue("path is a directory", info.IsDir())
exitIfTrue("can't rewrite existing file, use '-force' flag", !*force)
err = os.Remove(*path)
exitOnErr("can't remove existing file", err)
}
db := meta.New([]meta.Option{
meta.WithPath(*path),
meta.WithPermissions(0o600),
meta.WithEpochState(internal.EpochState{}),
}...)
exitOnErr("can't open the metabase", db.Open(context.Background(), mode.ReadWrite))
exitOnErr("can't init the metabase", db.Init())
defer func() { exitOnErr("can't close the metabase", db.Close()) }()
exitOnErr("failed to populate database", populate(db))
}
func populate(db *meta.DB) error {
payloads := internal.GeneratePayloadPool(*numPayloads, 32)
attributes := internal.GenerateAttributePool(*numAttributes)
owners := internal.GenerateOwnerPool(*numOwners)
types := []objectSDK.Type{
objectSDK.TypeRegular,
objectSDK.TypeLock,
objectSDK.TypeTombstone,
}
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(int(*jobs))
for i := uint(0); i < *numContainers; i++ {
cid := cidtest.ID()
for _, typ := range types {
internal.PopulateWithObjects(
ctx, db, g, *numObjects, func() *objectSDK.Object {
return internal.GenerateObject(
internal.WithContainerID(cid),
internal.WithType(typ),
internal.WithPayloadFromPool(payloads),
internal.WithAttributesFromPool(attributes, *numAttributesPerObj),
internal.WithOwnerIDFromPool(owners),
)
},
)
}
internal.PopulateWithBigObjects(
ctx, db, g, *numObjects, func() *objectSDK.Object {
return internal.GenerateObject(
internal.WithContainerID(cid),
internal.WithType(objectSDK.TypeRegular),
internal.WithAttributesFromPool(attributes, *numAttributesPerObj),
internal.WithOwnerIDFromPool(owners),
)
})
internal.PopulateGraveyard(
ctx, db, g, int(*jobs), *numObjects, func() *objectSDK.Object {
return internal.GenerateObject(
internal.WithContainerID(cid),
internal.WithType(objectSDK.TypeRegular),
internal.WithAttributesFromPool(attributes, *numAttributesPerObj),
internal.WithOwnerIDFromPool(owners),
)
},
)
internal.PopulateLocked(
ctx, db, g, int(*jobs), *numObjects, func() *objectSDK.Object {
return internal.GenerateObject(
internal.WithContainerID(cid),
internal.WithType(objectSDK.TypeRegular),
internal.WithAttributesFromPool(attributes, *numAttributesPerObj),
internal.WithOwnerIDFromPool(owners),
)
},
)
}
return g.Wait()
}
func exitIfTrue(msg string, cond bool) {
if cond {
fmt.Fprintln(os.Stderr, msg)
os.Exit(1)
}
}
func exitOnErr(msg string, err error) {
if err != nil {
fmt.Fprintln(os.Stderr, fmt.Errorf("%s: %w", msg, err))
os.Exit(1)
}
}