package s3local import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" ) type Client struct { vu modules.VU l layer.Client ownerID *user.ID resolver layer.BucketResolver limiter local.Limiter } type ( SuccessOrErrorResponse struct { Success bool Abort bool Error string } CreateBucketResponse SuccessOrErrorResponse PutResponse SuccessOrErrorResponse DeleteResponse SuccessOrErrorResponse GetResponse SuccessOrErrorResponse ) func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse { if c.limiter.IsFull() { return PutResponse{ Success: false, Abort: true, Error: "engine size limit reached", } } cid, err := c.resolver.Resolve(c.vu.Context(), bucket) if err != nil { stats.Report(c.vu, objPutFails, 1) return PutResponse{Error: err.Error()} } prm := &layer.PutObjectParams{ BktInfo: &data.BucketInfo{ Name: bucket, CID: cid, Owner: *c.ownerID, Created: time.Now(), }, Header: map[string]string{}, Object: key, Size: uint64(payload.Size()), Reader: payload.Reader(), } start := time.Now() if _, err := c.l.PutObject(c.vu.Context(), prm); err != nil { stats.Report(c.vu, objPutFails, 1) return PutResponse{Error: err.Error()} } stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start))) stats.Report(c.vu, objPutSuccess, 1) stats.ReportDataSent(c.vu, float64(prm.Size)) stats.Report(c.vu, objPutData, float64(prm.Size)) return PutResponse{Success: true} } func (c *Client) Get(bucket, key string) GetResponse { cid, err := c.resolver.Resolve(c.vu.Context(), bucket) if err != nil { stats.Report(c.vu, objGetFails, 1) return GetResponse{Error: err.Error()} } start := time.Now() bktInfo := &data.BucketInfo{ Name: bucket, CID: cid, Owner: *c.ownerID, } headPrm := &layer.HeadObjectParams{ BktInfo: bktInfo, Object: key, } extInfo, err := c.l.GetExtendedObjectInfo(c.vu.Context(), headPrm) if err != nil { stats.Report(c.vu, objGetFails, 1) return GetResponse{Error: err.Error()} } wr := &recvDataReporter{} getPrm := &layer.GetObjectParams{ BucketInfo: bktInfo, ObjectInfo: extInfo.ObjectInfo, Range: &layer.RangeParams{ Start: 0, End: uint64(extInfo.ObjectInfo.Size), }, } objPayload, err := c.l.GetObject(c.vu.Context(), getPrm) if err != nil { stats.Report(c.vu, objGetFails, 1) return GetResponse{Error: err.Error()} } err = objPayload.StreamTo(wr) if err != nil { stats.Report(c.vu, objGetFails, 1) return GetResponse{Error: err.Error()} } stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start))) stats.Report(c.vu, objGetSuccess, 1) stats.ReportDataReceived(c.vu, wr.total) stats.Report(c.vu, objGetData, wr.total) return GetResponse{Success: true} } type recvDataReporter struct{ total float64 } func (r *recvDataReporter) Write(p []byte) (int, error) { r.total += float64(len(p)) return len(p), nil }