[#178] wrapReader: Fix goroutine leak #178
4 changed files with 49 additions and 9 deletions
|
@ -11,6 +11,7 @@ This document outlines major changes between releases.
|
|||
- `grpc` schemas in tree configuration (#166)
|
||||
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
|
||||
- Replace part on re-upload when use multipart upload (#176)
|
||||
- Fix goroutine leak on put object error (#178)
|
||||
|
||||
### Added
|
||||
- Support dump metrics descriptions (#80)
|
||||
|
|
|
@ -3,11 +3,14 @@ package handler
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -172,6 +175,24 @@ func TestPutObjectWithStreamBodyError(t *testing.T) {
|
|||
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
||||
}
|
||||
|
||||
func TestPutObjectWithWrapReaderDiscardOnError(t *testing.T) {
|
||||
tc := prepareHandlerContext(t)
|
||||
|
||||
bktName, objName := "bucket-for-put", "object-for-put"
|
||||
createTestBucket(tc, bktName)
|
||||
|
||||
content := make([]byte, 128*1024)
|
||||
_, err := rand.Read(content)
|
||||
require.NoError(t, err)
|
||||
|
||||
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
|
||||
tc.tp.SetObjectPutError(objName, errors.New("some error"))
|
||||
numGoroutineBefore := runtime.NumGoroutine()
|
||||
tc.Handler().PutObjectHandler(w, r)
|
||||
numGoroutineAfter := runtime.NumGoroutine()
|
||||
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
|
||||
}
|
||||
|
||||
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ type TestFrostFS struct {
|
|||
|
||||
objects map[string]*object.Object
|
||||
objectErrors map[string]error
|
||||
objectPutErrors map[string]error
|
||||
containers map[string]*container.Container
|
||||
eaclTables map[string]*eacl.Table
|
||||
currentEpoch uint64
|
||||
|
@ -39,6 +40,7 @@ func NewTestFrostFS() *TestFrostFS {
|
|||
return &TestFrostFS{
|
||||
objects: make(map[string]*object.Object),
|
||||
objectErrors: make(map[string]error),
|
||||
objectPutErrors: make(map[string]error),
|
||||
containers: make(map[string]*container.Container),
|
||||
eaclTables: make(map[string]*eacl.Table),
|
||||
}
|
||||
|
@ -56,6 +58,14 @@ func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) SetObjectPutError(fileName string, err error) {
|
||||
if err == nil {
|
||||
delete(t.objectPutErrors, fileName)
|
||||
} else {
|
||||
t.objectPutErrors[fileName] = err
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) Objects() []*object.Object {
|
||||
res := make([]*object.Object, 0, len(t.objects))
|
||||
|
||||
|
@ -199,6 +209,10 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
|
|||
|
||||
attrs := make([]object.Attribute, 0)
|
||||
|
||||
if err := t.objectPutErrors[prm.Filepath]; err != nil {
|
||||
return oid.ID{}, err
|
||||
}
|
||||
|
||||
if prm.Filepath != "" {
|
||||
a := object.NewAttribute()
|
||||
a.SetKey(object.AttributeFilePath)
|
||||
|
|
|
@ -469,6 +469,10 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
|
|||
})
|
||||
id, err := n.frostFS.CreateObject(ctx, prm)
|
||||
if err != nil {
|
||||
if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil {
|
||||
n.reqLogger(ctx).Warn("failed to discard put payload, probably goroutine leaks", zap.Error(errDiscard))
|
||||
}
|
||||
|
||||
return 0, oid.ID{}, nil, err
|
||||
}
|
||||
return size, id, hash.Sum(nil), nil
|
||||
|
|
Loading…
Reference in a new issue