package main import ( "bytes" "context" "crypto/ecdsa" "flag" "log/slog" "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" ) var ( objectList []oid.ID mu sync.Mutex ) func main() { ctx := context.Background() endpoint := flag.String("r", "localhost:8080", "storage address") bucketPolicy := flag.String("p", "REP 1", "test bucket policy") walletWIF := flag.String("wif", "", "") numObjects := flag.Int("n", 1000, "number of objects to upload") numRoutines := flag.Int("t", 4, "concurrent goroutines") flag.Parse() // Initialize environment k := parseKey(*walletWIF) owner := deriveOwner(k) cli := initPoolClient(ctx, *endpoint, &k.PrivateKey) objectList = make([]oid.ID, 0, *numObjects) // Create test bucket cnrID := createBucket(ctx, cli, owner, *bucketPolicy) slog.Info("container has been created", slog.String("id", cnrID.String())) // Upload objects upload(ctx, cli, owner, *numObjects, *numRoutines, cnrID) slog.Info("objects has been uploaded", slog.Int("amount", *numObjects)) slog.Info("now start to tick epochs") // Remove objects remove(ctx, cli, cnrID, *numRoutines) } func parseKey(wif string) *keys.PrivateKey { k, err := keys.NewPrivateKeyFromWIF(wif) die(err) return k } func deriveOwner(k *keys.PrivateKey) (owner user.ID) { user.IDFromKey(&owner, k.PrivateKey.PublicKey) return } func initPoolClient(ctx context.Context, endpoint string, key *ecdsa.PrivateKey) *pool.Pool { prmInit := pool.InitParameters{} prmInit.SetKey(key) prmInit.AddNode(pool.NewNodeParam(1, endpoint, 1)) cli, err := pool.NewPool(prmInit) die(err) die(cli.Dial(ctx)) return cli } func createBucket(ctx context.Context, cli *pool.Pool, owner user.ID, policy string) cid.ID { pp := netmap.PlacementPolicy{} die(pp.DecodeString(policy)) cnr := container.Container{} cnr.Init() cnr.SetOwner(owner) cnr.SetPlacementPolicy(pp) prmContainerPut := pool.PrmContainerPut{ ClientParams: client.PrmContainerPut{ Container: &cnr, }, WaitParams: &pool.WaitParams{PollInterval: time.Second, Timeout: 10 * time.Second}, } createdContainerID, err := cli.PutContainer(ctx, prmContainerPut) die(err) return createdContainerID } func upload(ctx context.Context, cli *pool.Pool, owner user.ID, nObjs, nRoutines int, cnrID cid.ID) { // define worker function uploader := func(ctx context.Context, ch chan struct{}, wg *sync.WaitGroup, cli *pool.Pool) { defer wg.Done() for { select { case _, ok := <-ch: if !ok { return } case <-ctx.Done(): return } obj := object.New() obj.SetContainerID(cnrID) obj.SetOwnerID(owner) prm := pool.PrmObjectPut{} prm.SetPayload(bytes.NewBuffer(nil)) prm.SetHeader(*obj) objID, err := cli.PutObject(ctx, prm) die(err) mu.Lock() objectList = append(objectList, objID.ObjectID) mu.Unlock() } } // run workers work := make(chan struct{}) wg := sync.WaitGroup{} wg.Add(nRoutines) for i := 0; i < nRoutines; i++ { go uploader(ctx, work, &wg, cli) } for i := 0; i < nObjs; i++ { work <- struct{}{} } close(work) wg.Wait() } func remove(ctx context.Context, cli *pool.Pool, cnrID cid.ID, nRoutines int) { // define worker function deleter := func(ctx context.Context, ch chan oid.ID, wg *sync.WaitGroup, cli *pool.Pool) { defer wg.Done() var ( objID oid.ID ok bool ) for { select { case objID, ok = <-ch: if !ok { return } case <-ctx.Done(): return } addr := oid.Address{} addr.SetContainer(cnrID) addr.SetObject(objID) prm := pool.PrmObjectDelete{} prm.SetAddress(addr) err := cli.DeleteObject(ctx, prm) if err != nil { slog.Warn("delete object error", slog.String("id", objID.String()), slog.String("error", err.Error())) } } } work := make(chan oid.ID) wg := sync.WaitGroup{} wg.Add(nRoutines) for i := 0; i < nRoutines; i++ { go deleter(ctx, work, &wg, cli) } for _, objID := range objectList { work <- objID } close(work) wg.Wait() } func die(err error) { if err != nil { panic(err) } }