diff --git a/api/handler/get.go b/api/handler/get.go index f617a871..8779b86b 100644 --- a/api/handler/get.go +++ b/api/handler/get.go @@ -255,7 +255,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { } if err = objPayload.StreamTo(w); err != nil { - h.logAndSendError(ctx, w, "could not stream object payload", reqInfo, err) + h.logError(ctx, "could not stream object payload", reqInfo, err) return } } diff --git a/api/handler/get_test.go b/api/handler/get_test.go index 3521c5ef..3010e37e 100644 --- a/api/handler/get_test.go +++ b/api/handler/get_test.go @@ -2,6 +2,7 @@ package handler import ( "bytes" + "context" "errors" "fmt" "io" @@ -197,6 +198,19 @@ func TestGetObject(t *testing.T) { getObjectAssertS3Error(hc, bktName, objName, emptyVersion, apierr.ErrNoSuchKey) } +func TestGetObjectStreamError(t *testing.T) { + hc := prepareHandlerContext(t) + bktName, objName := "bucket", "obj" + info := createBucket(hc, bktName) + + putObject(hc, bktName, objName) + addr := getAddressOfLastVersion(hc, info.BktInfo, objName) + hc.tp.SetObjectStreamError(addr, 4, context.Canceled) + + d, _ := getObject(hc, bktName, objName) + require.Equal(t, "cont", string(d)) +} + func TestGetDeletedObject(t *testing.T) { hc := prepareHandlerContextWithMinCache(t) bktName, objName := "bucket", "obj" diff --git a/api/handler/util.go b/api/handler/util.go index 7d12af0b..f5520b31 100644 --- a/api/handler/util.go +++ b/api/handler/util.go @@ -45,6 +45,19 @@ func (h *handler) logAndSendError(ctx context.Context, w http.ResponseWriter, lo h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...) } +func (h *handler) logError(ctx context.Context, logText string, reqInfo *middleware.ReqInfo, err error, additional ...zap.Field) { + fields := []zap.Field{ + zap.String("method", reqInfo.API), + zap.String("bucket", reqInfo.BucketName), + zap.String("object", reqInfo.ObjectName), + zap.String("description", logText), + zap.String("user", reqInfo.User), + zap.Error(err), + } + fields = append(fields, additional...) + h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...) +} + func handleDeleteMarker(w http.ResponseWriter, err error) error { var target layer.DeleteMarkerError if !errors.As(err, &target) { diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index c18268fc..193c15c3 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -74,27 +74,34 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string { var _ frostfs.FrostFS = (*TestFrostFS)(nil) +type offsetError struct { + offset int + err error +} + type TestFrostFS struct { - objects map[string]*object.Object - copiesNumbers map[string][]uint32 - objectErrors map[string]error - objectPutErrors map[string]error - containers map[string]*container.Container - chains map[string][]chain.Chain - currentEpoch uint64 - key *keys.PrivateKey - tombstoneOIDCount int + objects map[string]*object.Object + copiesNumbers map[string][]uint32 + objectErrors map[string]error + objectStreamErrors map[string]offsetError + objectPutErrors map[string]error + containers map[string]*container.Container + chains map[string][]chain.Chain + currentEpoch uint64 + key *keys.PrivateKey + tombstoneOIDCount int } func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS { return &TestFrostFS{ - objects: make(map[string]*object.Object), - copiesNumbers: make(map[string][]uint32), - objectErrors: make(map[string]error), - objectPutErrors: make(map[string]error), - containers: make(map[string]*container.Container), - chains: make(map[string][]chain.Chain), - key: key, + objects: make(map[string]*object.Object), + copiesNumbers: make(map[string][]uint32), + objectErrors: make(map[string]error), + objectStreamErrors: make(map[string]offsetError), + objectPutErrors: make(map[string]error), + containers: make(map[string]*container.Container), + chains: make(map[string][]chain.Chain), + key: key, } } @@ -110,6 +117,14 @@ func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) { } } +func (t *TestFrostFS) SetObjectStreamError(addr oid.Address, offset int, err error) { + if err == nil { + delete(t.objectStreamErrors, addr.EncodeToString()) + } else { + t.objectStreamErrors[addr.EncodeToString()] = offsetError{offset: offset, err: err} + } +} + func (t *TestFrostFS) SetObjectPutError(fileName string, err error) { if err == nil { delete(t.objectPutErrors, fileName) @@ -261,11 +276,33 @@ func (t *TestFrostFS) GetObject(ctx context.Context, prm frostfs.PrmObjectGet) ( } return &frostfs.Object{ - Header: *obj, - Payload: io.NopCloser(bytes.NewReader(obj.Payload())), + Header: *obj, + Payload: &objPayload{ + r: bytes.NewReader(obj.Payload()), + offsetErr: t.objectStreamErrors[prm.Container.EncodeToString()+"/"+prm.Object.EncodeToString()], + }, }, nil } +type objPayload struct { + offset int + r io.Reader + offsetErr offsetError +} + +func (o *objPayload) Read(p []byte) (n int, err error) { + n, err = o.r.Read(p) + if o.offsetErr.err != nil && o.offset+n > o.offsetErr.offset { + return o.offsetErr.offset - o.offset, o.offsetErr.err + } + o.offset += n + return n, err +} + +func (o *objPayload) Close() error { + return nil +} + func (t *TestFrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRange) (io.ReadCloser, error) { obj, err := t.retrieveObject(ctx, prm.Container, prm.Object) if err != nil {