Do not store all payload in memory #115

Merged
fyrchik merged 5 commits from fyrchik/xk6-frostfs:streaming_payload into master 2024-09-04 19:51:18 +00:00
14 changed files with 117 additions and 55 deletions
Showing only changes of commit 74121bb387 - Show all commits

View file

@ -2,12 +2,9 @@ package datagen
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"math/rand"
"time"
"github.com/dop251/goja"
"github.com/go-loremipsum/loremipsum"
"go.k6.io/k6/js/modules"
)
@ -29,11 +26,6 @@ type (
typ string
offset int
}
GenPayloadResponse struct {
Payload goja.ArrayBuffer
Hash string
}
)
// TailSize specifies number of extra random bytes in the buffer tail.
@ -89,17 +81,9 @@ func (g *Generator) fillBuffer() {
}
}
func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse {
func (g *Generator) GenPayload(calcHash bool) Payload {
data := g.nextSlice()
dataHash := ""
if calcHash {
hashBytes := sha256.Sum256(data)
dataHash = hex.EncodeToString(hashBytes[:])
}
payload := g.vu.Runtime().NewArrayBuffer(data)
return GenPayloadResponse{Payload: payload, Hash: dataHash}
return NewFixedPayload(data)
}
func (g *Generator) nextSlice() []byte {

View file

@ -0,0 +1,50 @@
package datagen
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"io"
)
// Payload represents arbitrary data to be packed into S3 or native object.
// Implementations could be thread-unsafe.
type Payload interface {
// Reader returns io.Reader instance to read the payload.
// Must not be called twice.
Reader() io.Reader
// Bytes is a helper which reads all data from Reader() into slice.
// The sole purpose of this method is to simplify HTTP scenario,
// where all payload needs to be read and wrapped.
Bytes() []byte
// Size returns payload size, which is equal to the total amount of data
// that could be read from the Reader().
Size() int
// Hash returns payload sha256 hash. Must be called after all data is read from the reader.
Hash() string
}
type bytesPayload struct {
data []byte
}
func (p *bytesPayload) Reader() io.Reader {
return bytes.NewReader(p.data)
}
func (p *bytesPayload) Size() int {
return len(p.data)
}
func (p *bytesPayload) Hash() string {
h := sha256.Sum256(p.data[:])
return hex.EncodeToString(h[:])
}
func (p *bytesPayload) Bytes() []byte {
return p.data
}
func NewFixedPayload(data []byte) Payload {
return &bytesPayload{data: data}
}

View file

@ -0,0 +1,28 @@
package datagen
import (
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"io"
"testing"
"github.com/stretchr/testify/require"
)
func TestFixedPayload(t *testing.T) {
const size = 123
data := make([]byte, size)
_, err := rand.Read(data)
require.NoError(t, err)
p := NewFixedPayload(data)
require.Equal(t, size, p.Size())
actual, err := io.ReadAll(p.Reader())
require.NoError(t, err)
require.Equal(t, data, actual)
h := sha256.Sum256(data)
require.Equal(t, hex.EncodeToString(h[:]), p.Hash())
}

View file

@ -1,7 +1,6 @@
package native
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/sha256"
@ -23,6 +22,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
@ -77,7 +77,7 @@ type (
const defaultBufferSize = 64 * 1024
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
func (c *Client) Put(containerID string, headers map[string]string, payload datagen.Payload, chunkSize int) PutResponse {
cliContainerID := parseContainerID(containerID)
tok := c.tok
@ -104,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
o.SetOwnerID(&owner)
o.SetAttributes(attrs...)
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@ -399,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: false, Error: err.Error()}
}
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@ -414,15 +414,15 @@ func (s epochSource) CurrentEpoch() uint64 {
}
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
hdr *object.Object, payload []byte, chunkSize int,
hdr *object.Object, payload datagen.Payload, chunkSize int,
) (*client.ResObjectPut, error) {
bufSize := defaultBufferSize
if chunkSize > 0 {
bufSize = chunkSize
}
buf := make([]byte, bufSize)
rdr := bytes.NewReader(payload)
sz := rdr.Size()
rdr := payload.Reader()
sz := payload.Size()
// starting upload
start := time.Now()

View file

@ -1,7 +1,6 @@
package s3
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
@ -9,12 +8,12 @@ import (
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
@ -51,9 +50,9 @@ type (
}
)
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
rdr := bytes.NewReader(payload.Bytes())
sz := rdr.Size()
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
rdr := payload.Reader()
sz := payload.Size()
start := time.Now()
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
@ -74,7 +73,7 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse {
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse {
if objPartSize < multipartUploadMinPartSize {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
@ -86,8 +85,8 @@ func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, pay
u.Concurrency = concurrency
})
payloadReader := bytes.NewReader(payload.Bytes())
sz := payloadReader.Len()
payloadReader := payload.Reader()
sz := payload.Size()
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
Bucket: aws.String(bucket),

View file

@ -1,15 +1,14 @@
package s3local
import (
"bytes"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
@ -35,7 +34,7 @@ type (
GetResponse SuccessOrErrorResponse
)
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
if c.limiter.IsFull() {
return PutResponse{
Success: false,
@ -58,8 +57,8 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
},
Header: map[string]string{},
Object: key,
Size: int64(len(payload.Bytes())),
Reader: bytes.NewReader(payload.Bytes()),
Size: int64(payload.Size()),
Reader: payload.Reader(),
}
start := time.Now()

View file

@ -149,7 +149,7 @@ export function obj_write() {
};
const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const payload = generator.genPayload(registry_enabled);
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) {
log.withField("cid", container).error(resp.error);
@ -157,7 +157,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject(container, resp.object_id, "", "", hash);
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
}
}

View file

@ -174,7 +174,7 @@ export function obj_write() {
};
const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const payload = generator.genPayload(registry_enabled);
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) {
log.withField("cid", container).error(resp.error);
@ -182,7 +182,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject(container, resp.object_id, "", "", hash);
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
}
}

View file

@ -97,10 +97,12 @@ export function obj_write() {
const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const payload = generator.genPayload(registry_enabled);
const data = {
field: uuidv4(),
file: http.file(payload, "random.data"),
// Because we use `file` wrapping and it is not straightforward to use streams here,
// `-e STREAMING=1` has no effect for this scenario.
file: http.file(payload.bytes(), "random.data"),
};
const resp = http.post(`http://${http_endpoint}/upload/${container}`, data);
@ -110,7 +112,7 @@ export function obj_write() {
}
const object_id = JSON.parse(resp.body).object_id;
if (obj_registry) {
obj_registry.addObject(container, object_id, "", "", hash);
obj_registry.addObject(container, object_id, "", "", payload.hash());
}
}

View file

@ -128,7 +128,7 @@ export function obj_write() {
};
const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const payload = generator.genPayload(registry_enabled);
const resp = local_client.put(container, headers, payload);
if (!resp.success) {
if (resp.abort) {
@ -139,7 +139,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject(container, resp.object_id, "", "", hash);
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
}
}

View file

@ -145,7 +145,7 @@ export function obj_write() {
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const payload = generator.genPayload(registry_enabled);
const resp = s3_client.put(bucket, key, payload);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
@ -153,7 +153,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
obj_registry.addObject("", "", bucket, key, payload.hash());
}
}

View file

@ -172,7 +172,7 @@ export function obj_write() {
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const payload = generator.genPayload(registry_enabled);
const resp = s3_client.put(bucket, key, payload);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
@ -180,7 +180,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
obj_registry.addObject("", "", bucket, key, payload.hash());
}
}

View file

@ -92,7 +92,7 @@ export function obj_write_multipart() {
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const {payload, hash} = generator.genPayload(registry_enabled);
const payload = generator.genPayload(registry_enabled);
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
@ -100,6 +100,6 @@ export function obj_write_multipart() {
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
obj_registry.addObject("", "", bucket, key, payload.hash());
}
}

View file

@ -124,7 +124,7 @@ export function obj_write() {
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const payload = generator.genPayload(registry_enabled);
const resp = s3_client.put(bucket, key, payload);
if (!resp.success) {
if (resp.abort) {
@ -135,7 +135,7 @@ export function obj_write() {
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
obj_registry.addObject("", "", bucket, key, payload.hash());
}
}