frostfs-node/scripts/populate-metabase/internal/populate.go

264 lines
4.8 KiB
Go
Raw Permalink Normal View History

package internal
import (
"context"
"fmt"
"math/rand"
"sync"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"golang.org/x/sync/errgroup"
)
// EpochState is a stateless stub epoch source. In this file it is passed as
// the NetworkState of the payload size limiter built in populateWithBigObject.
type EpochState struct{}

// CurrentEpoch always reports epoch zero.
func (EpochState) CurrentEpoch() uint64 {
	const epoch uint64 = 0
	return epoch
}
// PopulateWithObjects schedules count Put operations on group, one per object
// produced by factory. Every object is stored with a random storage ID of the
// form "d/d/d", where each d is a decimal digit.
//
// The function does not wait for the scheduled tasks itself; a failed Put is
// reported through group as a wrapped error.
func PopulateWithObjects(
	ctx context.Context,
	db *meta.DB,
	group *errgroup.Group,
	count uint,
	factory func() *objectSDK.Object,
) {
	const digits = "0123456789"

	// randomDigit picks one ASCII digit using the package-level rand source.
	randomDigit := func() byte {
		return digits[rand.Int()%len(digits)]
	}

	for left := count; left > 0; left-- {
		var putPrm meta.PutPrm
		putPrm.SetObject(factory())
		// Elements are evaluated left to right, so three digits are drawn in order.
		putPrm.SetStorageID([]byte{randomDigit(), '/', randomDigit(), '/', randomDigit()})

		group.Go(func() error {
			_, err := db.Put(ctx, putPrm)
			if err == nil {
				return nil
			}
			return fmt.Errorf("couldn't put an object: %w", err)
		})
	}
}
// PopulateWithBigObjects schedules count tasks on group; each task writes one
// "big" object via populateWithBigObject using an object header obtained from
// factory.
//
// The function does not wait for the scheduled tasks itself; a failure is
// reported through group as a wrapped error.
func PopulateWithBigObjects(
	ctx context.Context,
	db *meta.DB,
	group *errgroup.Group,
	count uint,
	factory func() *objectSDK.Object,
) {
	// The task does not depend on the iteration, so one closure is reused.
	putBig := func() error {
		err := populateWithBigObject(ctx, db, factory)
		if err == nil {
			return nil
		}
		return fmt.Errorf("couldn't put a big object: %w", err)
	}

	for left := count; left > 0; left-- {
		group.Go(putBig)
	}
}
// populateWithBigObject writes a single object produced by factory through a
// payload size limiter whose MaxSize is 10 bytes, feeding it a 30-byte zero
// payload. Every object emitted by the limiter is put into db via target.
//
// NOTE(review): the payload is three times MaxSize, so the limiter presumably
// splits it into several parts plus a linking object — confirm against the
// transformer package.
//
// It returns the first error encountered: key generation, header write,
// payload write or close.
func populateWithBigObject(
	ctx context.Context,
	db *meta.DB,
	factory func() *objectSDK.Object,
) error {
	// A nil key would cause a nil-pointer dereference below, so a generation
	// failure must be reported rather than ignored.
	pk, err := keys.NewPrivateKey()
	if err != nil {
		return fmt.Errorf("couldn't generate a private key: %w", err)
	}

	t := &target{db: db}
	p := transformer.NewPayloadSizeLimiter(transformer.Params{
		Key:            &pk.PrivateKey,
		NextTargetInit: func() transformer.ObjectWriter { return t },
		NetworkState:   EpochState{},
		MaxSize:        10,
	})

	obj := factory()
	payload := make([]byte, 30)

	if err := p.WriteHeader(ctx, obj); err != nil {
		return err
	}
	if _, err := p.Write(ctx, payload); err != nil {
		return err
	}
	if _, err := p.Close(ctx); err != nil {
		return err
	}
	return nil
}
// target is an object writer that stores every object it receives in a
// metabase. populateWithBigObject hands it to the payload size limiter as the
// next target.
type target struct {
	db *meta.DB
}

// WriteObject puts obj into the underlying metabase and returns the error of
// that Put call, if any.
func (t *target) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
	var putPrm meta.PutPrm
	putPrm.SetObject(obj)

	if _, err := t.db.Put(ctx, putPrm); err != nil {
		return err
	}
	return nil
}
// PopulateGraveyard stores one tombstone object and count regular objects
// produced by factory, then inhumes every successfully stored regular object
// with the tombstone's address.
//
// All database operations are scheduled on group. Addresses of stored objects
// travel from the Put tasks to the inhuming goroutine over a channel buffered
// to workBufferSize. The function starts two background goroutines and returns
// without waiting for them; failures are reported through group.
func PopulateGraveyard(
	ctx context.Context,
	db *meta.DB,
	group *errgroup.Group,
	workBufferSize int,
	count uint,
	factory func() *objectSDK.Object,
) {
	// addressOf assembles an address from the container and object IDs of o.
	addressOf := func(o *objectSDK.Object) oid.Address {
		cnr, _ := o.ContainerID()
		id, _ := o.ID()

		var a oid.Address
		a.SetContainer(cnr)
		a.SetObject(id)
		return a
	}

	tombstone := factory()
	tombstone.SetType(objectSDK.TypeTombstone)

	var tsPrm meta.PutPrm
	tsPrm.SetObject(tombstone)

	group.Go(func() error {
		_, err := db.Put(ctx, tsPrm)
		if err == nil {
			return nil
		}
		return fmt.Errorf("couldn't put a tombstone object: %w", err)
	})

	tsAddr := addressOf(tombstone)
	addrs := make(chan oid.Address, workBufferSize)

	// Producer: schedules the Put tasks and closes addrs once all of them
	// have finished, successfully or not.
	go func() {
		defer close(addrs)

		var pending sync.WaitGroup
		pending.Add(int(count))

		for left := count; left > 0; left-- {
			obj := factory()

			var putPrm meta.PutPrm
			putPrm.SetObject(obj)

			group.Go(func() error {
				defer pending.Done()

				if _, err := db.Put(ctx, putPrm); err != nil {
					return fmt.Errorf("couldn't put an object: %w", err)
				}

				addrs <- addressOf(obj)
				return nil
			})
		}

		pending.Wait()
	}()

	// Consumer: schedules one Inhume task per received address until addrs
	// is closed.
	go func() {
		for victim := range addrs {
			var inhumePrm meta.InhumePrm
			inhumePrm.SetAddresses(victim)
			inhumePrm.SetTombstoneAddress(tsAddr)

			group.Go(func() error {
				_, err := db.Inhume(ctx, inhumePrm)
				if err == nil {
					return nil
				}
				return fmt.Errorf("couldn't inhume an object: %w", err)
			})
		}
	}()
}
// PopulateLocked stores one lock object and count regular objects produced by
// factory, then locks every successfully stored regular object with the lock
// object.
//
// All database operations are scheduled on group. IDs of stored objects travel
// from the Put tasks to the locking goroutine over a channel buffered to
// workBufferSize. The function starts two background goroutines and returns
// without waiting for them; failures are reported through group.
func PopulateLocked(
	ctx context.Context,
	db *meta.DB,
	group *errgroup.Group,
	workBufferSize int,
	count uint,
	factory func() *objectSDK.Object,
) {
	locker := factory()
	locker.SetType(objectSDK.TypeLock)

	prm := meta.PutPrm{}
	prm.SetObject(locker)

	group.Go(func() error {
		if _, err := db.Put(ctx, prm); err != nil {
			return fmt.Errorf("couldn't put a locker object: %w", err)
		}
		return nil
	})

	ids := make(chan oid.ID, workBufferSize)

	// Producer: schedules the Put tasks and closes ids once all of them have
	// finished, successfully or not.
	go func() {
		defer close(ids)

		wg := &sync.WaitGroup{}
		wg.Add(int(count))

		for i := uint(0); i < count; i++ {
			obj := factory()

			prm := meta.PutPrm{}
			prm.SetObject(obj)

			group.Go(func() error {
				// Done must be signalled by the task itself. A defer placed in
				// the loop body would only run when this goroutine returns,
				// which is after wg.Wait(), so Wait would never be released
				// and ids would never be closed.
				defer wg.Done()

				if _, err := db.Put(ctx, prm); err != nil {
					return fmt.Errorf("couldn't put an object: %w", err)
				}

				id, _ := obj.ID()
				ids <- id
				return nil
			})
		}

		wg.Wait()
	}()

	// The locker's identifiers do not change between iterations, so they are
	// read once.
	lockerCID, _ := locker.ContainerID()
	lockerOID, _ := locker.ID()

	// Consumer: schedules one Lock task per received ID until ids is closed.
	go func() {
		for id := range ids {
			// Build the argument per iteration so the task never captures the
			// range variable directly (it is shared across iterations before
			// Go 1.22).
			locked := []oid.ID{id}

			group.Go(func() error {
				if err := db.Lock(ctx, lockerCID, lockerOID, locked); err != nil {
					return fmt.Errorf("couldn't lock an object: %w", err)
				}
				return nil
			})
		}
	}()
}