Do not store all payload in memory #115

Merged
fyrchik merged 5 commits from fyrchik/xk6-frostfs:streaming_payload into master 2024-09-04 19:51:18 +00:00
22 changed files with 260 additions and 83 deletions

View file

@@ -2,6 +2,23 @@ name: Tests and linters
on: [pull_request]
jobs:
+lint:
+name: Lint
+runs-on: ubuntu-latest
+steps:
+- uses: actions/checkout@v3
+- name: Set up Go
+uses: actions/setup-go@v3
+with:
+go-version: '1.21'
+cache: true
+- name: golangci-lint
+uses: https://github.com/golangci/golangci-lint-action@v3
+with:
+version: latest
tests:
name: Tests
runs-on: ubuntu-latest

View file

@@ -38,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports {
return modules.Exports{Default: d}
}
-func (d *Datagen) Generator(size int, typ string) *Generator {
-g := NewGenerator(d.vu, size, strings.ToLower(typ))
+func (d *Datagen) Generator(size int, typ string, streaming bool) *Generator {
+g := NewGenerator(d.vu, size, strings.ToLower(typ), streaming)
return &g
}

View file

@@ -2,12 +2,10 @@ package datagen
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"math/rand"
"sync/atomic"
"time"
"github.com/dop251/goja"
"github.com/go-loremipsum/loremipsum"
"go.k6.io/k6/js/modules"
)
@@ -28,11 +26,9 @@ type (
buf []byte
typ string
offset int
-}
-GenPayloadResponse struct {
-Payload goja.ArrayBuffer
-Hash string
+streaming bool
+seed *atomic.Int64
}
)
@@ -45,7 +41,7 @@ var payloadTypes = []string{
"",
}
-func NewGenerator(vu modules.VU, size int, typ string) Generator {
+func NewGenerator(vu modules.VU, size int, typ string, streaming bool) Generator {
if size <= 0 {
panic("size should be positive")
}
@@ -60,17 +56,20 @@ func NewGenerator(vu modules.VU, size int, typ string) Generator {
if !found {
vu.InitEnv().Logger.Infof("Unknown payload type '%s', random will be used.", typ)
}
-r := rand.New(rand.NewSource(time.Now().UnixNano()))
-buf := make([]byte, size+TailSize)
g := Generator{
vu: vu,
size: size,
-rand: r,
-buf: buf,
typ: typ,
}
+if streaming {
+g.streaming = true
+g.seed = new(atomic.Int64)
+} else {
+g.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
+g.buf = make([]byte, size+TailSize)
+g.fillBuffer()
+}
return g
}
@@ -85,21 +84,17 @@ func (g *Generator) fillBuffer() {
}
g.buf = b.Bytes()
default:
-rand.Read(g.buf) // Per docs, err is always nil here
+g.rand.Read(g.buf) // Per docs, err is always nil here
}
}
-func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse {
+func (g *Generator) GenPayload() Payload {
+if g.streaming {
+return NewStreamPayload(g.size, g.seed.Add(1), g.typ)
+}
data := g.nextSlice()
dataHash := ""
if calcHash {
hashBytes := sha256.Sum256(data)
dataHash = hex.EncodeToString(hashBytes[:])
}
payload := g.vu.Runtime().NewArrayBuffer(data)
return GenPayloadResponse{Payload: payload, Hash: dataHash}
return NewFixedPayload(data)
}
func (g *Generator) nextSlice() []byte {

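For context, a minimal sketch of how the two generator modes behave after this change, written in the style of the package's tests (next file). It is illustrative only: the `vu` fixture and the `io`, `crypto/sha256`, and testify imports are assumed to come from the existing test setup.

```go
func TestGeneratorModes(t *testing.T) {
	// Buffered mode: the payload is pre-generated into one shared buffer
	// and handed out as an in-memory slice.
	g := NewGenerator(vu, 1024, "random", false)
	p := g.GenPayload()
	require.Equal(t, 1024, p.Size())
	require.Len(t, p.Bytes(), 1024)

	// Streaming mode: nothing is buffered up front; every call takes a fresh
	// seed from the atomic counter, so payloads differ between calls.
	gs := NewGenerator(vu, 1024, "random", true)
	sp := gs.GenPayload()
	n, err := io.Copy(io.Discard, sp.Reader())
	require.NoError(t, err)
	require.EqualValues(t, 1024, n)
	require.Len(t, sp.Hash(), sha256.Size*2) // hash is valid only after draining
}
```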
View file

@@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) {
t.Run("fails on negative size", func(t *testing.T) {
require.Panics(t, func() {
_ = NewGenerator(vu, -1, "")
_ = NewGenerator(vu, -1, "", false)
})
})
t.Run("fails on zero size", func(t *testing.T) {
require.Panics(t, func() {
_ = NewGenerator(vu, 0, "")
_ = NewGenerator(vu, 0, "", false)
})
})
t.Run("creates slice of specified size", func(t *testing.T) {
size := 10
g := NewGenerator(vu, size, "")
g := NewGenerator(vu, size, "", false)
slice := g.nextSlice()
require.Len(t, slice, size)
})
t.Run("creates a different slice on each call", func(t *testing.T) {
g := NewGenerator(vu, 1000, "")
g := NewGenerator(vu, 1000, "", false)
slice1 := g.nextSlice()
slice2 := g.nextSlice()
// Each slice should be unique (assuming that 1000 random bytes will never coincide
@@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) {
})
t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
g := NewGenerator(vu, 1000, "")
g := NewGenerator(vu, 1000, "", false)
initialSlice := g.nextSlice()
for i := 0; i < TailSize; i++ {
g.nextSlice()

internal/datagen/payload.go (new file, +121 lines)
View file

@@ -0,0 +1,121 @@
package datagen
import (
"bufio"
"bytes"
"crypto/sha256"
"encoding/hex"
"hash"
"io"
"math/rand"
"github.com/go-loremipsum/loremipsum"
)
// Payload represents arbitrary data to be packed into an S3 or native object.
// Implementations are not required to be thread-safe.
type Payload interface {
// Reader returns an io.Reader for reading the payload.
// Must not be called more than once.
Reader() io.Reader
// Bytes is a helper that reads all data from Reader() into a slice.
// Its sole purpose is to simplify the HTTP scenario,
// where the whole payload needs to be read and wrapped.
Bytes() []byte
// Size returns the payload size, which equals the total amount of data
// that can be read from Reader().
Size() int
// Hash returns the payload's SHA-256 hash. Must be called only after all data has been read from the reader.
Hash() string
}
type bytesPayload struct {
data []byte
}
func (p *bytesPayload) Reader() io.Reader {
return bytes.NewReader(p.data)
}
func (p *bytesPayload) Size() int {
return len(p.data)
}
func (p *bytesPayload) Hash() string {
h := sha256.Sum256(p.data)
return hex.EncodeToString(h[:])
}
func (p *bytesPayload) Bytes() []byte {
return p.data
}
func NewFixedPayload(data []byte) Payload {
return &bytesPayload{data: data}
}
type randomPayload struct {
r io.Reader
s hash.Hash
h string
size int
}
func NewStreamPayload(size int, seed int64, typ string) Payload {
var rr io.Reader
switch typ {
case "text":
rr = &textReader{li: loremipsum.NewWithSeed(seed)}
default:
rr = rand.New(rand.NewSource(seed))
}
lr := io.LimitReader(rr, int64(size))
// We need some buffering to write complete blocks in the TeeReader.
// Streaming payload read is expected to be used for big objects, thus 4k seems like a good choice.
br := bufio.NewReaderSize(lr, 4096)
s := sha256.New()
tr := io.TeeReader(br, s)
return &randomPayload{
r: tr,
s: s,
size: size,
}
}
func (p *randomPayload) Reader() io.Reader {
return p.r
}
func (p *randomPayload) Size() int {
return p.size
}
func (p *randomPayload) Hash() string {
if p.h == "" {
p.h = hex.EncodeToString(p.s.Sum(nil))
// Prevent possible misuse.
p.r = nil
p.s = nil
}
return p.h
}
func (p *randomPayload) Bytes() []byte {
data, err := io.ReadAll(p.r)
if err != nil {
// The underlying reader is either a textReader or a math/rand source,
// neither of which ever returns an error, so any error here is fatal.
panic(err)
}
return data
}
type textReader struct {
li *loremipsum.LoremIpsum
}
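// Read produces one freshly generated lorem-ipsum paragraph per call; copy
// truncates it to len(p), and the io.LimitReader above enforces the total
// payload size.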
func (r *textReader) Read(p []byte) (n int, err error) {
paragraph := r.li.Paragraph()
return copy(p, paragraph), nil
}
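Taken together, the contract is: obtain Reader() at most once, drain it, and only then ask for Hash(). A hedged usage sketch, assuming it runs inside the datagen package (the package is internal) with `fmt` and `io` imported; the function name is illustrative:

```go
func examplePayloadUse() {
	// Fixed payload: data is already in memory, so Hash() works immediately.
	fp := NewFixedPayload([]byte("hello"))
	fmt.Println(fp.Size(), fp.Hash())

	// Streaming payload: bytes are produced on demand and hashed in passing
	// by the TeeReader, so Reader() may be consumed only once and Hash() is
	// meaningful only after the reader has been fully drained.
	sp := NewStreamPayload(1<<20, 42, "text")
	n, _ := io.Copy(io.Discard, sp.Reader())
	fmt.Println(n, sp.Hash())
}
```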

View file

@@ -0,0 +1,40 @@
package datagen
import (
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"io"
"testing"
"github.com/stretchr/testify/require"
)
func TestFixedPayload(t *testing.T) {
const size = 123
data := make([]byte, size)
_, err := rand.Read(data)
require.NoError(t, err)
p := NewFixedPayload(data)
require.Equal(t, size, p.Size())
actual, err := io.ReadAll(p.Reader())
require.NoError(t, err)
require.Equal(t, data, actual)
h := sha256.Sum256(data)
require.Equal(t, hex.EncodeToString(h[:]), p.Hash())
}
func TestStreamingPayload(t *testing.T) {
const size = 123
p := NewStreamPayload(size, 0, "")
require.Equal(t, size, p.Size())
actual, err := io.ReadAll(p.Reader())
require.NoError(t, err)
require.Equal(t, size, len(actual))
require.Equal(t, sha256.Size*2, len(p.Hash()))
}

View file

@@ -248,7 +248,7 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]
var shOpts [][]shard.Option
-engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
+err := engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
opts := []shard.Option{
shard.WithRefillMetabase(sc.RefillMetabase()),
shard.WithMode(sc.Mode()),
@@ -375,7 +375,9 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]
return nil
})
+if err != nil {
+return nil, nil, fmt.Errorf("iterate shards: %w", err)
+}
return ngOpts, shOpts, nil
}

View file

@@ -1,7 +1,6 @@
package native
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/sha256"
@@ -23,6 +22,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
@@ -77,7 +77,7 @@ type (
const defaultBufferSize = 64 * 1024
-func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
+func (c *Client) Put(containerID string, headers map[string]string, payload datagen.Payload, chunkSize int) PutResponse {
cliContainerID := parseContainerID(containerID)
tok := c.tok
@@ -104,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
o.SetOwnerID(&owner)
o.SetAttributes(attrs...)
-resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
+resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@@ -309,10 +309,9 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse {
}
start := time.Now()
-var prm client.PrmContainerPut
-prm.SetContainer(cnr)
-res, err := c.cli.ContainerPut(c.vu.Context(), prm)
+res, err := c.cli.ContainerPut(c.vu.Context(), client.PrmContainerPut{
+Container: &cnr,
+})
if err != nil {
return c.putCnrErrorResponse(err)
}
@@ -399,7 +398,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: false, Error: err.Error()}
}
-_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
+_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@@ -414,15 +413,15 @@ func (s epochSource) CurrentEpoch() uint64 {
}
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
-hdr *object.Object, payload []byte, chunkSize int,
+hdr *object.Object, payload datagen.Payload, chunkSize int,
) (*client.ResObjectPut, error) {
bufSize := defaultBufferSize
if chunkSize > 0 {
bufSize = chunkSize
}
buf := make([]byte, bufSize)
-rdr := bytes.NewReader(payload)
-sz := rdr.Size()
+rdr := payload.Reader()
+sz := payload.Size()
// starting upload
start := time.Now()
@@ -500,10 +499,9 @@ func (x *waitParams) setDefaults() {
func (c *Client) waitForContainerPresence(ctx context.Context, cnrID cid.ID, wp *waitParams) error {
return waitFor(ctx, wp, func(ctx context.Context) bool {
-var prm client.PrmContainerGet
-prm.SetContainer(cnrID)
-_, err := c.cli.ContainerGet(ctx, prm)
+_, err := c.cli.ContainerGet(ctx, client.PrmContainerGet{
+ContainerID: &cnrID,
+})
return err == nil
})
}
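The loop that actually streams the payload is outside these hunks. For orientation, a hypothetical sketch of how the `rdr`/`buf` pair set up in `put` is typically drained; `objWriter` and `WriteChunk` are stand-in names, not the SDK's actual API:

```go
for {
	n, rerr := rdr.Read(buf)
	if n > 0 {
		// Forward one chunk of at most bufSize bytes to the object writer.
		if werr := objWriter.WriteChunk(buf[:n]); werr != nil {
			return nil, werr
		}
	}
	if rerr == io.EOF {
		break // exactly payload.Size() bytes have been streamed
	}
	if rerr != nil {
		return nil, rerr
	}
}
```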

View file

@@ -89,9 +89,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
// generate session token
exp := uint64(math.MaxUint64)
-var prmSessionCreate client.PrmSessionCreate
-prmSessionCreate.SetExp(exp)
-sessionResp, err := cli.SessionCreate(n.vu.Context(), prmSessionCreate)
+sessionResp, err := cli.SessionCreate(n.vu.Context(), client.PrmSessionCreate{
+Expiration: exp,
+})
if err != nil {
return nil, fmt.Errorf("dial endpoint: %s %w", endpoint, err)
}
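Note that besides the payload work, these hunks also migrate the SDK's Prm* parameter types from setter methods to struct literals. Schematically (types and fields exactly as in the hunks above; the two forms are alternatives, not sequential code):

```go
// Before: parameters assembled with setters.
var prm client.PrmSessionCreate
prm.SetExp(exp)
resp, err := cli.SessionCreate(ctx, prm)

// After: parameters passed as a struct literal.
resp, err = cli.SessionCreate(ctx, client.PrmSessionCreate{
	Expiration: exp,
})
```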

View file

@@ -1,7 +1,6 @@
package s3
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
@@ -9,12 +8,12 @@ import (
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
@@ -51,9 +50,9 @@ type (
}
)
-func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
-rdr := bytes.NewReader(payload.Bytes())
-sz := rdr.Size()
+func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
+rdr := payload.Reader()
+sz := payload.Size()
start := time.Now()
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
@@ -74,7 +73,7 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
-func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse {
+func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse {
if objPartSize < multipartUploadMinPartSize {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be at least '%d' (5 MB)", objPartSize, multipartUploadMinPartSize)}
@@ -86,8 +85,8 @@ func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, pay
u.Concurrency = concurrency
})
-payloadReader := bytes.NewReader(payload.Bytes())
-sz := payloadReader.Len()
+payloadReader := payload.Reader()
+sz := payload.Size()
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
Bucket: aws.String(bucket),

View file

@@ -1,15 +1,14 @@
package s3local
import (
"bytes"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
@@ -35,7 +34,7 @@ type (
GetResponse SuccessOrErrorResponse
)
-func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
+func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
if c.limiter.IsFull() {
return PutResponse{
Success: false,
@@ -58,8 +57,8 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
},
Header: map[string]string{},
Object: key,
-Size: int64(len(payload.Bytes())),
-Reader: bytes.NewReader(payload.Bytes()),
+Size: int64(payload.Size()),
+Reader: payload.Reader(),
}
start := time.Now()

View file

@@ -151,7 +151,10 @@ func (s *Local) Connect(configFile string, configDir string, params map[string]s
}
l := layer.NewLayer(zap.L(), &frostfs{rc}, cfg)
-l.Initialize(s.l.VU().Context(), nopEventListener{})
+err = l.Initialize(s.l.VU().Context(), nopEventListener{})
+if err != nil {
+return nil, fmt.Errorf("initialize: %w", err)
+}
return &Client{
vu: s.l.VU(),

View file

@@ -149,7 +149,7 @@ export function obj_write() {
};
const container = container_list[Math.floor(Math.random() * container_list.length)];
-const { payload, hash } = generator.genPayload(registry_enabled);
+const payload = generator.genPayload();
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) {
log.withField("cid", container).error(resp.error);
@@ -157,7 +157,7 @@ export function obj_write() {
}
if (obj_registry) {
-obj_registry.addObject(container, resp.object_id, "", "", hash);
+obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
}
}

View file

@@ -174,7 +174,7 @@ export function obj_write() {
};
const container = container_list[Math.floor(Math.random() * container_list.length)];
-const { payload, hash } = generator.genPayload(registry_enabled);
+const payload = generator.genPayload();
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) {
log.withField("cid", container).error(resp.error);
@@ -182,7 +182,7 @@ export function obj_write() {
}
if (obj_registry) {
-obj_registry.addObject(container, resp.object_id, "", "", hash);
+obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
}
}

View file

@@ -97,10 +97,12 @@ export function obj_write() {
const container = container_list[Math.floor(Math.random() * container_list.length)];
-const { payload, hash } = generator.genPayload(registry_enabled);
+const payload = generator.genPayload();
const data = {
field: uuidv4(),
file: http.file(payload, "random.data"),
// Because we use `file` wrapping and it is not straightforward to use streams here,
// `-e STREAMING=1` has no effect for this scenario.
fyrchik marked this conversation as resolved:

This is so annoying, haven't yet come up with a solution, will fix in this PR, thus WIP.
https://github.com/grafana/k6/blob/04a8119ef33b6b12c0210bb05c80e8da9bb1a823/js/modules/k6/http/file.go#L25

Decided to avoid streaming for the HTTP scenario.
file: http.file(payload.bytes(), "random.data"),
};
const resp = http.post(`http://${http_endpoint}/upload/${container}`, data);
@@ -110,7 +112,7 @@ export function obj_write() {
}
const object_id = JSON.parse(resp.body).object_id;
if (obj_registry) {
-obj_registry.addObject(container, object_id, "", "", hash);
+obj_registry.addObject(container, object_id, "", "", payload.hash());
}
}

View file

@@ -2,7 +2,7 @@ import datagen from 'k6/x/frostfs/datagen';
export function newGenerator(condition) {
if (condition) {
return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "", !!__ENV.STREAMING);
}
return undefined;
}

View file

@@ -128,7 +128,7 @@ export function obj_write() {
};
const container = container_list[Math.floor(Math.random() * container_list.length)];
-const { payload, hash } = generator.genPayload(registry_enabled);
+const payload = generator.genPayload();
const resp = local_client.put(container, headers, payload);
if (!resp.success) {
if (resp.abort) {
@@ -139,7 +139,7 @@ export function obj_write() {
}
if (obj_registry) {
-obj_registry.addObject(container, resp.object_id, "", "", hash);
+obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
}
}

View file

@@ -19,6 +19,7 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
+* `STREAMING` - if set, payloads are generated on the fly rather than being read fully into memory.
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
```shell

View file

@@ -145,7 +145,7 @@ export function obj_write() {
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
-const { payload, hash } = generator.genPayload(registry_enabled);
+const payload = generator.genPayload();
const resp = s3_client.put(bucket, key, payload);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
@@ -153,7 +153,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
obj_registry.addObject("", "", bucket, key, payload.hash());
}
}

View file

@@ -172,7 +172,7 @@ export function obj_write() {
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
-const { payload, hash } = generator.genPayload(registry_enabled);
+const payload = generator.genPayload();
const resp = s3_client.put(bucket, key, payload);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
@@ -180,7 +180,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
obj_registry.addObject("", "", bucket, key, payload.hash());
}
}

View file

@@ -92,7 +92,7 @@ export function obj_write_multipart() {
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
-const {payload, hash} = generator.genPayload(registry_enabled);
+const payload = generator.genPayload();
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
@@ -100,6 +100,6 @@ export function obj_write_multipart() {
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
obj_registry.addObject("", "", bucket, key, payload.hash());
}
}

View file

@@ -124,7 +124,7 @@ export function obj_write() {
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
-const { payload, hash } = generator.genPayload(registry_enabled);
+const payload = generator.genPayload();
const resp = s3_client.put(bucket, key, payload);
if (!resp.success) {
if (resp.abort) {
@@ -135,7 +135,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
obj_registry.addObject("", "", bucket, key, payload.hash());
}
}