[#647] Dont send error after returning object payload
Some checks failed
/ DCO (pull_request) Successful in 34s
/ Builds (pull_request) Successful in 1m41s
/ Lint (pull_request) Failing after 1m0s
/ Tests (pull_request) Successful in 1m18s
/ Vulncheck (pull_request) Successful in 2m10s
/ OCI image (pull_request) Successful in 2m29s
Some checks failed
/ DCO (pull_request) Successful in 34s
/ Builds (pull_request) Successful in 1m41s
/ Lint (pull_request) Failing after 1m0s
/ Tests (pull_request) Successful in 1m18s
/ Vulncheck (pull_request) Successful in 2m10s
/ OCI image (pull_request) Successful in 2m29s
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
bfec3e0a5e
commit
f27a554991
4 changed files with 83 additions and 19 deletions
|
@ -255,7 +255,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
if err = objPayload.StreamTo(w); err != nil {
|
||||
h.logAndSendError(ctx, w, "could not stream object payload", reqInfo, err)
|
||||
h.logError(ctx, "could not stream object payload", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package handler
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -197,6 +198,19 @@ func TestGetObject(t *testing.T) {
|
|||
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, apierr.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
func TestGetObjectStreamError(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
bktName, objName := "bucket", "obj"
|
||||
info := createBucket(hc, bktName)
|
||||
|
||||
putObject(hc, bktName, objName)
|
||||
addr := getAddressOfLastVersion(hc, info.BktInfo, objName)
|
||||
hc.tp.SetObjectStreamError(addr, 4, context.Canceled)
|
||||
|
||||
d, _ := getObject(hc, bktName, objName)
|
||||
require.Equal(t, "cont", string(d))
|
||||
}
|
||||
|
||||
func TestGetDeletedObject(t *testing.T) {
|
||||
hc := prepareHandlerContextWithMinCache(t)
|
||||
bktName, objName := "bucket", "obj"
|
||||
|
|
|
@ -45,6 +45,19 @@ func (h *handler) logAndSendError(ctx context.Context, w http.ResponseWriter, lo
|
|||
h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...)
|
||||
}
|
||||
|
||||
func (h *handler) logError(ctx context.Context, logText string, reqInfo *middleware.ReqInfo, err error, additional ...zap.Field) {
|
||||
fields := []zap.Field{
|
||||
zap.String("method", reqInfo.API),
|
||||
zap.String("bucket", reqInfo.BucketName),
|
||||
zap.String("object", reqInfo.ObjectName),
|
||||
zap.String("description", logText),
|
||||
zap.String("user", reqInfo.User),
|
||||
zap.Error(err),
|
||||
}
|
||||
fields = append(fields, additional...)
|
||||
h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...)
|
||||
}
|
||||
|
||||
func handleDeleteMarker(w http.ResponseWriter, err error) error {
|
||||
var target layer.DeleteMarkerError
|
||||
if !errors.As(err, &target) {
|
||||
|
|
|
@ -74,10 +74,16 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
|||
|
||||
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
||||
|
||||
type offsetError struct {
|
||||
offset int
|
||||
err error
|
||||
}
|
||||
|
||||
type TestFrostFS struct {
|
||||
objects map[string]*object.Object
|
||||
copiesNumbers map[string][]uint32
|
||||
objectErrors map[string]error
|
||||
objectStreamErrors map[string]offsetError
|
||||
objectPutErrors map[string]error
|
||||
containers map[string]*container.Container
|
||||
chains map[string][]chain.Chain
|
||||
|
@ -91,6 +97,7 @@ func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
|
|||
objects: make(map[string]*object.Object),
|
||||
copiesNumbers: make(map[string][]uint32),
|
||||
objectErrors: make(map[string]error),
|
||||
objectStreamErrors: make(map[string]offsetError),
|
||||
objectPutErrors: make(map[string]error),
|
||||
containers: make(map[string]*container.Container),
|
||||
chains: make(map[string][]chain.Chain),
|
||||
|
@ -110,6 +117,14 @@ func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) SetObjectStreamError(addr oid.Address, offset int, err error) {
|
||||
if err == nil {
|
||||
delete(t.objectStreamErrors, addr.EncodeToString())
|
||||
} else {
|
||||
t.objectStreamErrors[addr.EncodeToString()] = offsetError{offset: offset, err: err}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) SetObjectPutError(fileName string, err error) {
|
||||
if err == nil {
|
||||
delete(t.objectPutErrors, fileName)
|
||||
|
@ -262,10 +277,32 @@ func (t *TestFrostFS) GetObject(ctx context.Context, prm frostfs.PrmObjectGet) (
|
|||
|
||||
return &frostfs.Object{
|
||||
Header: *obj,
|
||||
Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
|
||||
Payload: &objPayload{
|
||||
r: bytes.NewReader(obj.Payload()),
|
||||
offsetErr: t.objectStreamErrors[prm.Container.EncodeToString()+"/"+prm.Object.EncodeToString()],
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
type objPayload struct {
|
||||
offset int
|
||||
r io.Reader
|
||||
offsetErr offsetError
|
||||
}
|
||||
|
||||
func (o objPayload) Read(p []byte) (n int, err error) {
|
||||
n, err = o.r.Read(p)
|
||||
if o.offsetErr.err != nil && o.offset+n > o.offsetErr.offset {
|
||||
return o.offsetErr.offset - o.offset, o.offsetErr.err
|
||||
}
|
||||
o.offset += n
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (o objPayload) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRange) (io.ReadCloser, error) {
|
||||
obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
|
||||
if err != nil {
|
||||
|
|
Loading…
Add table
Reference in a new issue