From 74121bb387c2dc3187c77ceb2407d82fbfb08486 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 11 Jan 2024 21:46:57 +0300 Subject: [PATCH 1/5] [#114] datagen: Refactor payload generation Return an interface which can be converted to `io.Reader` as well as report payload size and hash. Signed-off-by: Evgenii Stratonikov --- internal/datagen/generator.go | 20 ++----------- internal/datagen/payload.go | 50 ++++++++++++++++++++++++++++++++ internal/datagen/payload_test.go | 28 ++++++++++++++++++ internal/native/client.go | 14 ++++----- internal/s3/client.go | 15 +++++----- internal/s3local/client.go | 9 +++--- scenarios/grpc.js | 4 +-- scenarios/grpc_car.js | 4 +-- scenarios/http.js | 8 +++-- scenarios/local.js | 4 +-- scenarios/s3.js | 4 +-- scenarios/s3_car.js | 4 +-- scenarios/s3_multipart.js | 4 +-- scenarios/s3local.js | 4 +-- 14 files changed, 117 insertions(+), 55 deletions(-) create mode 100644 internal/datagen/payload.go create mode 100644 internal/datagen/payload_test.go diff --git a/internal/datagen/generator.go b/internal/datagen/generator.go index 9b1fe29..8dbc093 100644 --- a/internal/datagen/generator.go +++ b/internal/datagen/generator.go @@ -2,12 +2,9 @@ package datagen import ( "bytes" - "crypto/sha256" - "encoding/hex" "math/rand" "time" - "github.com/dop251/goja" "github.com/go-loremipsum/loremipsum" "go.k6.io/k6/js/modules" ) @@ -29,11 +26,6 @@ type ( typ string offset int } - - GenPayloadResponse struct { - Payload goja.ArrayBuffer - Hash string - } ) // TailSize specifies number of extra random bytes in the buffer tail. 
@@ -89,17 +81,9 @@ func (g *Generator) fillBuffer() { } } -func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse { +func (g *Generator) GenPayload(calcHash bool) Payload { data := g.nextSlice() - - dataHash := "" - if calcHash { - hashBytes := sha256.Sum256(data) - dataHash = hex.EncodeToString(hashBytes[:]) - } - - payload := g.vu.Runtime().NewArrayBuffer(data) - return GenPayloadResponse{Payload: payload, Hash: dataHash} + return NewFixedPayload(data) } func (g *Generator) nextSlice() []byte { diff --git a/internal/datagen/payload.go b/internal/datagen/payload.go new file mode 100644 index 0000000..efd0c20 --- /dev/null +++ b/internal/datagen/payload.go @@ -0,0 +1,50 @@ +package datagen + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "io" +) + +// Payload represents arbitrary data to be packed into S3 or native object. +// Implementations could be thread-unsafe. +type Payload interface { + // Reader returns io.Reader instance to read the payload. + // Must not be called twice. + Reader() io.Reader + // Bytes is a helper which reads all data from Reader() into slice. + // The sole purpose of this method is to simplify HTTP scenario, + // where all payload needs to be read and wrapped. + Bytes() []byte + // Size returns payload size, which is equal to the total amount of data + // that could be read from the Reader(). + Size() int + // Hash returns payload sha256 hash. Must be called after all data is read from the reader. 
+ Hash() string +} + +type bytesPayload struct { + data []byte +} + +func (p *bytesPayload) Reader() io.Reader { + return bytes.NewReader(p.data) +} + +func (p *bytesPayload) Size() int { + return len(p.data) +} + +func (p *bytesPayload) Hash() string { + h := sha256.Sum256(p.data[:]) + return hex.EncodeToString(h[:]) +} + +func (p *bytesPayload) Bytes() []byte { + return p.data +} + +func NewFixedPayload(data []byte) Payload { + return &bytesPayload{data: data} +} diff --git a/internal/datagen/payload_test.go b/internal/datagen/payload_test.go new file mode 100644 index 0000000..c85f3b0 --- /dev/null +++ b/internal/datagen/payload_test.go @@ -0,0 +1,28 @@ +package datagen + +import ( + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFixedPayload(t *testing.T) { + const size = 123 + data := make([]byte, size) + _, err := rand.Read(data) + require.NoError(t, err) + + p := NewFixedPayload(data) + require.Equal(t, size, p.Size()) + + actual, err := io.ReadAll(p.Reader()) + require.NoError(t, err) + require.Equal(t, data, actual) + + h := sha256.Sum256(data) + require.Equal(t, hex.EncodeToString(h[:]), p.Hash()) +} diff --git a/internal/native/client.go b/internal/native/client.go index e4ca8e3..614886d 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -1,7 +1,6 @@ package native import ( - "bytes" "context" "crypto/ecdsa" "crypto/sha256" @@ -23,6 +22,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "git.frostfs.info/TrueCloudLab/tzhash/tz" + "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/dop251/goja" "go.k6.io/k6/js/modules" @@ -77,7 +77,7 @@ type ( const defaultBufferSize = 64 * 1024 -func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse { +func (c 
*Client) Put(containerID string, headers map[string]string, payload datagen.Payload, chunkSize int) PutResponse { cliContainerID := parseContainerID(containerID) tok := c.tok @@ -104,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja o.SetOwnerID(&owner) o.SetAttributes(attrs...) - resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize) + resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize) if err != nil { return PutResponse{Success: false, Error: err.Error()} } @@ -399,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse { return PutResponse{Success: false, Error: err.Error()} } - _, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0) + _, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0) if err != nil { return PutResponse{Success: false, Error: err.Error()} } @@ -414,15 +414,15 @@ func (s epochSource) CurrentEpoch() uint64 { } func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object, - hdr *object.Object, payload []byte, chunkSize int, + hdr *object.Object, payload datagen.Payload, chunkSize int, ) (*client.ResObjectPut, error) { bufSize := defaultBufferSize if chunkSize > 0 { bufSize = chunkSize } buf := make([]byte, bufSize) - rdr := bytes.NewReader(payload) - sz := rdr.Size() + rdr := payload.Reader() + sz := payload.Size() // starting upload start := time.Now() diff --git a/internal/s3/client.go b/internal/s3/client.go index 23d6b86..029190a 100644 --- a/internal/s3/client.go +++ b/internal/s3/client.go @@ -1,7 +1,6 @@ package s3 import ( - "bytes" "context" "crypto/sha256" "encoding/hex" @@ -9,12 +8,12 @@ import ( "strconv" "time" + "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" 
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" - "github.com/dop251/goja" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" ) @@ -51,9 +50,9 @@ type ( } ) -func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { - rdr := bytes.NewReader(payload.Bytes()) - sz := rdr.Size() +func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse { + rdr := payload.Reader() + sz := payload.Size() start := time.Now() _, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{ @@ -74,7 +73,7 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB -func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse { +func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse { if objPartSize < multipartUploadMinPartSize { stats.Report(c.vu, objPutFails, 1) return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)} @@ -86,8 +85,8 @@ func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, pay u.Concurrency = concurrency }) - payloadReader := bytes.NewReader(payload.Bytes()) - sz := payloadReader.Len() + payloadReader := payload.Reader() + sz := payload.Size() _, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{ Bucket: aws.String(bucket), diff --git a/internal/s3local/client.go b/internal/s3local/client.go index 93c65d5..b9a062c 100644 --- a/internal/s3local/client.go +++ b/internal/s3local/client.go @@ -1,15 +1,14 @@ package s3local import ( - "bytes" "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen" 
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" - "github.com/dop251/goja" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" ) @@ -35,7 +34,7 @@ type ( GetResponse SuccessOrErrorResponse ) -func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { +func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse { if c.limiter.IsFull() { return PutResponse{ Success: false, @@ -58,8 +57,8 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { }, Header: map[string]string{}, Object: key, - Size: int64(len(payload.Bytes())), - Reader: bytes.NewReader(payload.Bytes()), + Size: int64(payload.Size()), + Reader: payload.Reader(), } start := time.Now() diff --git a/scenarios/grpc.js b/scenarios/grpc.js index f3d8075..a4329b4 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -149,7 +149,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size); if (!resp.success) { log.withField("cid", container).error(resp.error); @@ -157,7 +157,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject(container, resp.object_id, "", "", hash); + obj_registry.addObject(container, resp.object_id, "", "", payload.hash()); } } diff --git a/scenarios/grpc_car.js b/scenarios/grpc_car.js index 7f4f306..b8236e2 100644 --- a/scenarios/grpc_car.js +++ b/scenarios/grpc_car.js @@ -174,7 +174,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = 
grpc_client.put(container, headers, payload, write_grpc_chunk_size); if (!resp.success) { log.withField("cid", container).error(resp.error); @@ -182,7 +182,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject(container, resp.object_id, "", "", hash); + obj_registry.addObject(container, resp.object_id, "", "", payload.hash()); } } diff --git a/scenarios/http.js b/scenarios/http.js index d13dd5b..1e31a4a 100644 --- a/scenarios/http.js +++ b/scenarios/http.js @@ -97,10 +97,12 @@ export function obj_write() { const container = container_list[Math.floor(Math.random() * container_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const data = { field: uuidv4(), - file: http.file(payload, "random.data"), + // Because we use `file` wrapping and it is not straightforward to use streams here, + // `-e STREAMING=1` has no effect for this scenario. + file: http.file(payload.bytes(), "random.data"), }; const resp = http.post(`http://${http_endpoint}/upload/${container}`, data); @@ -110,7 +112,7 @@ export function obj_write() { } const object_id = JSON.parse(resp.body).object_id; if (obj_registry) { - obj_registry.addObject(container, object_id, "", "", hash); + obj_registry.addObject(container, object_id, "", "", payload.hash()); } } diff --git a/scenarios/local.js b/scenarios/local.js index 56def89..50f9c7e 100644 --- a/scenarios/local.js +++ b/scenarios/local.js @@ -128,7 +128,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = local_client.put(container, headers, payload); if (!resp.success) { if (resp.abort) { @@ -139,7 +139,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject(container, resp.object_id, "", "", hash); + 
obj_registry.addObject(container, resp.object_id, "", "", payload.hash()); } } diff --git a/scenarios/s3.js b/scenarios/s3.js index deff64e..517721e 100644 --- a/scenarios/s3.js +++ b/scenarios/s3.js @@ -145,7 +145,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); @@ -153,7 +153,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject("", "", bucket, key, hash); + obj_registry.addObject("", "", bucket, key, payload.hash()); } } diff --git a/scenarios/s3_car.js b/scenarios/s3_car.js index 1581bc9..ceda927 100644 --- a/scenarios/s3_car.js +++ b/scenarios/s3_car.js @@ -172,7 +172,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); @@ -180,7 +180,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject("", "", bucket, key, hash); + obj_registry.addObject("", "", bucket, key, payload.hash()); } } diff --git a/scenarios/s3_multipart.js b/scenarios/s3_multipart.js index f7095b7..e9e1297 100644 --- a/scenarios/s3_multipart.js +++ b/scenarios/s3_multipart.js @@ -92,7 +92,7 @@ export function obj_write_multipart() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const {payload, hash} = generator.genPayload(registry_enabled); + const payload = 
generator.genPayload(registry_enabled); const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); @@ -100,6 +100,6 @@ export function obj_write_multipart() { } if (obj_registry) { - obj_registry.addObject("", "", bucket, key, hash); + obj_registry.addObject("", "", bucket, key, payload.hash()); } } diff --git a/scenarios/s3local.js b/scenarios/s3local.js index 6ea26d2..0d08733 100644 --- a/scenarios/s3local.js +++ b/scenarios/s3local.js @@ -124,7 +124,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const { payload, hash } = generator.genPayload(registry_enabled); + const payload = generator.genPayload(registry_enabled); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { if (resp.abort) { @@ -135,7 +135,7 @@ export function obj_write() { } if (obj_registry) { - obj_registry.addObject("", "", bucket, key, hash); + obj_registry.addObject("", "", bucket, key, payload.hash()); } } -- 2.45.2 From 4544ec616bfa31dbf337cfe2aaa3ee729c188346 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 11 Jan 2024 21:59:36 +0300 Subject: [PATCH 2/5] [#114] datagen: Allow to generate streaming payloads Signed-off-by: Evgenii Stratonikov --- internal/datagen/datagen.go | 4 +- internal/datagen/generator.go | 25 ++++++++--- internal/datagen/generator_test.go | 10 ++--- internal/datagen/payload.go | 71 ++++++++++++++++++++++++++++++ internal/datagen/payload_test.go | 12 +++++ scenarios/libs/datagen.js | 2 +- scenarios/run_scenarios.md | 1 + 7 files changed, 110 insertions(+), 15 deletions(-) diff --git a/internal/datagen/datagen.go b/internal/datagen/datagen.go index 7994875..2efeb2b 100644 --- a/internal/datagen/datagen.go +++ b/internal/datagen/datagen.go @@ -38,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports { return 
modules.Exports{Default: d} } -func (d *Datagen) Generator(size int, typ string) *Generator { - g := NewGenerator(d.vu, size, strings.ToLower(typ)) +func (d *Datagen) Generator(size int, typ string, streaming bool) *Generator { + g := NewGenerator(d.vu, size, strings.ToLower(typ), streaming) return &g } diff --git a/internal/datagen/generator.go b/internal/datagen/generator.go index 8dbc093..0617ddc 100644 --- a/internal/datagen/generator.go +++ b/internal/datagen/generator.go @@ -3,6 +3,7 @@ package datagen import ( "bytes" "math/rand" + "sync/atomic" "time" "github.com/go-loremipsum/loremipsum" @@ -25,6 +26,9 @@ type ( buf []byte typ string offset int + + streaming bool + seed *atomic.Int64 } ) @@ -37,7 +41,7 @@ var payloadTypes = []string{ "", } -func NewGenerator(vu modules.VU, size int, typ string) Generator { +func NewGenerator(vu modules.VU, size int, typ string, streaming bool) Generator { if size <= 0 { panic("size should be positive") } @@ -52,17 +56,20 @@ func NewGenerator(vu modules.VU, size int, typ string) Generator { if !found { vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ) } - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - buf := make([]byte, size+TailSize) g := Generator{ vu: vu, size: size, - rand: r, - buf: buf, typ: typ, } - g.fillBuffer() + + if streaming { + g.streaming = true + g.seed = new(atomic.Int64) + } else { + g.rand = rand.New(rand.NewSource(time.Now().UnixNano())) + g.buf = make([]byte, size+TailSize) + g.fillBuffer() + } return g } @@ -82,6 +89,10 @@ func (g *Generator) fillBuffer() { } func (g *Generator) GenPayload(calcHash bool) Payload { + if g.streaming { + return NewStreamPayload(g.size, g.seed.Add(1), g.typ) + } + data := g.nextSlice() return NewFixedPayload(data) } diff --git a/internal/datagen/generator_test.go b/internal/datagen/generator_test.go index a82dd25..cbc4c0f 100644 --- a/internal/datagen/generator_test.go +++ b/internal/datagen/generator_test.go @@ -16,25 +16,25 @@ 
func TestGenerator(t *testing.T) { t.Run("fails on negative size", func(t *testing.T) { require.Panics(t, func() { - _ = NewGenerator(vu, -1, "") + _ = NewGenerator(vu, -1, "", false) }) }) t.Run("fails on zero size", func(t *testing.T) { require.Panics(t, func() { - _ = NewGenerator(vu, 0, "") + _ = NewGenerator(vu, 0, "", false) }) }) t.Run("creates slice of specified size", func(t *testing.T) { size := 10 - g := NewGenerator(vu, size, "") + g := NewGenerator(vu, size, "", false) slice := g.nextSlice() require.Len(t, slice, size) }) t.Run("creates a different slice on each call", func(t *testing.T) { - g := NewGenerator(vu, 1000, "") + g := NewGenerator(vu, 1000, "", false) slice1 := g.nextSlice() slice2 := g.nextSlice() // Each slice should be unique (assuming that 1000 random bytes will never coincide @@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) { }) t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) { - g := NewGenerator(vu, 1000, "") + g := NewGenerator(vu, 1000, "", false) initialSlice := g.nextSlice() for i := 0; i < TailSize; i++ { g.nextSlice() diff --git a/internal/datagen/payload.go b/internal/datagen/payload.go index efd0c20..65b5425 100644 --- a/internal/datagen/payload.go +++ b/internal/datagen/payload.go @@ -1,10 +1,15 @@ package datagen import ( + "bufio" "bytes" "crypto/sha256" "encoding/hex" + "hash" "io" + "math/rand" + + "github.com/go-loremipsum/loremipsum" ) // Payload represents arbitrary data to be packed into S3 or native object. 
@@ -48,3 +53,69 @@ func (p *bytesPayload) Bytes() []byte { func NewFixedPayload(data []byte) Payload { return &bytesPayload{data: data} } + +type randomPayload struct { + r io.Reader + s hash.Hash + h string + size int +} + +func NewStreamPayload(size int, seed int64, typ string) Payload { + var rr io.Reader + switch typ { + case "text": + rr = &textReader{li: loremipsum.NewWithSeed(seed)} + default: + rr = rand.New(rand.NewSource(seed)) + } + + lr := io.LimitReader(rr, int64(size)) + // We need some buffering to write complete blocks in the TeeReader. + // Streaming payload read is expected to be used for big objects, thus 4k seems like a good choice. + br := bufio.NewReaderSize(lr, 4096) + s := sha256.New() + tr := io.TeeReader(br, s) + return &randomPayload{ + r: tr, + s: s, + size: size, + } +} + +func (p *randomPayload) Reader() io.Reader { + return p.r +} + +func (p *randomPayload) Size() int { + return p.size +} + +func (p *randomPayload) Hash() string { + if p.h == "" { + p.h = hex.EncodeToString(p.s.Sum(nil)) + // Prevent possible misuse. + p.r = nil + p.s = nil + } + return p.h +} + +func (p *randomPayload) Bytes() []byte { + data, err := io.ReadAll(p.r) + if err != nil { + // We use only 2 readers, either `bytes.Reader` or `rand.Reader`. + // None of them returns errors, thus encountering an error is a fatal error. 
+ panic(err) + } + return data +} + +type textReader struct { + li *loremipsum.LoremIpsum +} + +func (r *textReader) Read(p []byte) (n int, err error) { + paragraph := r.li.Paragraph() + return copy(p, paragraph), nil +} diff --git a/internal/datagen/payload_test.go b/internal/datagen/payload_test.go index c85f3b0..b7351b3 100644 --- a/internal/datagen/payload_test.go +++ b/internal/datagen/payload_test.go @@ -26,3 +26,15 @@ func TestFixedPayload(t *testing.T) { h := sha256.Sum256(data) require.Equal(t, hex.EncodeToString(h[:]), p.Hash()) } + +func TestStreamingPayload(t *testing.T) { + const size = 123 + + p := NewStreamPayload(size, 0, "") + require.Equal(t, size, p.Size()) + + actual, err := io.ReadAll(p.Reader()) + require.NoError(t, err) + require.Equal(t, size, len(actual)) + require.Equal(t, sha256.Size*2, len(p.Hash())) +} diff --git a/scenarios/libs/datagen.js b/scenarios/libs/datagen.js index d6feba2..37bb460 100644 --- a/scenarios/libs/datagen.js +++ b/scenarios/libs/datagen.js @@ -2,7 +2,7 @@ import datagen from 'k6/x/frostfs/datagen'; export function newGenerator(condition) { if (condition) { - return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || ""); + return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "", !!__ENV.STREAMING); } return undefined; } diff --git a/scenarios/run_scenarios.md b/scenarios/run_scenarios.md index 846e075..c94ad5a 100644 --- a/scenarios/run_scenarios.md +++ b/scenarios/run_scenarios.md @@ -19,6 +19,7 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt * `SLEEP_READ` - time interval (in seconds) between reading VU iterations. * `SELECTION_SIZE` - size of batch to select for deletion (default: 1000). * `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random"). + * `STREAMING` - if set, the payload is generated on the fly and is not read into memory fully. 
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`: ```shell -- 2.45.2 From d8af19cc835e50e6a6302a0ac99e2348a16e7b82 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 12 Jan 2024 17:37:29 +0300 Subject: [PATCH 3/5] [#114] datagen: Remove calcHash parameter in GenPayload() Hash calculation is now done on-demand with a method call. Signed-off-by: Evgenii Stratonikov --- internal/datagen/generator.go | 2 +- scenarios/grpc.js | 2 +- scenarios/grpc_car.js | 2 +- scenarios/http.js | 2 +- scenarios/local.js | 2 +- scenarios/s3.js | 2 +- scenarios/s3_car.js | 2 +- scenarios/s3_multipart.js | 2 +- scenarios/s3local.js | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/datagen/generator.go b/internal/datagen/generator.go index 0617ddc..845d7fd 100644 --- a/internal/datagen/generator.go +++ b/internal/datagen/generator.go @@ -88,7 +88,7 @@ func (g *Generator) fillBuffer() { } } -func (g *Generator) GenPayload(calcHash bool) Payload { +func (g *Generator) GenPayload() Payload { if g.streaming { return NewStreamPayload(g.size, g.seed.Add(1), g.typ) } diff --git a/scenarios/grpc.js b/scenarios/grpc.js index a4329b4..6f675cd 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -149,7 +149,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const payload = generator.genPayload(registry_enabled); + const payload = generator.genPayload(); const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size); if (!resp.success) { log.withField("cid", container).error(resp.error); diff --git a/scenarios/grpc_car.js b/scenarios/grpc_car.js index b8236e2..19ceaf9 100644 --- a/scenarios/grpc_car.js +++ b/scenarios/grpc_car.js @@ -174,7 +174,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const 
payload = generator.genPayload(registry_enabled); + const payload = generator.genPayload(); const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size); if (!resp.success) { log.withField("cid", container).error(resp.error); diff --git a/scenarios/http.js b/scenarios/http.js index 1e31a4a..81215e6 100644 --- a/scenarios/http.js +++ b/scenarios/http.js @@ -97,7 +97,7 @@ export function obj_write() { const container = container_list[Math.floor(Math.random() * container_list.length)]; - const payload = generator.genPayload(registry_enabled); + const payload = generator.genPayload(); const data = { field: uuidv4(), // Because we use `file` wrapping and it is not straightforward to use streams here, diff --git a/scenarios/local.js b/scenarios/local.js index 50f9c7e..c90610c 100644 --- a/scenarios/local.js +++ b/scenarios/local.js @@ -128,7 +128,7 @@ export function obj_write() { }; const container = container_list[Math.floor(Math.random() * container_list.length)]; - const payload = generator.genPayload(registry_enabled); + const payload = generator.genPayload(); const resp = local_client.put(container, headers, payload); if (!resp.success) { if (resp.abort) { diff --git a/scenarios/s3.js b/scenarios/s3.js index 517721e..814f073 100644 --- a/scenarios/s3.js +++ b/scenarios/s3.js @@ -145,7 +145,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const payload = generator.genPayload(registry_enabled); + const payload = generator.genPayload(); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); diff --git a/scenarios/s3_car.js b/scenarios/s3_car.js index ceda927..6eb94c6 100644 --- a/scenarios/s3_car.js +++ b/scenarios/s3_car.js @@ -172,7 +172,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * 
bucket_list.length)]; - const payload = generator.genPayload(registry_enabled); + const payload = generator.genPayload(); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); diff --git a/scenarios/s3_multipart.js b/scenarios/s3_multipart.js index e9e1297..43e0607 100644 --- a/scenarios/s3_multipart.js +++ b/scenarios/s3_multipart.js @@ -92,7 +92,7 @@ export function obj_write_multipart() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const payload = generator.genPayload(registry_enabled); + const payload = generator.genPayload(); const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload); if (!resp.success) { log.withFields({bucket: bucket, key: key}).error(resp.error); diff --git a/scenarios/s3local.js b/scenarios/s3local.js index 0d08733..fe31ed0 100644 --- a/scenarios/s3local.js +++ b/scenarios/s3local.js @@ -124,7 +124,7 @@ export function obj_write() { const key = __ENV.OBJ_NAME || uuidv4(); const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; - const payload = generator.genPayload(registry_enabled); + const payload = generator.genPayload(); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { if (resp.abort) { -- 2.45.2 From 636a1e9290a59b9da82f17f5cedace505eb2d049 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 12 Jan 2024 19:28:13 +0300 Subject: [PATCH 4/5] [#114] internal: Resolve linter issues Signed-off-by: Evgenii Stratonikov --- internal/datagen/generator.go | 2 +- internal/local/local.go | 6 ++++-- internal/native/client.go | 14 ++++++-------- internal/native/native.go | 6 +++--- internal/s3local/local.go | 5 ++++- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/internal/datagen/generator.go b/internal/datagen/generator.go index 845d7fd..33b4180 100644 --- 
a/internal/datagen/generator.go +++ b/internal/datagen/generator.go @@ -84,7 +84,7 @@ func (g *Generator) fillBuffer() { } g.buf = b.Bytes() default: - rand.Read(g.buf) // Per docs, err is always nil here + g.rand.Read(g.buf) // Per docs, err is always nil here } } diff --git a/internal/local/local.go b/internal/local/local.go index 6e207d2..788061d 100644 --- a/internal/local/local.go +++ b/internal/local/local.go @@ -248,7 +248,7 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([] var shOpts [][]shard.Option - engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { + err := engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { opts := []shard.Option{ shard.WithRefillMetabase(sc.RefillMetabase()), shard.WithMode(sc.Mode()), @@ -375,7 +375,9 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([] return nil }) - + if err != nil { + return nil, nil, fmt.Errorf("iterate shards: %w", err) + } return ngOpts, shOpts, nil } diff --git a/internal/native/client.go b/internal/native/client.go index 614886d..e6bc330 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -309,10 +309,9 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse { } start := time.Now() - var prm client.PrmContainerPut - prm.SetContainer(cnr) - - res, err := c.cli.ContainerPut(c.vu.Context(), prm) + res, err := c.cli.ContainerPut(c.vu.Context(), client.PrmContainerPut{ + Container: &cnr, + }) if err != nil { return c.putCnrErrorResponse(err) } @@ -500,10 +499,9 @@ func (x *waitParams) setDefaults() { func (c *Client) waitForContainerPresence(ctx context.Context, cnrID cid.ID, wp *waitParams) error { return waitFor(ctx, wp, func(ctx context.Context) bool { - var prm client.PrmContainerGet - prm.SetContainer(cnrID) - - _, err := c.cli.ContainerGet(ctx, prm) + _, err := c.cli.ContainerGet(ctx, client.PrmContainerGet{ + ContainerID: &cnrID, + }) return err == nil 
}) } diff --git a/internal/native/native.go b/internal/native/native.go index 95a8e7d..ca02bbf 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -89,9 +89,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime // generate session token exp := uint64(math.MaxUint64) - var prmSessionCreate client.PrmSessionCreate - prmSessionCreate.SetExp(exp) - sessionResp, err := cli.SessionCreate(n.vu.Context(), prmSessionCreate) + sessionResp, err := cli.SessionCreate(n.vu.Context(), client.PrmSessionCreate{ + Expiration: exp, + }) if err != nil { return nil, fmt.Errorf("dial endpoint: %s %w", endpoint, err) } diff --git a/internal/s3local/local.go b/internal/s3local/local.go index e760fe7..3da7435 100644 --- a/internal/s3local/local.go +++ b/internal/s3local/local.go @@ -151,7 +151,10 @@ func (s *Local) Connect(configFile string, configDir string, params map[string]s } l := layer.NewLayer(zap.L(), &frostfs{rc}, cfg) - l.Initialize(s.l.VU().Context(), nopEventListener{}) + err = l.Initialize(s.l.VU().Context(), nopEventListener{}) + if err != nil { + return nil, fmt.Errorf("initialize: %w", err) + } return &Client{ vu: s.l.VU(), -- 2.45.2 From 339e4e52ec6be7fcb660dba56175ca6fea725b27 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 12 Jan 2024 19:29:10 +0300 Subject: [PATCH 5/5] [#114] .forgejo: Add golangci-lint workflow Signed-off-by: Evgenii Stratonikov --- .forgejo/workflows/tests.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/.forgejo/workflows/tests.yml b/.forgejo/workflows/tests.yml index 1fa9608..c0b3a1b 100644 --- a/.forgejo/workflows/tests.yml +++ b/.forgejo/workflows/tests.yml @@ -2,6 +2,23 @@ name: Tests and linters on: [pull_request] jobs: + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: '1.21' + cache: true + + - name: golangci-lint + uses: 
https://github.com/golangci/golangci-lint-action@v3 + with: + version: latest + tests: name: Tests runs-on: ubuntu-latest -- 2.45.2