Do not store all payload in memory #115
22 changed files with 260 additions and 83 deletions
|
@ -2,6 +2,23 @@ name: Tests and linters
|
|||
on: [pull_request]
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
name: Lint
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.21'
|
||||
cache: true
|
||||
|
||||
- name: golangci-lint
|
||||
uses: https://github.com/golangci/golangci-lint-action@v3
|
||||
with:
|
||||
version: latest
|
||||
|
||||
tests:
|
||||
name: Tests
|
||||
runs-on: ubuntu-latest
|
||||
|
|
|
@ -38,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports {
|
|||
return modules.Exports{Default: d}
|
||||
}
|
||||
|
||||
func (d *Datagen) Generator(size int, typ string) *Generator {
|
||||
g := NewGenerator(d.vu, size, strings.ToLower(typ))
|
||||
func (d *Datagen) Generator(size int, typ string, streaming bool) *Generator {
|
||||
g := NewGenerator(d.vu, size, strings.ToLower(typ), streaming)
|
||||
return &g
|
||||
}
|
||||
|
|
|
@ -2,12 +2,10 @@ package datagen
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/dop251/goja"
|
||||
"github.com/go-loremipsum/loremipsum"
|
||||
"go.k6.io/k6/js/modules"
|
||||
)
|
||||
|
@ -28,11 +26,9 @@ type (
|
|||
buf []byte
|
||||
typ string
|
||||
offset int
|
||||
}
|
||||
|
||||
GenPayloadResponse struct {
|
||||
Payload goja.ArrayBuffer
|
||||
Hash string
|
||||
streaming bool
|
||||
seed *atomic.Int64
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -45,7 +41,7 @@ var payloadTypes = []string{
|
|||
"",
|
||||
}
|
||||
|
||||
func NewGenerator(vu modules.VU, size int, typ string) Generator {
|
||||
func NewGenerator(vu modules.VU, size int, typ string, streaming bool) Generator {
|
||||
if size <= 0 {
|
||||
panic("size should be positive")
|
||||
}
|
||||
|
@ -60,17 +56,20 @@ func NewGenerator(vu modules.VU, size int, typ string) Generator {
|
|||
if !found {
|
||||
vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ)
|
||||
}
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
buf := make([]byte, size+TailSize)
|
||||
g := Generator{
|
||||
vu: vu,
|
||||
size: size,
|
||||
rand: r,
|
||||
buf: buf,
|
||||
typ: typ,
|
||||
}
|
||||
|
||||
if streaming {
|
||||
g.streaming = true
|
||||
g.seed = new(atomic.Int64)
|
||||
} else {
|
||||
g.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
g.buf = make([]byte, size+TailSize)
|
||||
g.fillBuffer()
|
||||
}
|
||||
return g
|
||||
}
|
||||
|
||||
|
@ -85,21 +84,17 @@ func (g *Generator) fillBuffer() {
|
|||
}
|
||||
g.buf = b.Bytes()
|
||||
default:
|
||||
rand.Read(g.buf) // Per docs, err is always nil here
|
||||
g.rand.Read(g.buf) // Per docs, err is always nil here
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse {
|
||||
data := g.nextSlice()
|
||||
|
||||
dataHash := ""
|
||||
if calcHash {
|
||||
hashBytes := sha256.Sum256(data)
|
||||
dataHash = hex.EncodeToString(hashBytes[:])
|
||||
func (g *Generator) GenPayload() Payload {
|
||||
if g.streaming {
|
||||
return NewStreamPayload(g.size, g.seed.Add(1), g.typ)
|
||||
}
|
||||
|
||||
payload := g.vu.Runtime().NewArrayBuffer(data)
|
||||
return GenPayloadResponse{Payload: payload, Hash: dataHash}
|
||||
data := g.nextSlice()
|
||||
return NewFixedPayload(data)
|
||||
}
|
||||
|
||||
func (g *Generator) nextSlice() []byte {
|
||||
|
|
|
@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) {
|
|||
|
||||
t.Run("fails on negative size", func(t *testing.T) {
|
||||
require.Panics(t, func() {
|
||||
_ = NewGenerator(vu, -1, "")
|
||||
_ = NewGenerator(vu, -1, "", false)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("fails on zero size", func(t *testing.T) {
|
||||
require.Panics(t, func() {
|
||||
_ = NewGenerator(vu, 0, "")
|
||||
_ = NewGenerator(vu, 0, "", false)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("creates slice of specified size", func(t *testing.T) {
|
||||
size := 10
|
||||
g := NewGenerator(vu, size, "")
|
||||
g := NewGenerator(vu, size, "", false)
|
||||
slice := g.nextSlice()
|
||||
require.Len(t, slice, size)
|
||||
})
|
||||
|
||||
t.Run("creates a different slice on each call", func(t *testing.T) {
|
||||
g := NewGenerator(vu, 1000, "")
|
||||
g := NewGenerator(vu, 1000, "", false)
|
||||
slice1 := g.nextSlice()
|
||||
slice2 := g.nextSlice()
|
||||
// Each slice should be unique (assuming that 1000 random bytes will never coincide
|
||||
|
@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
|
||||
g := NewGenerator(vu, 1000, "")
|
||||
g := NewGenerator(vu, 1000, "", false)
|
||||
initialSlice := g.nextSlice()
|
||||
for i := 0; i < TailSize; i++ {
|
||||
g.nextSlice()
|
||||
|
|
121
internal/datagen/payload.go
Normal file
121
internal/datagen/payload.go
Normal file
|
@ -0,0 +1,121 @@
|
|||
package datagen
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"hash"
|
||||
"io"
|
||||
"math/rand"
|
||||
|
||||
"github.com/go-loremipsum/loremipsum"
|
||||
)
|
||||
|
||||
// Payload represents arbitrary data to be packed into S3 or native object.
|
||||
// Implementations could be thread-unsafe.
|
||||
type Payload interface {
|
||||
// Reader returns io.Reader instance to read the payload.
|
||||
// Must not be called twice.
|
||||
Reader() io.Reader
|
||||
// Bytes is a helper which reads all data from Reader() into slice.
|
||||
// The sole purpose of this method is to simplify HTTP scenario,
|
||||
// where all payload needs to be read and wrapped.
|
||||
Bytes() []byte
|
||||
// Size returns payload size, which is equal to the total amount of data
|
||||
// that could be read from the Reader().
|
||||
Size() int
|
||||
// Hash returns payload sha256 hash. Must be called after all data is read from the reader.
|
||||
Hash() string
|
||||
}
|
||||
|
||||
type bytesPayload struct {
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (p *bytesPayload) Reader() io.Reader {
|
||||
return bytes.NewReader(p.data)
|
||||
}
|
||||
|
||||
func (p *bytesPayload) Size() int {
|
||||
return len(p.data)
|
||||
}
|
||||
|
||||
func (p *bytesPayload) Hash() string {
|
||||
h := sha256.Sum256(p.data[:])
|
||||
return hex.EncodeToString(h[:])
|
||||
}
|
||||
|
||||
func (p *bytesPayload) Bytes() []byte {
|
||||
return p.data
|
||||
}
|
||||
|
||||
func NewFixedPayload(data []byte) Payload {
|
||||
return &bytesPayload{data: data}
|
||||
}
|
||||
|
||||
type randomPayload struct {
|
||||
r io.Reader
|
||||
s hash.Hash
|
||||
h string
|
||||
size int
|
||||
}
|
||||
|
||||
func NewStreamPayload(size int, seed int64, typ string) Payload {
|
||||
var rr io.Reader
|
||||
switch typ {
|
||||
case "text":
|
||||
rr = &textReader{li: loremipsum.NewWithSeed(seed)}
|
||||
default:
|
||||
rr = rand.New(rand.NewSource(seed))
|
||||
}
|
||||
|
||||
lr := io.LimitReader(rr, int64(size))
|
||||
// We need some buffering to write complete blocks in the TeeReader.
|
||||
// Streaming payload read is expected to be used for big objects, thus 4k seems like a good choice.
|
||||
br := bufio.NewReaderSize(lr, 4096)
|
||||
s := sha256.New()
|
||||
tr := io.TeeReader(br, s)
|
||||
return &randomPayload{
|
||||
r: tr,
|
||||
s: s,
|
||||
size: size,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *randomPayload) Reader() io.Reader {
|
||||
return p.r
|
||||
}
|
||||
|
||||
func (p *randomPayload) Size() int {
|
||||
return p.size
|
||||
}
|
||||
|
||||
func (p *randomPayload) Hash() string {
|
||||
if p.h == "" {
|
||||
p.h = hex.EncodeToString(p.s.Sum(nil))
|
||||
// Prevent possible misuse.
|
||||
p.r = nil
|
||||
p.s = nil
|
||||
}
|
||||
return p.h
|
||||
}
|
||||
|
||||
func (p *randomPayload) Bytes() []byte {
|
||||
data, err := io.ReadAll(p.r)
|
||||
if err != nil {
|
||||
// We use only 2 readers, either `bytes.Reader` or `rand.Reader`.
|
||||
// None of them returns errors, thus encountering an error is a fatal error.
|
||||
panic(err)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
type textReader struct {
|
||||
li *loremipsum.LoremIpsum
|
||||
}
|
||||
|
||||
func (r *textReader) Read(p []byte) (n int, err error) {
|
||||
paragraph := r.li.Paragraph()
|
||||
return copy(p, paragraph), nil
|
||||
}
|
40
internal/datagen/payload_test.go
Normal file
40
internal/datagen/payload_test.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package datagen
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFixedPayload(t *testing.T) {
|
||||
const size = 123
|
||||
data := make([]byte, size)
|
||||
_, err := rand.Read(data)
|
||||
require.NoError(t, err)
|
||||
|
||||
p := NewFixedPayload(data)
|
||||
require.Equal(t, size, p.Size())
|
||||
|
||||
actual, err := io.ReadAll(p.Reader())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, data, actual)
|
||||
|
||||
h := sha256.Sum256(data)
|
||||
require.Equal(t, hex.EncodeToString(h[:]), p.Hash())
|
||||
}
|
||||
|
||||
func TestStreamingPayload(t *testing.T) {
|
||||
const size = 123
|
||||
|
||||
p := NewStreamPayload(size, 0, "")
|
||||
require.Equal(t, size, p.Size())
|
||||
|
||||
actual, err := io.ReadAll(p.Reader())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, size, len(actual))
|
||||
require.Equal(t, sha256.Size*2, len(p.Hash()))
|
||||
}
|
|
@ -248,7 +248,7 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]
|
|||
|
||||
var shOpts [][]shard.Option
|
||||
|
||||
engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
|
||||
err := engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
|
||||
opts := []shard.Option{
|
||||
shard.WithRefillMetabase(sc.RefillMetabase()),
|
||||
shard.WithMode(sc.Mode()),
|
||||
|
@ -375,7 +375,9 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]
|
|||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("iterate shards: %w", err)
|
||||
}
|
||||
return ngOpts, shOpts, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package native
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/sha256"
|
||||
|
@ -23,6 +22,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||
"github.com/dop251/goja"
|
||||
"go.k6.io/k6/js/modules"
|
||||
|
@ -77,7 +77,7 @@ type (
|
|||
|
||||
const defaultBufferSize = 64 * 1024
|
||||
|
||||
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
|
||||
func (c *Client) Put(containerID string, headers map[string]string, payload datagen.Payload, chunkSize int) PutResponse {
|
||||
cliContainerID := parseContainerID(containerID)
|
||||
|
||||
tok := c.tok
|
||||
|
@ -104,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
|
|||
o.SetOwnerID(&owner)
|
||||
o.SetAttributes(attrs...)
|
||||
|
||||
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
|
||||
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize)
|
||||
if err != nil {
|
||||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
@ -309,10 +309,9 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse {
|
|||
}
|
||||
|
||||
start := time.Now()
|
||||
var prm client.PrmContainerPut
|
||||
prm.SetContainer(cnr)
|
||||
|
||||
res, err := c.cli.ContainerPut(c.vu.Context(), prm)
|
||||
res, err := c.cli.ContainerPut(c.vu.Context(), client.PrmContainerPut{
|
||||
Container: &cnr,
|
||||
})
|
||||
if err != nil {
|
||||
return c.putCnrErrorResponse(err)
|
||||
}
|
||||
|
@ -399,7 +398,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
|||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
||||
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
|
||||
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0)
|
||||
if err != nil {
|
||||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
@ -414,15 +413,15 @@ func (s epochSource) CurrentEpoch() uint64 {
|
|||
}
|
||||
|
||||
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
|
||||
hdr *object.Object, payload []byte, chunkSize int,
|
||||
hdr *object.Object, payload datagen.Payload, chunkSize int,
|
||||
) (*client.ResObjectPut, error) {
|
||||
bufSize := defaultBufferSize
|
||||
if chunkSize > 0 {
|
||||
bufSize = chunkSize
|
||||
}
|
||||
buf := make([]byte, bufSize)
|
||||
rdr := bytes.NewReader(payload)
|
||||
sz := rdr.Size()
|
||||
rdr := payload.Reader()
|
||||
sz := payload.Size()
|
||||
|
||||
// starting upload
|
||||
start := time.Now()
|
||||
|
@ -500,10 +499,9 @@ func (x *waitParams) setDefaults() {
|
|||
|
||||
func (c *Client) waitForContainerPresence(ctx context.Context, cnrID cid.ID, wp *waitParams) error {
|
||||
return waitFor(ctx, wp, func(ctx context.Context) bool {
|
||||
var prm client.PrmContainerGet
|
||||
prm.SetContainer(cnrID)
|
||||
|
||||
_, err := c.cli.ContainerGet(ctx, prm)
|
||||
_, err := c.cli.ContainerGet(ctx, client.PrmContainerGet{
|
||||
ContainerID: &cnrID,
|
||||
})
|
||||
return err == nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -89,9 +89,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
|||
|
||||
// generate session token
|
||||
exp := uint64(math.MaxUint64)
|
||||
var prmSessionCreate client.PrmSessionCreate
|
||||
prmSessionCreate.SetExp(exp)
|
||||
sessionResp, err := cli.SessionCreate(n.vu.Context(), prmSessionCreate)
|
||||
sessionResp, err := cli.SessionCreate(n.vu.Context(), client.PrmSessionCreate{
|
||||
Expiration: exp,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dial endpoint: %s %w", endpoint, err)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
|
@ -9,12 +8,12 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
"github.com/dop251/goja"
|
||||
"go.k6.io/k6/js/modules"
|
||||
"go.k6.io/k6/metrics"
|
||||
)
|
||||
|
@ -51,9 +50,9 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||
rdr := bytes.NewReader(payload.Bytes())
|
||||
sz := rdr.Size()
|
||||
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
|
||||
rdr := payload.Reader()
|
||||
sz := payload.Size()
|
||||
|
||||
start := time.Now()
|
||||
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
|
||||
|
@ -74,7 +73,7 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
|||
|
||||
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
|
||||
|
||||
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse {
|
||||
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse {
|
||||
if objPartSize < multipartUploadMinPartSize {
|
||||
stats.Report(c.vu, objPutFails, 1)
|
||||
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
|
||||
|
@ -86,8 +85,8 @@ func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, pay
|
|||
u.Concurrency = concurrency
|
||||
})
|
||||
|
||||
payloadReader := bytes.NewReader(payload.Bytes())
|
||||
sz := payloadReader.Len()
|
||||
payloadReader := payload.Reader()
|
||||
sz := payload.Size()
|
||||
|
||||
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
|
||||
Bucket: aws.String(bucket),
|
||||
|
|
|
@ -1,15 +1,14 @@
|
|||
package s3local
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||
"github.com/dop251/goja"
|
||||
"go.k6.io/k6/js/modules"
|
||||
"go.k6.io/k6/metrics"
|
||||
)
|
||||
|
@ -35,7 +34,7 @@ type (
|
|||
GetResponse SuccessOrErrorResponse
|
||||
)
|
||||
|
||||
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
|
||||
if c.limiter.IsFull() {
|
||||
return PutResponse{
|
||||
Success: false,
|
||||
|
@ -58,8 +57,8 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
|||
},
|
||||
Header: map[string]string{},
|
||||
Object: key,
|
||||
Size: int64(len(payload.Bytes())),
|
||||
Reader: bytes.NewReader(payload.Bytes()),
|
||||
Size: int64(payload.Size()),
|
||||
Reader: payload.Reader(),
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
|
|
@ -151,7 +151,10 @@ func (s *Local) Connect(configFile string, configDir string, params map[string]s
|
|||
}
|
||||
|
||||
l := layer.NewLayer(zap.L(), &frostfs{rc}, cfg)
|
||||
l.Initialize(s.l.VU().Context(), nopEventListener{})
|
||||
err = l.Initialize(s.l.VU().Context(), nopEventListener{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("initialize: %w", err)
|
||||
}
|
||||
|
||||
return &Client{
|
||||
vu: s.l.VU(),
|
||||
|
|
|
@ -149,7 +149,7 @@ export function obj_write() {
|
|||
};
|
||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const payload = generator.genPayload();
|
||||
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
|
||||
if (!resp.success) {
|
||||
log.withField("cid", container).error(resp.error);
|
||||
|
@ -157,7 +157,7 @@ export function obj_write() {
|
|||
}
|
||||
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject(container, resp.object_id, "", "", hash);
|
||||
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -174,7 +174,7 @@ export function obj_write() {
|
|||
};
|
||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const payload = generator.genPayload();
|
||||
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
|
||||
if (!resp.success) {
|
||||
log.withField("cid", container).error(resp.error);
|
||||
|
@ -182,7 +182,7 @@ export function obj_write() {
|
|||
}
|
||||
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject(container, resp.object_id, "", "", hash);
|
||||
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -97,10 +97,12 @@ export function obj_write() {
|
|||
|
||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const payload = generator.genPayload();
|
||||
const data = {
|
||||
field: uuidv4(),
|
||||
file: http.file(payload, "random.data"),
|
||||
// Because we use `file` wrapping and it is not straightforward to use streams here,
|
||||
// `-e STREAMING=1` has no effect for this scenario.
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
file: http.file(payload.bytes(), "random.data"),
|
||||
};
|
||||
|
||||
const resp = http.post(`http://${http_endpoint}/upload/${container}`, data);
|
||||
|
@ -110,7 +112,7 @@ export function obj_write() {
|
|||
}
|
||||
const object_id = JSON.parse(resp.body).object_id;
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject(container, object_id, "", "", hash);
|
||||
obj_registry.addObject(container, object_id, "", "", payload.hash());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ import datagen from 'k6/x/frostfs/datagen';
|
|||
|
||||
export function newGenerator(condition) {
|
||||
if (condition) {
|
||||
return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
|
||||
return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "", !!__ENV.STREAMING);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ export function obj_write() {
|
|||
};
|
||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const payload = generator.genPayload();
|
||||
const resp = local_client.put(container, headers, payload);
|
||||
if (!resp.success) {
|
||||
if (resp.abort) {
|
||||
|
@ -139,7 +139,7 @@ export function obj_write() {
|
|||
}
|
||||
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject(container, resp.object_id, "", "", hash);
|
||||
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt
|
|||
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
|
||||
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
|
||||
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
|
||||
* `STREAMING` - if set, the payload is generated on the fly and is not read into memory fully.
|
||||
|
||||
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
|
||||
```shell
|
||||
|
|
|
@ -145,7 +145,7 @@ export function obj_write() {
|
|||
const key = __ENV.OBJ_NAME || uuidv4();
|
||||
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const payload = generator.genPayload();
|
||||
const resp = s3_client.put(bucket, key, payload);
|
||||
if (!resp.success) {
|
||||
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
||||
|
@ -153,7 +153,7 @@ export function obj_write() {
|
|||
}
|
||||
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject("", "", bucket, key, hash);
|
||||
obj_registry.addObject("", "", bucket, key, payload.hash());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -172,7 +172,7 @@ export function obj_write() {
|
|||
const key = __ENV.OBJ_NAME || uuidv4();
|
||||
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const payload = generator.genPayload();
|
||||
const resp = s3_client.put(bucket, key, payload);
|
||||
if (!resp.success) {
|
||||
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
||||
|
@ -180,7 +180,7 @@ export function obj_write() {
|
|||
}
|
||||
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject("", "", bucket, key, hash);
|
||||
obj_registry.addObject("", "", bucket, key, payload.hash());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -92,7 +92,7 @@ export function obj_write_multipart() {
|
|||
const key = __ENV.OBJ_NAME || uuidv4();
|
||||
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||
|
||||
const {payload, hash} = generator.genPayload(registry_enabled);
|
||||
const payload = generator.genPayload();
|
||||
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload);
|
||||
if (!resp.success) {
|
||||
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
||||
|
@ -100,6 +100,6 @@ export function obj_write_multipart() {
|
|||
}
|
||||
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject("", "", bucket, key, hash);
|
||||
obj_registry.addObject("", "", bucket, key, payload.hash());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -124,7 +124,7 @@ export function obj_write() {
|
|||
const key = __ENV.OBJ_NAME || uuidv4();
|
||||
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const payload = generator.genPayload();
|
||||
const resp = s3_client.put(bucket, key, payload);
|
||||
if (!resp.success) {
|
||||
if (resp.abort) {
|
||||
|
@ -135,7 +135,7 @@ export function obj_write() {
|
|||
}
|
||||
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject("", "", bucket, key, hash);
|
||||
obj_registry.addObject("", "", bucket, key, payload.hash());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue
This is so annoying, haven't yet come up with a solution, will fix in this PR, thus WIP
04a8119ef3/js/modules/k6/http/file.go (L25)
Decided to avoid streaming for http scenario.