[#178] wrapReader: Fix goroutine leak

In case of error in FrostFS.CreateObject wrapped reader
can be blocked because of synchronous pipe. We have to read out all payload in such case.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-08-01 17:57:41 +03:00
parent 52931663e1
commit fe897ec588
4 changed files with 49 additions and 9 deletions

View file

@ -11,6 +11,7 @@ This document outlines major changes between releases.
- `grpc` schemas in tree configuration (#166) - `grpc` schemas in tree configuration (#166)
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158) - Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
- Replace part on re-upload when use multipart upload (#176) - Replace part on re-upload when use multipart upload (#176)
- Fix goroutine leak on put object error (#178)
### Added ### Added
- Support dump metrics descriptions (#80) - Support dump metrics descriptions (#80)

View file

@ -3,11 +3,14 @@ package handler
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/rand"
"encoding/json" "encoding/json"
"errors"
"io" "io"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"runtime"
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
@ -172,6 +175,24 @@ func TestPutObjectWithStreamBodyError(t *testing.T) {
checkNotFound(t, tc, bktName, objName, emptyVersion) checkNotFound(t, tc, bktName, objName, emptyVersion)
} }
func TestPutObjectWithWrapReaderDiscardOnError(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-put", "object-for-put"
createTestBucket(tc, bktName)
content := make([]byte, 128*1024)
_, err := rand.Read(content)
require.NoError(t, err)
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
tc.tp.SetObjectPutError(objName, errors.New("some error"))
numGoroutineBefore := runtime.NumGoroutine()
tc.Handler().PutObjectHandler(w, r)
numGoroutineAfter := runtime.NumGoroutine()
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
}
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) { func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
hc := prepareHandlerContext(t) hc := prepareHandlerContext(t)

View file

@ -28,19 +28,21 @@ import (
type TestFrostFS struct { type TestFrostFS struct {
FrostFS FrostFS
objects map[string]*object.Object objects map[string]*object.Object
objectErrors map[string]error objectErrors map[string]error
containers map[string]*container.Container objectPutErrors map[string]error
eaclTables map[string]*eacl.Table containers map[string]*container.Container
currentEpoch uint64 eaclTables map[string]*eacl.Table
currentEpoch uint64
} }
func NewTestFrostFS() *TestFrostFS { func NewTestFrostFS() *TestFrostFS {
return &TestFrostFS{ return &TestFrostFS{
objects: make(map[string]*object.Object), objects: make(map[string]*object.Object),
objectErrors: make(map[string]error), objectErrors: make(map[string]error),
containers: make(map[string]*container.Container), objectPutErrors: make(map[string]error),
eaclTables: make(map[string]*eacl.Table), containers: make(map[string]*container.Container),
eaclTables: make(map[string]*eacl.Table),
} }
} }
@ -56,6 +58,14 @@ func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) {
} }
} }
func (t *TestFrostFS) SetObjectPutError(fileName string, err error) {
if err == nil {
delete(t.objectPutErrors, fileName)
} else {
t.objectPutErrors[fileName] = err
}
}
func (t *TestFrostFS) Objects() []*object.Object { func (t *TestFrostFS) Objects() []*object.Object {
res := make([]*object.Object, 0, len(t.objects)) res := make([]*object.Object, 0, len(t.objects))
@ -199,6 +209,10 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
attrs := make([]object.Attribute, 0) attrs := make([]object.Attribute, 0)
if err := t.objectPutErrors[prm.Filepath]; err != nil {
return oid.ID{}, err
}
if prm.Filepath != "" { if prm.Filepath != "" {
a := object.NewAttribute() a := object.NewAttribute()
a.SetKey(object.AttributeFilePath) a.SetKey(object.AttributeFilePath)

View file

@ -469,6 +469,10 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
}) })
id, err := n.frostFS.CreateObject(ctx, prm) id, err := n.frostFS.CreateObject(ctx, prm)
if err != nil { if err != nil {
if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil {
n.reqLogger(ctx).Warn("failed to discard put payload, probably goroutine leaks", zap.Error(errDiscard))
}
return 0, oid.ID{}, nil, err return 0, oid.ID{}, nil, err
} }
return size, id, hash.Sum(nil), nil return size, id, hash.Sum(nil), nil