Do not store all payload in memory #115
22 changed files with 260 additions and 83 deletions
|
@ -2,6 +2,23 @@ name: Tests and linters
|
||||||
on: [pull_request]
|
on: [pull_request]
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
|
lint:
|
||||||
|
name: Lint
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Set up Go
|
||||||
|
uses: actions/setup-go@v3
|
||||||
|
with:
|
||||||
|
go-version: '1.21'
|
||||||
|
cache: true
|
||||||
|
|
||||||
|
- name: golangci-lint
|
||||||
|
uses: https://github.com/golangci/golangci-lint-action@v3
|
||||||
|
with:
|
||||||
|
version: latest
|
||||||
|
|
||||||
tests:
|
tests:
|
||||||
name: Tests
|
name: Tests
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
|
@ -38,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports {
|
||||||
return modules.Exports{Default: d}
|
return modules.Exports{Default: d}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Datagen) Generator(size int, typ string) *Generator {
|
func (d *Datagen) Generator(size int, typ string, streaming bool) *Generator {
|
||||||
g := NewGenerator(d.vu, size, strings.ToLower(typ))
|
g := NewGenerator(d.vu, size, strings.ToLower(typ), streaming)
|
||||||
return &g
|
return &g
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,12 +2,10 @@ package datagen
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/sha256"
|
|
||||||
"encoding/hex"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/dop251/goja"
|
|
||||||
"github.com/go-loremipsum/loremipsum"
|
"github.com/go-loremipsum/loremipsum"
|
||||||
"go.k6.io/k6/js/modules"
|
"go.k6.io/k6/js/modules"
|
||||||
)
|
)
|
||||||
|
@ -28,11 +26,9 @@ type (
|
||||||
buf []byte
|
buf []byte
|
||||||
typ string
|
typ string
|
||||||
offset int
|
offset int
|
||||||
}
|
|
||||||
|
|
||||||
GenPayloadResponse struct {
|
streaming bool
|
||||||
Payload goja.ArrayBuffer
|
seed *atomic.Int64
|
||||||
Hash string
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -45,7 +41,7 @@ var payloadTypes = []string{
|
||||||
"",
|
"",
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGenerator(vu modules.VU, size int, typ string) Generator {
|
func NewGenerator(vu modules.VU, size int, typ string, streaming bool) Generator {
|
||||||
if size <= 0 {
|
if size <= 0 {
|
||||||
panic("size should be positive")
|
panic("size should be positive")
|
||||||
}
|
}
|
||||||
|
@ -60,17 +56,20 @@ func NewGenerator(vu modules.VU, size int, typ string) Generator {
|
||||||
if !found {
|
if !found {
|
||||||
vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ)
|
vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
||||||
buf := make([]byte, size+TailSize)
|
|
||||||
g := Generator{
|
g := Generator{
|
||||||
vu: vu,
|
vu: vu,
|
||||||
size: size,
|
size: size,
|
||||||
rand: r,
|
|
||||||
buf: buf,
|
|
||||||
typ: typ,
|
typ: typ,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if streaming {
|
||||||
|
g.streaming = true
|
||||||
|
g.seed = new(atomic.Int64)
|
||||||
|
} else {
|
||||||
|
g.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
g.buf = make([]byte, size+TailSize)
|
||||||
g.fillBuffer()
|
g.fillBuffer()
|
||||||
|
}
|
||||||
return g
|
return g
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,21 +84,17 @@ func (g *Generator) fillBuffer() {
|
||||||
}
|
}
|
||||||
g.buf = b.Bytes()
|
g.buf = b.Bytes()
|
||||||
default:
|
default:
|
||||||
rand.Read(g.buf) // Per docs, err is always nil here
|
g.rand.Read(g.buf) // Per docs, err is always nil here
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse {
|
func (g *Generator) GenPayload() Payload {
|
||||||
|
if g.streaming {
|
||||||
|
return NewStreamPayload(g.size, g.seed.Add(1), g.typ)
|
||||||
|
}
|
||||||
|
|
||||||
data := g.nextSlice()
|
data := g.nextSlice()
|
||||||
|
return NewFixedPayload(data)
|
||||||
dataHash := ""
|
|
||||||
if calcHash {
|
|
||||||
hashBytes := sha256.Sum256(data)
|
|
||||||
dataHash = hex.EncodeToString(hashBytes[:])
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := g.vu.Runtime().NewArrayBuffer(data)
|
|
||||||
return GenPayloadResponse{Payload: payload, Hash: dataHash}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Generator) nextSlice() []byte {
|
func (g *Generator) nextSlice() []byte {
|
||||||
|
|
|
@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) {
|
||||||
|
|
||||||
t.Run("fails on negative size", func(t *testing.T) {
|
t.Run("fails on negative size", func(t *testing.T) {
|
||||||
require.Panics(t, func() {
|
require.Panics(t, func() {
|
||||||
_ = NewGenerator(vu, -1, "")
|
_ = NewGenerator(vu, -1, "", false)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("fails on zero size", func(t *testing.T) {
|
t.Run("fails on zero size", func(t *testing.T) {
|
||||||
require.Panics(t, func() {
|
require.Panics(t, func() {
|
||||||
_ = NewGenerator(vu, 0, "")
|
_ = NewGenerator(vu, 0, "", false)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("creates slice of specified size", func(t *testing.T) {
|
t.Run("creates slice of specified size", func(t *testing.T) {
|
||||||
size := 10
|
size := 10
|
||||||
g := NewGenerator(vu, size, "")
|
g := NewGenerator(vu, size, "", false)
|
||||||
slice := g.nextSlice()
|
slice := g.nextSlice()
|
||||||
require.Len(t, slice, size)
|
require.Len(t, slice, size)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("creates a different slice on each call", func(t *testing.T) {
|
t.Run("creates a different slice on each call", func(t *testing.T) {
|
||||||
g := NewGenerator(vu, 1000, "")
|
g := NewGenerator(vu, 1000, "", false)
|
||||||
slice1 := g.nextSlice()
|
slice1 := g.nextSlice()
|
||||||
slice2 := g.nextSlice()
|
slice2 := g.nextSlice()
|
||||||
// Each slice should be unique (assuming that 1000 random bytes will never coincide
|
// Each slice should be unique (assuming that 1000 random bytes will never coincide
|
||||||
|
@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
|
t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
|
||||||
g := NewGenerator(vu, 1000, "")
|
g := NewGenerator(vu, 1000, "", false)
|
||||||
initialSlice := g.nextSlice()
|
initialSlice := g.nextSlice()
|
||||||
for i := 0; i < TailSize; i++ {
|
for i := 0; i < TailSize; i++ {
|
||||||
g.nextSlice()
|
g.nextSlice()
|
||||||
|
|
121
internal/datagen/payload.go
Normal file
121
internal/datagen/payload.go
Normal file
|
@ -0,0 +1,121 @@
|
||||||
|
package datagen
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"hash"
|
||||||
|
"io"
|
||||||
|
"math/rand"
|
||||||
|
|
||||||
|
"github.com/go-loremipsum/loremipsum"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Payload represents arbitrary data to be packed into S3 or native object.
|
||||||
|
// Implementations could be thread-unsafe.
|
||||||
|
type Payload interface {
|
||||||
|
// Reader returns io.Reader instance to read the payload.
|
||||||
|
// Must not be called twice.
|
||||||
|
Reader() io.Reader
|
||||||
|
// Bytes is a helper which reads all data from Reader() into slice.
|
||||||
|
// The sole purpose of this method is to simplify HTTP scenario,
|
||||||
|
// where all payload needs to be read and wrapped.
|
||||||
|
Bytes() []byte
|
||||||
|
// Size returns payload size, which is equal to the total amount of data
|
||||||
|
// that could be read from the Reader().
|
||||||
|
Size() int
|
||||||
|
// Hash returns payload sha256 hash. Must be called after all data is read from the reader.
|
||||||
|
Hash() string
|
||||||
|
}
|
||||||
|
|
||||||
|
type bytesPayload struct {
|
||||||
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *bytesPayload) Reader() io.Reader {
|
||||||
|
return bytes.NewReader(p.data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *bytesPayload) Size() int {
|
||||||
|
return len(p.data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *bytesPayload) Hash() string {
|
||||||
|
h := sha256.Sum256(p.data[:])
|
||||||
|
return hex.EncodeToString(h[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *bytesPayload) Bytes() []byte {
|
||||||
|
return p.data
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFixedPayload(data []byte) Payload {
|
||||||
|
return &bytesPayload{data: data}
|
||||||
|
}
|
||||||
|
|
||||||
|
type randomPayload struct {
|
||||||
|
r io.Reader
|
||||||
|
s hash.Hash
|
||||||
|
h string
|
||||||
|
size int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStreamPayload(size int, seed int64, typ string) Payload {
|
||||||
|
var rr io.Reader
|
||||||
|
switch typ {
|
||||||
|
case "text":
|
||||||
|
rr = &textReader{li: loremipsum.NewWithSeed(seed)}
|
||||||
|
default:
|
||||||
|
rr = rand.New(rand.NewSource(seed))
|
||||||
|
}
|
||||||
|
|
||||||
|
lr := io.LimitReader(rr, int64(size))
|
||||||
|
// We need some buffering to write complete blocks in the TeeReader.
|
||||||
|
// Streaming payload read is expected to be used for big objects, thus 4k seems like a good choice.
|
||||||
|
br := bufio.NewReaderSize(lr, 4096)
|
||||||
|
s := sha256.New()
|
||||||
|
tr := io.TeeReader(br, s)
|
||||||
|
return &randomPayload{
|
||||||
|
r: tr,
|
||||||
|
s: s,
|
||||||
|
size: size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *randomPayload) Reader() io.Reader {
|
||||||
|
return p.r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *randomPayload) Size() int {
|
||||||
|
return p.size
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *randomPayload) Hash() string {
|
||||||
|
if p.h == "" {
|
||||||
|
p.h = hex.EncodeToString(p.s.Sum(nil))
|
||||||
|
// Prevent possible misuse.
|
||||||
|
p.r = nil
|
||||||
|
p.s = nil
|
||||||
|
}
|
||||||
|
return p.h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *randomPayload) Bytes() []byte {
|
||||||
|
data, err := io.ReadAll(p.r)
|
||||||
|
if err != nil {
|
||||||
|
// We use only 2 readers, either `bytes.Reader` or `rand.Reader`.
|
||||||
|
// None of them returns errors, thus encountering an error is a fatal error.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
type textReader struct {
|
||||||
|
li *loremipsum.LoremIpsum
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *textReader) Read(p []byte) (n int, err error) {
|
||||||
|
paragraph := r.li.Paragraph()
|
||||||
|
return copy(p, paragraph), nil
|
||||||
|
}
|
40
internal/datagen/payload_test.go
Normal file
40
internal/datagen/payload_test.go
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
package datagen
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFixedPayload(t *testing.T) {
|
||||||
|
const size = 123
|
||||||
|
data := make([]byte, size)
|
||||||
|
_, err := rand.Read(data)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
p := NewFixedPayload(data)
|
||||||
|
require.Equal(t, size, p.Size())
|
||||||
|
|
||||||
|
actual, err := io.ReadAll(p.Reader())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, data, actual)
|
||||||
|
|
||||||
|
h := sha256.Sum256(data)
|
||||||
|
require.Equal(t, hex.EncodeToString(h[:]), p.Hash())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamingPayload(t *testing.T) {
|
||||||
|
const size = 123
|
||||||
|
|
||||||
|
p := NewStreamPayload(size, 0, "")
|
||||||
|
require.Equal(t, size, p.Size())
|
||||||
|
|
||||||
|
actual, err := io.ReadAll(p.Reader())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, size, len(actual))
|
||||||
|
require.Equal(t, sha256.Size*2, len(p.Hash()))
|
||||||
|
}
|
|
@ -248,7 +248,7 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]
|
||||||
|
|
||||||
var shOpts [][]shard.Option
|
var shOpts [][]shard.Option
|
||||||
|
|
||||||
engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
|
err := engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
|
||||||
opts := []shard.Option{
|
opts := []shard.Option{
|
||||||
shard.WithRefillMetabase(sc.RefillMetabase()),
|
shard.WithRefillMetabase(sc.RefillMetabase()),
|
||||||
shard.WithMode(sc.Mode()),
|
shard.WithMode(sc.Mode()),
|
||||||
|
@ -375,7 +375,9 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("iterate shards: %w", err)
|
||||||
|
}
|
||||||
return ngOpts, shOpts, nil
|
return ngOpts, shOpts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package native
|
package native
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
@ -23,6 +22,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||||
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
|
||||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||||
"github.com/dop251/goja"
|
"github.com/dop251/goja"
|
||||||
"go.k6.io/k6/js/modules"
|
"go.k6.io/k6/js/modules"
|
||||||
|
@ -77,7 +77,7 @@ type (
|
||||||
|
|
||||||
const defaultBufferSize = 64 * 1024
|
const defaultBufferSize = 64 * 1024
|
||||||
|
|
||||||
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
|
func (c *Client) Put(containerID string, headers map[string]string, payload datagen.Payload, chunkSize int) PutResponse {
|
||||||
cliContainerID := parseContainerID(containerID)
|
cliContainerID := parseContainerID(containerID)
|
||||||
|
|
||||||
tok := c.tok
|
tok := c.tok
|
||||||
|
@ -104,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
|
||||||
o.SetOwnerID(&owner)
|
o.SetOwnerID(&owner)
|
||||||
o.SetAttributes(attrs...)
|
o.SetAttributes(attrs...)
|
||||||
|
|
||||||
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
|
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
@ -309,10 +309,9 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
var prm client.PrmContainerPut
|
res, err := c.cli.ContainerPut(c.vu.Context(), client.PrmContainerPut{
|
||||||
prm.SetContainer(cnr)
|
Container: &cnr,
|
||||||
|
})
|
||||||
res, err := c.cli.ContainerPut(c.vu.Context(), prm)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return c.putCnrErrorResponse(err)
|
return c.putCnrErrorResponse(err)
|
||||||
}
|
}
|
||||||
|
@ -399,7 +398,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
|
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
@ -414,15 +413,15 @@ func (s epochSource) CurrentEpoch() uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
|
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
|
||||||
hdr *object.Object, payload []byte, chunkSize int,
|
hdr *object.Object, payload datagen.Payload, chunkSize int,
|
||||||
) (*client.ResObjectPut, error) {
|
) (*client.ResObjectPut, error) {
|
||||||
bufSize := defaultBufferSize
|
bufSize := defaultBufferSize
|
||||||
if chunkSize > 0 {
|
if chunkSize > 0 {
|
||||||
bufSize = chunkSize
|
bufSize = chunkSize
|
||||||
}
|
}
|
||||||
buf := make([]byte, bufSize)
|
buf := make([]byte, bufSize)
|
||||||
rdr := bytes.NewReader(payload)
|
rdr := payload.Reader()
|
||||||
sz := rdr.Size()
|
sz := payload.Size()
|
||||||
|
|
||||||
// starting upload
|
// starting upload
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
@ -500,10 +499,9 @@ func (x *waitParams) setDefaults() {
|
||||||
|
|
||||||
func (c *Client) waitForContainerPresence(ctx context.Context, cnrID cid.ID, wp *waitParams) error {
|
func (c *Client) waitForContainerPresence(ctx context.Context, cnrID cid.ID, wp *waitParams) error {
|
||||||
return waitFor(ctx, wp, func(ctx context.Context) bool {
|
return waitFor(ctx, wp, func(ctx context.Context) bool {
|
||||||
var prm client.PrmContainerGet
|
_, err := c.cli.ContainerGet(ctx, client.PrmContainerGet{
|
||||||
prm.SetContainer(cnrID)
|
ContainerID: &cnrID,
|
||||||
|
})
|
||||||
_, err := c.cli.ContainerGet(ctx, prm)
|
|
||||||
return err == nil
|
return err == nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,9 +89,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
||||||
|
|
||||||
// generate session token
|
// generate session token
|
||||||
exp := uint64(math.MaxUint64)
|
exp := uint64(math.MaxUint64)
|
||||||
var prmSessionCreate client.PrmSessionCreate
|
sessionResp, err := cli.SessionCreate(n.vu.Context(), client.PrmSessionCreate{
|
||||||
prmSessionCreate.SetExp(exp)
|
Expiration: exp,
|
||||||
sessionResp, err := cli.SessionCreate(n.vu.Context(), prmSessionCreate)
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("dial endpoint: %s %w", endpoint, err)
|
return nil, fmt.Errorf("dial endpoint: %s %w", endpoint, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package s3
|
package s3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
@ -9,12 +8,12 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
|
||||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||||
"github.com/dop251/goja"
|
|
||||||
"go.k6.io/k6/js/modules"
|
"go.k6.io/k6/js/modules"
|
||||||
"go.k6.io/k6/metrics"
|
"go.k6.io/k6/metrics"
|
||||||
)
|
)
|
||||||
|
@ -51,9 +50,9 @@ type (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
|
||||||
rdr := bytes.NewReader(payload.Bytes())
|
rdr := payload.Reader()
|
||||||
sz := rdr.Size()
|
sz := payload.Size()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
|
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
|
||||||
|
@ -74,7 +73,7 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||||
|
|
||||||
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
|
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
|
||||||
|
|
||||||
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse {
|
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse {
|
||||||
if objPartSize < multipartUploadMinPartSize {
|
if objPartSize < multipartUploadMinPartSize {
|
||||||
stats.Report(c.vu, objPutFails, 1)
|
stats.Report(c.vu, objPutFails, 1)
|
||||||
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
|
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
|
||||||
|
@ -86,8 +85,8 @@ func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, pay
|
||||||
u.Concurrency = concurrency
|
u.Concurrency = concurrency
|
||||||
})
|
})
|
||||||
|
|
||||||
payloadReader := bytes.NewReader(payload.Bytes())
|
payloadReader := payload.Reader()
|
||||||
sz := payloadReader.Len()
|
sz := payload.Size()
|
||||||
|
|
||||||
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
|
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
|
|
|
@ -1,15 +1,14 @@
|
||||||
package s3local
|
package s3local
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
|
||||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
|
||||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||||
"github.com/dop251/goja"
|
|
||||||
"go.k6.io/k6/js/modules"
|
"go.k6.io/k6/js/modules"
|
||||||
"go.k6.io/k6/metrics"
|
"go.k6.io/k6/metrics"
|
||||||
)
|
)
|
||||||
|
@ -35,7 +34,7 @@ type (
|
||||||
GetResponse SuccessOrErrorResponse
|
GetResponse SuccessOrErrorResponse
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
|
||||||
if c.limiter.IsFull() {
|
if c.limiter.IsFull() {
|
||||||
return PutResponse{
|
return PutResponse{
|
||||||
Success: false,
|
Success: false,
|
||||||
|
@ -58,8 +57,8 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||||
},
|
},
|
||||||
Header: map[string]string{},
|
Header: map[string]string{},
|
||||||
Object: key,
|
Object: key,
|
||||||
Size: int64(len(payload.Bytes())),
|
Size: int64(payload.Size()),
|
||||||
Reader: bytes.NewReader(payload.Bytes()),
|
Reader: payload.Reader(),
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
|
@ -151,7 +151,10 @@ func (s *Local) Connect(configFile string, configDir string, params map[string]s
|
||||||
}
|
}
|
||||||
|
|
||||||
l := layer.NewLayer(zap.L(), &frostfs{rc}, cfg)
|
l := layer.NewLayer(zap.L(), &frostfs{rc}, cfg)
|
||||||
l.Initialize(s.l.VU().Context(), nopEventListener{})
|
err = l.Initialize(s.l.VU().Context(), nopEventListener{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("initialize: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
vu: s.l.VU(),
|
vu: s.l.VU(),
|
||||||
|
|
|
@ -149,7 +149,7 @@ export function obj_write() {
|
||||||
};
|
};
|
||||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||||
|
|
||||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
const payload = generator.genPayload();
|
||||||
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
|
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
|
||||||
if (!resp.success) {
|
if (!resp.success) {
|
||||||
log.withField("cid", container).error(resp.error);
|
log.withField("cid", container).error(resp.error);
|
||||||
|
@ -157,7 +157,7 @@ export function obj_write() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
obj_registry.addObject(container, resp.object_id, "", "", hash);
|
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -174,7 +174,7 @@ export function obj_write() {
|
||||||
};
|
};
|
||||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||||
|
|
||||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
const payload = generator.genPayload();
|
||||||
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
|
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
|
||||||
if (!resp.success) {
|
if (!resp.success) {
|
||||||
log.withField("cid", container).error(resp.error);
|
log.withField("cid", container).error(resp.error);
|
||||||
|
@ -182,7 +182,7 @@ export function obj_write() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
obj_registry.addObject(container, resp.object_id, "", "", hash);
|
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -97,10 +97,12 @@ export function obj_write() {
|
||||||
|
|
||||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||||
|
|
||||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
const payload = generator.genPayload();
|
||||||
const data = {
|
const data = {
|
||||||
field: uuidv4(),
|
field: uuidv4(),
|
||||||
file: http.file(payload, "random.data"),
|
// Because we use `file` wrapping and it is not straightforward to use streams here,
|
||||||
|
// `-e STREAMING=1` has no effect for this scenario.
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
file: http.file(payload.bytes(), "random.data"),
|
||||||
};
|
};
|
||||||
|
|
||||||
const resp = http.post(`http://${http_endpoint}/upload/${container}`, data);
|
const resp = http.post(`http://${http_endpoint}/upload/${container}`, data);
|
||||||
|
@ -110,7 +112,7 @@ export function obj_write() {
|
||||||
}
|
}
|
||||||
const object_id = JSON.parse(resp.body).object_id;
|
const object_id = JSON.parse(resp.body).object_id;
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
obj_registry.addObject(container, object_id, "", "", hash);
|
obj_registry.addObject(container, object_id, "", "", payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ import datagen from 'k6/x/frostfs/datagen';
|
||||||
|
|
||||||
export function newGenerator(condition) {
|
export function newGenerator(condition) {
|
||||||
if (condition) {
|
if (condition) {
|
||||||
return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
|
return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "", !!__ENV.STREAMING);
|
||||||
}
|
}
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,7 +128,7 @@ export function obj_write() {
|
||||||
};
|
};
|
||||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||||
|
|
||||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
const payload = generator.genPayload();
|
||||||
const resp = local_client.put(container, headers, payload);
|
const resp = local_client.put(container, headers, payload);
|
||||||
if (!resp.success) {
|
if (!resp.success) {
|
||||||
if (resp.abort) {
|
if (resp.abort) {
|
||||||
|
@ -139,7 +139,7 @@ export function obj_write() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
obj_registry.addObject(container, resp.object_id, "", "", hash);
|
obj_registry.addObject(container, resp.object_id, "", "", payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt
|
||||||
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
|
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
|
||||||
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
|
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
|
||||||
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
|
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
|
||||||
|
* `STREAMING` - if set, the payload is generated on the fly and is not read into memory fully.
|
||||||
|
|
||||||
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
|
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
|
||||||
```shell
|
```shell
|
||||||
|
|
|
@ -145,7 +145,7 @@ export function obj_write() {
|
||||||
const key = __ENV.OBJ_NAME || uuidv4();
|
const key = __ENV.OBJ_NAME || uuidv4();
|
||||||
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||||
|
|
||||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
const payload = generator.genPayload();
|
||||||
const resp = s3_client.put(bucket, key, payload);
|
const resp = s3_client.put(bucket, key, payload);
|
||||||
if (!resp.success) {
|
if (!resp.success) {
|
||||||
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
||||||
|
@ -153,7 +153,7 @@ export function obj_write() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
obj_registry.addObject("", "", bucket, key, hash);
|
obj_registry.addObject("", "", bucket, key, payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -172,7 +172,7 @@ export function obj_write() {
|
||||||
const key = __ENV.OBJ_NAME || uuidv4();
|
const key = __ENV.OBJ_NAME || uuidv4();
|
||||||
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||||
|
|
||||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
const payload = generator.genPayload();
|
||||||
const resp = s3_client.put(bucket, key, payload);
|
const resp = s3_client.put(bucket, key, payload);
|
||||||
if (!resp.success) {
|
if (!resp.success) {
|
||||||
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
||||||
|
@ -180,7 +180,7 @@ export function obj_write() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
obj_registry.addObject("", "", bucket, key, hash);
|
obj_registry.addObject("", "", bucket, key, payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@ export function obj_write_multipart() {
|
||||||
const key = __ENV.OBJ_NAME || uuidv4();
|
const key = __ENV.OBJ_NAME || uuidv4();
|
||||||
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||||
|
|
||||||
const {payload, hash} = generator.genPayload(registry_enabled);
|
const payload = generator.genPayload();
|
||||||
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload);
|
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload);
|
||||||
if (!resp.success) {
|
if (!resp.success) {
|
||||||
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
||||||
|
@ -100,6 +100,6 @@ export function obj_write_multipart() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
obj_registry.addObject("", "", bucket, key, hash);
|
obj_registry.addObject("", "", bucket, key, payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,7 +124,7 @@ export function obj_write() {
|
||||||
const key = __ENV.OBJ_NAME || uuidv4();
|
const key = __ENV.OBJ_NAME || uuidv4();
|
||||||
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||||
|
|
||||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
const payload = generator.genPayload();
|
||||||
const resp = s3_client.put(bucket, key, payload);
|
const resp = s3_client.put(bucket, key, payload);
|
||||||
if (!resp.success) {
|
if (!resp.success) {
|
||||||
if (resp.abort) {
|
if (resp.abort) {
|
||||||
|
@ -135,7 +135,7 @@ export function obj_write() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
obj_registry.addObject("", "", bucket, key, hash);
|
obj_registry.addObject("", "", bucket, key, payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue
This is so annoying, haven't yet come up with a solution, will fix in this PR, thus WIP
04a8119ef3/js/modules/k6/http/file.go (L25)
Decided to avoid streaming for http scenario.