[#178] wrapReader: Fix goroutine leak #178
4 changed files with 49 additions and 9 deletions
|
@ -11,6 +11,7 @@ This document outlines major changes between releases.
|
||||||
- `grpc` schemas in tree configuration (#166)
|
- `grpc` schemas in tree configuration (#166)
|
||||||
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
|
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
|
||||||
- Replace part on re-upload when use multipart upload (#176)
|
- Replace part on re-upload when use multipart upload (#176)
|
||||||
|
- Fix goroutine leak on put object error (#178)
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Support dump metrics descriptions (#80)
|
- Support dump metrics descriptions (#80)
|
||||||
|
|
|
@ -3,11 +3,14 @@ package handler
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -172,6 +175,24 @@ func TestPutObjectWithStreamBodyError(t *testing.T) {
|
||||||
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutObjectWithWrapReaderDiscardOnError(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-put", "object-for-put"
|
||||||
|
createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
content := make([]byte, 128*1024)
|
||||||
|
_, err := rand.Read(content)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
|
||||||
|
tc.tp.SetObjectPutError(objName, errors.New("some error"))
|
||||||
|
numGoroutineBefore := runtime.NumGoroutine()
|
||||||
|
tc.Handler().PutObjectHandler(w, r)
|
||||||
|
numGoroutineAfter := runtime.NumGoroutine()
|
||||||
|
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
|
||||||
|
}
|
||||||
|
|
||||||
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
||||||
hc := prepareHandlerContext(t)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
|
|
@ -28,19 +28,21 @@ import (
|
||||||
type TestFrostFS struct {
|
type TestFrostFS struct {
|
||||||
FrostFS
|
FrostFS
|
||||||
|
|
||||||
objects map[string]*object.Object
|
objects map[string]*object.Object
|
||||||
objectErrors map[string]error
|
objectErrors map[string]error
|
||||||
containers map[string]*container.Container
|
objectPutErrors map[string]error
|
||||||
eaclTables map[string]*eacl.Table
|
containers map[string]*container.Container
|
||||||
currentEpoch uint64
|
eaclTables map[string]*eacl.Table
|
||||||
|
currentEpoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestFrostFS() *TestFrostFS {
|
func NewTestFrostFS() *TestFrostFS {
|
||||||
return &TestFrostFS{
|
return &TestFrostFS{
|
||||||
objects: make(map[string]*object.Object),
|
objects: make(map[string]*object.Object),
|
||||||
objectErrors: make(map[string]error),
|
objectErrors: make(map[string]error),
|
||||||
containers: make(map[string]*container.Container),
|
objectPutErrors: make(map[string]error),
|
||||||
eaclTables: make(map[string]*eacl.Table),
|
containers: make(map[string]*container.Container),
|
||||||
|
eaclTables: make(map[string]*eacl.Table),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,6 +58,14 @@ func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) SetObjectPutError(fileName string, err error) {
|
||||||
|
if err == nil {
|
||||||
|
delete(t.objectPutErrors, fileName)
|
||||||
|
} else {
|
||||||
|
t.objectPutErrors[fileName] = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) Objects() []*object.Object {
|
func (t *TestFrostFS) Objects() []*object.Object {
|
||||||
res := make([]*object.Object, 0, len(t.objects))
|
res := make([]*object.Object, 0, len(t.objects))
|
||||||
|
|
||||||
|
@ -199,6 +209,10 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
|
||||||
|
|
||||||
attrs := make([]object.Attribute, 0)
|
attrs := make([]object.Attribute, 0)
|
||||||
|
|
||||||
|
if err := t.objectPutErrors[prm.Filepath]; err != nil {
|
||||||
|
return oid.ID{}, err
|
||||||
|
}
|
||||||
|
|
||||||
if prm.Filepath != "" {
|
if prm.Filepath != "" {
|
||||||
a := object.NewAttribute()
|
a := object.NewAttribute()
|
||||||
a.SetKey(object.AttributeFilePath)
|
a.SetKey(object.AttributeFilePath)
|
||||||
|
|
|
@ -469,6 +469,10 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
|
||||||
})
|
})
|
||||||
id, err := n.frostFS.CreateObject(ctx, prm)
|
id, err := n.frostFS.CreateObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil {
|
||||||
|
n.reqLogger(ctx).Warn("failed to discard put payload, probably goroutine leaks", zap.Error(errDiscard))
|
||||||
|
}
|
||||||
|
|
||||||
return 0, oid.ID{}, nil, err
|
return 0, oid.ID{}, nil, err
|
||||||
}
|
}
|
||||||
return size, id, hash.Sum(nil), nil
|
return size, id, hash.Sum(nil), nil
|
||||||
|
|
Loading…
Reference in a new issue