[#221] add error response on key duplicates
Signed-off-by: Artem Tataurov <a.tataurov@yadro.com>
This commit is contained in:
parent
8f1d84ba8d
commit
278376643a
5 changed files with 81 additions and 9 deletions
|
@ -26,6 +26,7 @@ If you configure application using `.yaml` file change:
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Support the `Date` header on upload (#214)
|
- Support the `Date` header on upload (#214)
|
||||||
|
- Add error response on attribute duplicates (#221)
|
||||||
- Timeout for individual operations in streaming RPC (#234)
|
- Timeout for individual operations in streaming RPC (#234)
|
||||||
|
|
||||||
## [0.25.0] - 2022-10-31
|
## [0.25.0] - 2022-10-31
|
||||||
|
|
|
@ -67,6 +67,7 @@ func TestIntegration(t *testing.T) {
|
||||||
require.NoError(t, err, version)
|
require.NoError(t, err, version)
|
||||||
|
|
||||||
t.Run("simple put "+version, func(t *testing.T) { simplePut(ctx, t, clientPool, CID, version) })
|
t.Run("simple put "+version, func(t *testing.T) { simplePut(ctx, t, clientPool, CID, version) })
|
||||||
|
t.Run("put with duplicate keys "+version, func(t *testing.T) { putWithDuplicateKeys(t, CID) })
|
||||||
t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
|
@ -171,6 +172,43 @@ func makePutRequestAndCheck(ctx context.Context, t *testing.T, p *pool.Pool, cnr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func putWithDuplicateKeys(t *testing.T, CID cid.ID) {
|
||||||
|
url := testHost + "/upload/" + CID.String()
|
||||||
|
|
||||||
|
attr := "X-Attribute-User-Attribute"
|
||||||
|
content := "content of file"
|
||||||
|
valOne, valTwo := "first_value", "second_value"
|
||||||
|
fileName := "newFile.txt"
|
||||||
|
|
||||||
|
var buff bytes.Buffer
|
||||||
|
w := multipart.NewWriter(&buff)
|
||||||
|
fw, err := w.CreateFormFile("file", fileName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = io.Copy(fw, bytes.NewBufferString(content))
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = w.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
request, err := http.NewRequest(http.MethodPost, url, &buff)
|
||||||
|
require.NoError(t, err)
|
||||||
|
request.Header.Set("Content-Type", w.FormDataContentType())
|
||||||
|
request.Header.Add(attr, valOne)
|
||||||
|
request.Header.Add(attr, valTwo)
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(request)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
err := resp.Body.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "key duplication error: "+attr+"\n", string(body))
|
||||||
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
func simpleGet(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
|
func simpleGet(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
|
||||||
content := "content of file"
|
content := "content of file"
|
||||||
attributes := map[string]string{
|
attributes := map[string]string{
|
||||||
|
|
|
@ -26,7 +26,8 @@ func systemTranslator(key, prefix []byte) []byte {
|
||||||
return bytes.ToUpper(key)
|
return bytes.ToUpper(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) map[string]string {
|
func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) (map[string]string, error) {
|
||||||
|
var err error
|
||||||
result := make(map[string]string)
|
result := make(map[string]string)
|
||||||
prefix := []byte(utils.UserAttributeHeaderPrefix)
|
prefix := []byte(utils.UserAttributeHeaderPrefix)
|
||||||
|
|
||||||
|
@ -42,23 +43,30 @@ func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) map[string]str
|
||||||
}
|
}
|
||||||
|
|
||||||
// removing attribute prefix
|
// removing attribute prefix
|
||||||
key = bytes.TrimPrefix(key, prefix)
|
clearKey := bytes.TrimPrefix(key, prefix)
|
||||||
|
|
||||||
// checks that it's a system NeoFS header
|
// checks that it's a system NeoFS header
|
||||||
for _, system := range neofsAttributeHeaderPrefixes {
|
for _, system := range neofsAttributeHeaderPrefixes {
|
||||||
if bytes.HasPrefix(key, system) {
|
if bytes.HasPrefix(clearKey, system) {
|
||||||
key = systemTranslator(key, system)
|
clearKey = systemTranslator(clearKey, system)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// checks that the attribute key is not empty
|
// checks that the attribute key is not empty
|
||||||
if len(key) == 0 {
|
if len(clearKey) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if key gets duplicated
|
||||||
|
// return error containing full key name (with prefix)
|
||||||
|
if _, ok := result[string(clearKey)]; ok {
|
||||||
|
err = fmt.Errorf("key duplication error: %s", string(key))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// make string representation of key / val
|
// make string representation of key / val
|
||||||
k, v := string(key), string(val)
|
k, v := string(clearKey), string(val)
|
||||||
|
|
||||||
result[k] = v
|
result[k] = v
|
||||||
|
|
||||||
|
@ -67,7 +75,7 @@ func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) map[string]str
|
||||||
zap.String("val", v))
|
zap.String("val", v))
|
||||||
})
|
})
|
||||||
|
|
||||||
return result
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareExpirationHeader(headers map[string]string, epochDurations *epochDurations, now time.Time) error {
|
func prepareExpirationHeader(headers map[string]string, epochDurations *epochDurations, now time.Time) error {
|
||||||
|
|
|
@ -16,8 +16,27 @@ import (
|
||||||
func TestFilter(t *testing.T) {
|
func TestFilter(t *testing.T) {
|
||||||
log := zap.NewNop()
|
log := zap.NewNop()
|
||||||
|
|
||||||
|
t.Run("duplicate keys error", func(t *testing.T) {
|
||||||
req := &fasthttp.RequestHeader{}
|
req := &fasthttp.RequestHeader{}
|
||||||
req.DisableNormalizing()
|
req.DisableNormalizing()
|
||||||
|
req.Add("X-Attribute-DupKey", "first-value")
|
||||||
|
req.Add("X-Attribute-DupKey", "second-value")
|
||||||
|
_, err := filterHeaders(log, req)
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("duplicate system keys error", func(t *testing.T) {
|
||||||
|
req := &fasthttp.RequestHeader{}
|
||||||
|
req.DisableNormalizing()
|
||||||
|
req.Add("X-Attribute-Neofs-DupKey", "first-value")
|
||||||
|
req.Add("X-Attribute-Neofs-DupKey", "second-value")
|
||||||
|
_, err := filterHeaders(log, req)
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
req := &fasthttp.RequestHeader{}
|
||||||
|
req.DisableNormalizing()
|
||||||
|
|
||||||
req.Set("X-Attribute-Neofs-Expiration-Epoch1", "101")
|
req.Set("X-Attribute-Neofs-Expiration-Epoch1", "101")
|
||||||
req.Set("X-Attribute-NEOFS-Expiration-Epoch2", "102")
|
req.Set("X-Attribute-NEOFS-Expiration-Epoch2", "102")
|
||||||
req.Set("X-Attribute-neofs-Expiration-Epoch3", "103")
|
req.Set("X-Attribute-neofs-Expiration-Epoch3", "103")
|
||||||
|
@ -30,7 +49,8 @@ func TestFilter(t *testing.T) {
|
||||||
"__NEOFS__EXPIRATION_EPOCH2": "102",
|
"__NEOFS__EXPIRATION_EPOCH2": "102",
|
||||||
}
|
}
|
||||||
|
|
||||||
result := filterHeaders(log, req)
|
result, err := filterHeaders(log, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expected, result)
|
require.Equal(t, expected, result)
|
||||||
}
|
}
|
||||||
|
|
|
@ -114,7 +114,12 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
filtered := filterHeaders(u.log, &c.Request.Header)
|
filtered, err := filterHeaders(u.log, &c.Request.Header)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("could not process headers", zap.Error(err))
|
||||||
|
response.Error(c, err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
if needParseExpiration(filtered) {
|
if needParseExpiration(filtered) {
|
||||||
epochDuration, err := getEpochDurations(c, u.pool)
|
epochDuration, err := getEpochDurations(c, u.pool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue