From fe897ec58843d12daee7b7f48f803583046365f2 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 1 Aug 2023 17:57:41 +0300 Subject: [PATCH] [#178] wrapReader: Fix goroutine leak In case of error in FrostFS.CreateObject wrapped reader can be blocked because of synchronous pipe. We have to read out all payload in such case. Signed-off-by: Denis Kirillov --- CHANGELOG.md | 1 + api/handler/put_test.go | 21 +++++++++++++++++++++ api/layer/frostfs_mock.go | 32 +++++++++++++++++++++++--------- api/layer/object.go | 4 ++++ 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37d2e9b2..edbe8dce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This document outlines major changes between releases. - `grpc` schemas in tree configuration (#166) - Return appropriate 404 code when object missed in storage but there is in gate cache (#158) - Replace part on re-upload when use multipart upload (#176) +- Fix goroutine leak on put object error (#178) ### Added - Support dump metrics descriptions (#80) diff --git a/api/handler/put_test.go b/api/handler/put_test.go index 68b84894..94487d1a 100644 --- a/api/handler/put_test.go +++ b/api/handler/put_test.go @@ -3,11 +3,14 @@ package handler import ( "bytes" "context" + "crypto/rand" "encoding/json" + "errors" "io" "mime/multipart" "net/http" "net/http/httptest" + "runtime" "strconv" "strings" "testing" @@ -172,6 +175,24 @@ func TestPutObjectWithStreamBodyError(t *testing.T) { checkNotFound(t, tc, bktName, objName, emptyVersion) } +func TestPutObjectWithWrapReaderDiscardOnError(t *testing.T) { + tc := prepareHandlerContext(t) + + bktName, objName := "bucket-for-put", "object-for-put" + createTestBucket(tc, bktName) + + content := make([]byte, 128*1024) + _, err := rand.Read(content) + require.NoError(t, err) + + w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content)) + tc.tp.SetObjectPutError(objName, errors.New("some error")) + numGoroutineBefore := runtime.NumGoroutine() + tc.Handler().PutObjectHandler(w, r) + numGoroutineAfter := runtime.NumGoroutine() + require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object") +} + func TestPutObjectWithStreamBodyAWSExample(t *testing.T) { hc := prepareHandlerContext(t) diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index 87ab8cb5..72af1553 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -28,19 +28,21 @@ import ( type TestFrostFS struct { FrostFS - objects map[string]*object.Object - objectErrors map[string]error - containers map[string]*container.Container - eaclTables map[string]*eacl.Table - currentEpoch uint64 + objects map[string]*object.Object + objectErrors map[string]error + objectPutErrors map[string]error + containers map[string]*container.Container + eaclTables map[string]*eacl.Table + currentEpoch uint64 } func NewTestFrostFS() *TestFrostFS { return &TestFrostFS{ - objects: make(map[string]*object.Object), - objectErrors: make(map[string]error), - containers: make(map[string]*container.Container), - eaclTables: make(map[string]*eacl.Table), + objects: make(map[string]*object.Object), + objectErrors: make(map[string]error), + objectPutErrors: make(map[string]error), + containers: make(map[string]*container.Container), + eaclTables: make(map[string]*eacl.Table), } } @@ -56,6 +58,14 @@ func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) { } } +func (t *TestFrostFS) SetObjectPutError(fileName string, err error) { + if err == nil { + delete(t.objectPutErrors, fileName) + } else { + t.objectPutErrors[fileName] = err + } +} + func (t *TestFrostFS) Objects() []*object.Object { res := make([]*object.Object, 0, len(t.objects)) @@ -199,6 +209,10 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid. attrs := make([]object.Attribute, 0) + if err := t.objectPutErrors[prm.Filepath]; err != nil { + return oid.ID{}, err + } + if prm.Filepath != "" { a := object.NewAttribute() a.SetKey(object.AttributeFilePath) diff --git a/api/layer/object.go b/api/layer/object.go index 35c54765..dfdd471e 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -469,6 +469,10 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn }) id, err := n.frostFS.CreateObject(ctx, prm) if err != nil { + if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil { + n.reqLogger(ctx).Warn("failed to discard put payload, probably goroutine leaks", zap.Error(errDiscard)) + } + return 0, oid.ID{}, nil, err } return size, id, hash.Sum(nil), nil