From 74121bb387c2dc3187c77ceb2407d82fbfb08486 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 11 Jan 2024 21:46:57 +0300 Subject: [PATCH] [#114] datagen: Refactor payload generation Return an interface which can be converted to `io.Reader` as well as report payload size and hash. Signed-off-by: Evgenii Stratonikov --- internal/datagen/generator.go | 20 ++----------- internal/datagen/payload.go | 50 ++++++++++++++++++++++++++++++++ internal/datagen/payload_test.go | 28 ++++++++++++++++++ internal/native/client.go | 14 ++++----- internal/s3/client.go | 15 +++++----- internal/s3local/client.go | 9 +++--- scenarios/grpc.js | 4 +-- scenarios/grpc_car.js | 4 +-- scenarios/http.js | 8 +++-- scenarios/local.js | 4 +-- scenarios/s3.js | 4 +-- scenarios/s3_car.js | 4 +-- scenarios/s3_multipart.js | 4 +-- scenarios/s3local.js | 4 +-- 14 files changed, 117 insertions(+), 55 deletions(-) create mode 100644 internal/datagen/payload.go create mode 100644 internal/datagen/payload_test.go diff --git a/internal/datagen/generator.go b/internal/datagen/generator.go index 9b1fe29..8dbc093 100644 --- a/internal/datagen/generator.go +++ b/internal/datagen/generator.go @@ -2,12 +2,9 @@ package datagen import ( "bytes" - "crypto/sha256" - "encoding/hex" "math/rand" "time" - "github.com/dop251/goja" "github.com/go-loremipsum/loremipsum" "go.k6.io/k6/js/modules" ) @@ -29,11 +26,6 @@ type ( typ string offset int } - - GenPayloadResponse struct { - Payload goja.ArrayBuffer - Hash string - } ) // TailSize specifies number of extra random bytes in the buffer tail. 
@@ -89,17 +81,9 @@ func (g *Generator) fillBuffer() { } } -func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse { +func (g *Generator) GenPayload(calcHash bool) Payload { data := g.nextSlice() - - dataHash := "" - if calcHash { - hashBytes := sha256.Sum256(data) - dataHash = hex.EncodeToString(hashBytes[:]) - } - - payload := g.vu.Runtime().NewArrayBuffer(data) - return GenPayloadResponse{Payload: payload, Hash: dataHash} + return NewFixedPayload(data) } func (g *Generator) nextSlice() []byte { diff --git a/internal/datagen/payload.go b/internal/datagen/payload.go new file mode 100644 index 0000000..efd0c20 --- /dev/null +++ b/internal/datagen/payload.go @@ -0,0 +1,50 @@ +package datagen + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "io" +) + +// Payload represents arbitrary data to be packed into S3 or native object. +// Implementations could be thread-unsafe. +type Payload interface { + // Reader returns io.Reader instance to read the payload. + // Must not be called twice. + Reader() io.Reader + // Bytes is a helper which reads all data from Reader() into slice. + // The sole purpose of this method is to simplify HTTP scenario, + // where all payload needs to be read and wrapped. + Bytes() []byte + // Size returns payload size, which is equal to the total amount of data + // that could be read from the Reader(). + Size() int + // Hash returns payload sha256 hash. Must be called after all data is read from the reader. 
+ Hash() string +} + +type bytesPayload struct { + data []byte +} + +func (p *bytesPayload) Reader() io.Reader { + return bytes.NewReader(p.data) +} + +func (p *bytesPayload) Size() int { + return len(p.data) +} + +func (p *bytesPayload) Hash() string { + h := sha256.Sum256(p.data) + return hex.EncodeToString(h[:]) +} + +func (p *bytesPayload) Bytes() []byte { + return p.data +} + +func NewFixedPayload(data []byte) Payload { + return &bytesPayload{data: data} +} diff --git a/internal/datagen/payload_test.go b/internal/datagen/payload_test.go new file mode 100644 index 0000000..c85f3b0 --- /dev/null +++ b/internal/datagen/payload_test.go @@ -0,0 +1,28 @@ +package datagen + +import ( + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFixedPayload(t *testing.T) { + const size = 123 + data := make([]byte, size) + _, err := rand.Read(data) + require.NoError(t, err) + + p := NewFixedPayload(data) + require.Equal(t, size, p.Size()) + + actual, err := io.ReadAll(p.Reader()) + require.NoError(t, err) + require.Equal(t, data, actual) + + h := sha256.Sum256(data) + require.Equal(t, hex.EncodeToString(h[:]), p.Hash()) +} diff --git a/internal/native/client.go b/internal/native/client.go index e4ca8e3..614886d 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -1,7 +1,6 @@ package native import ( - "bytes" "context" "crypto/ecdsa" "crypto/sha256" @@ -23,6 +22,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "git.frostfs.info/TrueCloudLab/tzhash/tz" + "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/dop251/goja" "go.k6.io/k6/js/modules" @@ -77,7 +77,7 @@ type ( const defaultBufferSize = 64 * 1024 -func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse { +func (c 
*Client) Put(containerID string, headers map[string]string, payload datagen.Payload, chunkSize int) PutResponse { cliContainerID := parseContainerID(containerID) tok := c.tok @@ -104,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja o.SetOwnerID(&owner) o.SetAttributes(attrs...) - resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize) + resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize) if err != nil { return PutResponse{Success: false, Error: err.Error()} } @@ -399,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse { return PutResponse{Success: false, Error: err.Error()} } - _, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0) + _, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0) if err != nil { return PutResponse{Success: false, Error: err.Error()} } @@ -414,15 +414,15 @@ func (s epochSource) CurrentEpoch() uint64 { } func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object, - hdr *object.Object, payload []byte, chunkSize int, + hdr *object.Object, payload datagen.Payload, chunkSize int, ) (*client.ResObjectPut, error) { bufSize := defaultBufferSize if chunkSize > 0 { bufSize = chunkSize } buf := make([]byte, bufSize) - rdr := bytes.NewReader(payload) - sz := rdr.Size() + rdr := payload.Reader() + sz := payload.Size() // starting upload start := time.Now() diff --git a/internal/s3/client.go b/internal/s3/client.go index 23d6b86..029190a 100644 --- a/internal/s3/client.go +++ b/internal/s3/client.go @@ -1,7 +1,6 @@ package s3 import ( - "bytes" "context" "crypto/sha256" "encoding/hex" @@ -9,12 +8,12 @@ import ( "strconv" "time" + "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" 
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" - "github.com/dop251/goja" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" ) @@ -51,9 +50,9 @@ type ( } ) -func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { - rdr := bytes.NewReader(payload.Bytes()) - sz := rdr.Size() +func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse { + rdr := payload.Reader() + sz := payload.Size() start := time.Now() _, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{ @@ -74,7 +73,7 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB -func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse { +func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse { if objPartSize < multipartUploadMinPartSize { stats.Report(c.vu, objPutFails, 1) return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)} @@ -86,8 +85,8 @@ func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, pay u.Concurrency = concurrency }) - payloadReader := bytes.NewReader(payload.Bytes()) - sz := payloadReader.Len() + payloadReader := payload.Reader() + sz := payload.Size() _, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{ Bucket: aws.String(bucket), diff --git a/internal/s3local/client.go b/internal/s3local/client.go index 93c65d5..b9a062c 100644 --- a/internal/s3local/client.go +++ b/internal/s3local/client.go @@ -1,15 +1,14 @@ package s3local import ( - "bytes" "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen" 
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" - "github.com/dop251/goja" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" ) @@ -35,7 +34,7 @@ type ( GetResponse SuccessOrErrorResponse ) -func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { +func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse { if c.limiter.IsFull() { return PutResponse{ Success: false, @@ -58,8 +57,8 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { }, Header: map[string]string{}, Object: key, - Size: int64(len(payload.Bytes())), - Reader: bytes.NewReader(payload.Bytes()), + Size: int64(payload.Size()), + Reader: payload.Reader(), } start := time.Now() diff --git a/scenarios/grpc.js b/scenarios/grpc.js index f3d8075..a4329b4 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -149,7 +149,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size); if (!resp.success) { log.withField("cid", container).error(resp.error); @@ -157,7 +157,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject(container, resp.object_id, "", "", hash); + obj_registry.addObject(container, resp.object_id, "", "", payload.hash()); } } diff --git a/scenarios/grpc_car.js b/scenarios/grpc_car.js index 7f4f306..b8236e2 100644 --- a/scenarios/grpc_car.js +++ b/scenarios/grpc_car.js @@ -174,7 +174,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = 
grpc_client.put(container, headers, payload, write_grpc_chunk_size); if (!resp.success) { log.withField("cid", container).error(resp.error); @@ -182,7 +182,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject(container, resp.object_id, "", "", hash); + obj_registry.addObject(container, resp.object_id, "", "", payload.hash()); } } diff --git a/scenarios/http.js b/scenarios/http.js index d13dd5b..1e31a4a 100644 --- a/scenarios/http.js +++ b/scenarios/http.js @@ -97,10 +97,12 @@ export function obj_write() { const container = container_list[Math.floor(Math.random() * container_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const data = { field: uuidv4(), - file: http.file(payload, "random.data"), + // Because we use `file` wrapping and it is not straightforward to use streams here, + // `-e STREAMING=1` has no effect for this scenario. + file: http.file(payload.bytes(), "random.data"), }; const resp = http.post(`http://${http_endpoint}/upload/${container}`, data); @@ -110,7 +112,7 @@ export function obj_write() { } const object_id = JSON.parse(resp.body).object_id; if (obj_registry) { - obj_registry.addObject(container, object_id, "", "", hash); + obj_registry.addObject(container, object_id, "", "", payload.hash()); } } diff --git a/scenarios/local.js b/scenarios/local.js index 56def89..50f9c7e 100644 --- a/scenarios/local.js +++ b/scenarios/local.js @@ -128,7 +128,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = local_client.put(container, headers, payload); if (!resp.success) { if (resp.abort) { @@ -139,7 +139,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject(container, resp.object_id, "", "", hash); + 
obj_registry.addObject(container, resp.object_id, "", "", payload.hash()); } } diff --git a/scenarios/s3.js b/scenarios/s3.js index deff64e..517721e 100644 --- a/scenarios/s3.js +++ b/scenarios/s3.js @@ -145,7 +145,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); @@ -153,7 +153,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject("", "", bucket, key, hash); + obj_registry.addObject("", "", bucket, key, payload.hash()); } } diff --git a/scenarios/s3_car.js b/scenarios/s3_car.js index 1581bc9..ceda927 100644 --- a/scenarios/s3_car.js +++ b/scenarios/s3_car.js @@ -172,7 +172,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); @@ -180,7 +180,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject("", "", bucket, key, hash); + obj_registry.addObject("", "", bucket, key, payload.hash()); } } diff --git a/scenarios/s3_multipart.js b/scenarios/s3_multipart.js index f7095b7..e9e1297 100644 --- a/scenarios/s3_multipart.js +++ b/scenarios/s3_multipart.js @@ -92,7 +92,7 @@ export function obj_write_multipart() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const {payload, hash} = generator.genPayload(registry_enabled); + const payload = 
generator.genPayload(registry_enabled); const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); @@ -100,6 +100,6 @@ export function obj_write_multipart() { } if (obj_registry) { - obj_registry.addObject("", "", bucket, key, hash); + obj_registry.addObject("", "", bucket, key, payload.hash()); } } diff --git a/scenarios/s3local.js b/scenarios/s3local.js index 6ea26d2..0d08733 100644 --- a/scenarios/s3local.js +++ b/scenarios/s3local.js @@ -124,7 +124,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { if (resp.abort) { @@ -135,7 +135,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject("", "", bucket, key, hash); + obj_registry.addObject("", "", bucket, key, payload.hash()); } }