forked from TrueCloudLab/frostfs-node
264 lines
4.8 KiB
Go
264 lines
4.8 KiB
Go
|
package internal
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"math/rand"
|
||
|
"sync"
|
||
|
|
||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||
|
"golang.org/x/sync/errgroup"
|
||
|
)
|
||
|
|
||
|
type EpochState struct{}
|
||
|
|
||
|
func (s EpochState) CurrentEpoch() uint64 {
|
||
|
return 0
|
||
|
}
|
||
|
|
||
|
func PopulateWithObjects(
|
||
|
ctx context.Context,
|
||
|
db *meta.DB,
|
||
|
group *errgroup.Group,
|
||
|
count uint,
|
||
|
factory func() *objectSDK.Object,
|
||
|
) {
|
||
|
digits := "0123456789"
|
||
|
|
||
|
for i := uint(0); i < count; i++ {
|
||
|
obj := factory()
|
||
|
|
||
|
id := []byte(fmt.Sprintf(
|
||
|
"%c/%c/%c",
|
||
|
digits[rand.Int()%len(digits)],
|
||
|
digits[rand.Int()%len(digits)],
|
||
|
digits[rand.Int()%len(digits)],
|
||
|
))
|
||
|
|
||
|
prm := meta.PutPrm{}
|
||
|
prm.SetObject(obj)
|
||
|
prm.SetStorageID(id)
|
||
|
|
||
|
group.Go(func() error {
|
||
|
if _, err := db.Put(ctx, prm); err != nil {
|
||
|
return fmt.Errorf("couldn't put an object: %w", err)
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func PopulateWithBigObjects(
|
||
|
ctx context.Context,
|
||
|
db *meta.DB,
|
||
|
group *errgroup.Group,
|
||
|
count uint,
|
||
|
factory func() *objectSDK.Object,
|
||
|
) {
|
||
|
for i := uint(0); i < count; i++ {
|
||
|
group.Go(func() error {
|
||
|
if err := populateWithBigObject(ctx, db, factory); err != nil {
|
||
|
return fmt.Errorf("couldn't put a big object: %w", err)
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func populateWithBigObject(
|
||
|
ctx context.Context,
|
||
|
db *meta.DB,
|
||
|
factory func() *objectSDK.Object,
|
||
|
) error {
|
||
|
t := &target{db: db}
|
||
|
|
||
|
pk, _ := keys.NewPrivateKey()
|
||
|
p := transformer.NewPayloadSizeLimiter(transformer.Params{
|
||
|
Key: &pk.PrivateKey,
|
||
|
NextTargetInit: func() transformer.ObjectWriter { return t },
|
||
|
NetworkState: EpochState{},
|
||
|
MaxSize: 10,
|
||
|
})
|
||
|
|
||
|
obj := factory()
|
||
|
payload := make([]byte, 30)
|
||
|
|
||
|
err := p.WriteHeader(ctx, obj)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
_, err = p.Write(ctx, payload)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
_, err = p.Close(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type target struct {
|
||
|
db *meta.DB
|
||
|
}
|
||
|
|
||
|
func (t *target) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||
|
prm := meta.PutPrm{}
|
||
|
prm.SetObject(obj)
|
||
|
|
||
|
_, err := t.db.Put(ctx, prm)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func PopulateGraveyard(
|
||
|
ctx context.Context,
|
||
|
db *meta.DB,
|
||
|
group *errgroup.Group,
|
||
|
workBufferSize int,
|
||
|
count uint,
|
||
|
factory func() *objectSDK.Object,
|
||
|
) {
|
||
|
ts := factory()
|
||
|
ts.SetType(objectSDK.TypeTombstone)
|
||
|
|
||
|
prm := meta.PutPrm{}
|
||
|
prm.SetObject(ts)
|
||
|
|
||
|
group.Go(func() error {
|
||
|
if _, err := db.Put(ctx, prm); err != nil {
|
||
|
return fmt.Errorf("couldn't put a tombstone object: %w", err)
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
|
||
|
cID, _ := ts.ContainerID()
|
||
|
oID, _ := ts.ID()
|
||
|
|
||
|
var tsAddr oid.Address
|
||
|
|
||
|
tsAddr.SetContainer(cID)
|
||
|
tsAddr.SetObject(oID)
|
||
|
|
||
|
addrs := make(chan oid.Address, workBufferSize)
|
||
|
|
||
|
go func() {
|
||
|
defer close(addrs)
|
||
|
|
||
|
wg := &sync.WaitGroup{}
|
||
|
wg.Add(int(count))
|
||
|
|
||
|
for i := uint(0); i < count; i++ {
|
||
|
obj := factory()
|
||
|
|
||
|
prm := meta.PutPrm{}
|
||
|
prm.SetObject(obj)
|
||
|
|
||
|
group.Go(func() error {
|
||
|
defer wg.Done()
|
||
|
|
||
|
if _, err := db.Put(ctx, prm); err != nil {
|
||
|
return fmt.Errorf("couldn't put an object: %w", err)
|
||
|
}
|
||
|
|
||
|
cID, _ := obj.ContainerID()
|
||
|
oID, _ := obj.ID()
|
||
|
|
||
|
var addr oid.Address
|
||
|
addr.SetContainer(cID)
|
||
|
addr.SetObject(oID)
|
||
|
|
||
|
addrs <- addr
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
wg.Wait()
|
||
|
}()
|
||
|
|
||
|
go func() {
|
||
|
for addr := range addrs {
|
||
|
prm := meta.InhumePrm{}
|
||
|
prm.SetAddresses(addr)
|
||
|
prm.SetTombstoneAddress(tsAddr)
|
||
|
|
||
|
group.Go(func() error {
|
||
|
if _, err := db.Inhume(ctx, prm); err != nil {
|
||
|
return fmt.Errorf("couldn't inhume an object: %w", err)
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func PopulateLocked(
|
||
|
ctx context.Context,
|
||
|
db *meta.DB,
|
||
|
group *errgroup.Group,
|
||
|
workBufferSize int,
|
||
|
count uint,
|
||
|
factory func() *objectSDK.Object,
|
||
|
) {
|
||
|
locker := factory()
|
||
|
locker.SetType(objectSDK.TypeLock)
|
||
|
|
||
|
prm := meta.PutPrm{}
|
||
|
prm.SetObject(locker)
|
||
|
|
||
|
group.Go(func() error {
|
||
|
if _, err := db.Put(ctx, prm); err != nil {
|
||
|
return fmt.Errorf("couldn't put a locker object: %w", err)
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
|
||
|
ids := make(chan oid.ID, workBufferSize)
|
||
|
|
||
|
go func() {
|
||
|
defer close(ids)
|
||
|
|
||
|
wg := &sync.WaitGroup{}
|
||
|
wg.Add(int(count))
|
||
|
|
||
|
for i := uint(0); i < count; i++ {
|
||
|
defer wg.Done()
|
||
|
|
||
|
obj := factory()
|
||
|
|
||
|
prm := meta.PutPrm{}
|
||
|
prm.SetObject(obj)
|
||
|
|
||
|
group.Go(func() error {
|
||
|
if _, err := db.Put(ctx, prm); err != nil {
|
||
|
return fmt.Errorf("couldn't put an object: %w", err)
|
||
|
}
|
||
|
|
||
|
id, _ := obj.ID()
|
||
|
ids <- id
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
wg.Wait()
|
||
|
}()
|
||
|
|
||
|
go func() {
|
||
|
for id := range ids {
|
||
|
lockerCID, _ := locker.ContainerID()
|
||
|
lockerOID, _ := locker.ID()
|
||
|
|
||
|
group.Go(func() error {
|
||
|
if err := db.Lock(ctx, lockerCID, lockerOID, []oid.ID{id}); err != nil {
|
||
|
return fmt.Errorf("couldn't lock an object: %w", err)
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
}()
|
||
|
}
|