[#22] Update system attributes prefix

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-03-16 11:40:08 +03:00
parent 1f66149316
commit a8ec09e76a
14 changed files with 562 additions and 448 deletions

View file

@ -3,29 +3,12 @@ package uploader
import (
"bytes"
"fmt"
"math"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
var frostfsAttributeHeaderPrefixes = [...][]byte{[]byte("Neofs-"), []byte("NEOFS-"), []byte("neofs-")}
func systemTranslator(key, prefix []byte) []byte {
// replace the specified prefix with `__NEOFS__`
key = bytes.Replace(key, prefix, []byte(utils.SystemAttributePrefix), 1)
// replace `-` with `_`
key = bytes.ReplaceAll(key, []byte("-"), []byte("_"))
// replace with uppercase
return bytes.ToUpper(key)
}
func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) (map[string]string, error) {
var err error
result := make(map[string]string)
@ -45,13 +28,7 @@ func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) (map[string]st
// removing attribute prefix
clearKey := bytes.TrimPrefix(key, prefix)
// checks that it's a system NeoFS header
for _, system := range frostfsAttributeHeaderPrefixes {
if bytes.HasPrefix(clearKey, system) {
clearKey = systemTranslator(clearKey, system)
break
}
}
clearKey = utils.TransformIfSystem(clearKey)
// checks that the attribute key is not empty
if len(clearKey) == 0 {
@ -77,69 +54,3 @@ func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) (map[string]st
return result, err
}
func prepareExpirationHeader(headers map[string]string, epochDurations *epochDurations, now time.Time) error {
expirationInEpoch := headers[object.SysAttributeExpEpoch]
if timeRFC3339, ok := headers[utils.ExpirationRFC3339Attr]; ok {
expTime, err := time.Parse(time.RFC3339, timeRFC3339)
if err != nil {
return fmt.Errorf("couldn't parse value %s of header %s", timeRFC3339, utils.ExpirationRFC3339Attr)
}
if expTime.Before(now) {
return fmt.Errorf("value %s of header %s must be in the future", timeRFC3339, utils.ExpirationRFC3339Attr)
}
updateExpirationHeader(headers, epochDurations, expTime.Sub(now))
delete(headers, utils.ExpirationRFC3339Attr)
}
if timestamp, ok := headers[utils.ExpirationTimestampAttr]; ok {
value, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return fmt.Errorf("couldn't parse value %s of header %s", timestamp, utils.ExpirationTimestampAttr)
}
expTime := time.Unix(value, 0)
if expTime.Before(now) {
return fmt.Errorf("value %s of header %s must be in the future", timestamp, utils.ExpirationTimestampAttr)
}
updateExpirationHeader(headers, epochDurations, expTime.Sub(now))
delete(headers, utils.ExpirationTimestampAttr)
}
if duration, ok := headers[utils.ExpirationDurationAttr]; ok {
expDuration, err := time.ParseDuration(duration)
if err != nil {
return fmt.Errorf("couldn't parse value %s of header %s", duration, utils.ExpirationDurationAttr)
}
if expDuration <= 0 {
return fmt.Errorf("value %s of header %s must be positive", expDuration, utils.ExpirationDurationAttr)
}
updateExpirationHeader(headers, epochDurations, expDuration)
delete(headers, utils.ExpirationDurationAttr)
}
if expirationInEpoch != "" {
headers[object.SysAttributeExpEpoch] = expirationInEpoch
}
return nil
}
func updateExpirationHeader(headers map[string]string, durations *epochDurations, expDuration time.Duration) {
epochDuration := uint64(durations.msPerBlock) * durations.blockPerEpoch
currentEpoch := durations.currentEpoch
numEpoch := uint64(expDuration.Milliseconds()) / epochDuration
if uint64(expDuration.Milliseconds())%epochDuration != 0 {
numEpoch++
}
expirationEpoch := uint64(math.MaxUint64)
if numEpoch < math.MaxUint64-currentEpoch {
expirationEpoch = currentEpoch + numEpoch
}
headers[object.SysAttributeExpEpoch] = strconv.FormatUint(expirationEpoch, 10)
}

View file

@ -1,13 +1,8 @@
package uploader
import (
"math"
"strconv"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"github.com/stretchr/testify/require"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
@ -28,8 +23,8 @@ func TestFilter(t *testing.T) {
t.Run("duplicate system keys error", func(t *testing.T) {
req := &fasthttp.RequestHeader{}
req.DisableNormalizing()
req.Add("X-Attribute-Neofs-DupKey", "first-value")
req.Add("X-Attribute-Neofs-DupKey", "second-value")
req.Add("X-Attribute-System-DupKey", "first-value")
req.Add("X-Attribute-System-DupKey", "second-value")
_, err := filterHeaders(log, req)
require.Error(t, err)
})
@ -37,16 +32,16 @@ func TestFilter(t *testing.T) {
req := &fasthttp.RequestHeader{}
req.DisableNormalizing()
req.Set("X-Attribute-Neofs-Expiration-Epoch1", "101")
req.Set("X-Attribute-NEOFS-Expiration-Epoch2", "102")
req.Set("X-Attribute-neofs-Expiration-Epoch3", "103")
req.Set("X-Attribute-System-Expiration-Epoch1", "101")
req.Set("X-Attribute-SYSTEM-Expiration-Epoch2", "102")
req.Set("X-Attribute-system-Expiration-Epoch3", "103")
req.Set("X-Attribute-MyAttribute", "value")
expected := map[string]string{
"__NEOFS__EXPIRATION_EPOCH1": "101",
"MyAttribute": "value",
"__NEOFS__EXPIRATION_EPOCH3": "103",
"__NEOFS__EXPIRATION_EPOCH2": "102",
"__SYSTEM__EXPIRATION_EPOCH1": "101",
"MyAttribute": "value",
"__SYSTEM__EXPIRATION_EPOCH3": "103",
"__SYSTEM__EXPIRATION_EPOCH2": "102",
}
result, err := filterHeaders(log, req)
@ -54,157 +49,3 @@ func TestFilter(t *testing.T) {
require.Equal(t, expected, result)
}
func TestPrepareExpirationHeader(t *testing.T) {
tomorrow := time.Now().Add(24 * time.Hour)
tomorrowUnix := tomorrow.Unix()
tomorrowUnixNano := tomorrow.UnixNano()
tomorrowUnixMilli := tomorrowUnixNano / 1e6
epoch := "100"
duration := "24h"
timestampSec := strconv.FormatInt(tomorrowUnix, 10)
timestampMilli := strconv.FormatInt(tomorrowUnixMilli, 10)
timestampNano := strconv.FormatInt(tomorrowUnixNano, 10)
defaultDurations := &epochDurations{
currentEpoch: 10,
msPerBlock: 1000,
blockPerEpoch: 101,
}
msPerBlock := defaultDurations.blockPerEpoch * uint64(defaultDurations.msPerBlock)
epochPerDay := uint64((24 * time.Hour).Milliseconds()) / msPerBlock
if uint64((24*time.Hour).Milliseconds())%msPerBlock != 0 {
epochPerDay++
}
defaultExpEpoch := strconv.FormatUint(defaultDurations.currentEpoch+epochPerDay, 10)
for _, tc := range []struct {
name string
headers map[string]string
durations *epochDurations
err bool
expected map[string]string
}{
{
name: "valid epoch",
headers: map[string]string{object.SysAttributeExpEpoch: epoch},
expected: map[string]string{object.SysAttributeExpEpoch: epoch},
},
{
name: "valid epoch, valid duration",
headers: map[string]string{
object.SysAttributeExpEpoch: epoch,
utils.ExpirationDurationAttr: duration,
},
durations: defaultDurations,
expected: map[string]string{object.SysAttributeExpEpoch: epoch},
},
{
name: "valid epoch, valid rfc3339",
headers: map[string]string{
object.SysAttributeExpEpoch: epoch,
utils.ExpirationRFC3339Attr: tomorrow.Format(time.RFC3339),
},
durations: defaultDurations,
expected: map[string]string{object.SysAttributeExpEpoch: epoch},
},
{
name: "valid epoch, valid timestamp sec",
headers: map[string]string{
object.SysAttributeExpEpoch: epoch,
utils.ExpirationTimestampAttr: timestampSec,
},
durations: defaultDurations,
expected: map[string]string{object.SysAttributeExpEpoch: epoch},
},
{
name: "valid epoch, valid timestamp milli",
headers: map[string]string{
object.SysAttributeExpEpoch: epoch,
utils.ExpirationTimestampAttr: timestampMilli,
},
durations: defaultDurations,
expected: map[string]string{object.SysAttributeExpEpoch: epoch},
},
{
name: "valid epoch, valid timestamp nano",
headers: map[string]string{
object.SysAttributeExpEpoch: epoch,
utils.ExpirationTimestampAttr: timestampNano,
},
durations: defaultDurations,
expected: map[string]string{object.SysAttributeExpEpoch: epoch},
},
{
name: "valid timestamp sec",
headers: map[string]string{utils.ExpirationTimestampAttr: timestampSec},
durations: defaultDurations,
expected: map[string]string{object.SysAttributeExpEpoch: defaultExpEpoch},
},
{
name: "valid duration",
headers: map[string]string{utils.ExpirationDurationAttr: duration},
durations: defaultDurations,
expected: map[string]string{object.SysAttributeExpEpoch: defaultExpEpoch},
},
{
name: "valid rfc3339",
headers: map[string]string{utils.ExpirationRFC3339Attr: tomorrow.Format(time.RFC3339)},
durations: defaultDurations,
expected: map[string]string{object.SysAttributeExpEpoch: defaultExpEpoch},
},
{
name: "valid max uint 64",
headers: map[string]string{utils.ExpirationRFC3339Attr: tomorrow.Format(time.RFC3339)},
durations: &epochDurations{
currentEpoch: math.MaxUint64 - 1,
msPerBlock: defaultDurations.msPerBlock,
blockPerEpoch: defaultDurations.blockPerEpoch,
},
expected: map[string]string{object.SysAttributeExpEpoch: strconv.FormatUint(uint64(math.MaxUint64), 10)},
},
{
name: "invalid timestamp sec",
headers: map[string]string{utils.ExpirationTimestampAttr: "abc"},
err: true,
},
{
name: "invalid timestamp sec zero",
headers: map[string]string{utils.ExpirationTimestampAttr: "0"},
err: true,
},
{
name: "invalid duration",
headers: map[string]string{utils.ExpirationDurationAttr: "1d"},
err: true,
},
{
name: "invalid duration negative",
headers: map[string]string{utils.ExpirationDurationAttr: "-5h"},
err: true,
},
{
name: "invalid rfc3339",
headers: map[string]string{utils.ExpirationRFC3339Attr: "abc"},
err: true,
},
{
name: "invalid rfc3339 zero",
headers: map[string]string{utils.ExpirationRFC3339Attr: time.RFC3339},
err: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
err := prepareExpirationHeader(tc.headers, tc.durations, time.Now())
if tc.err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.expected, tc.headers)
}
})
}
}

View file

@ -3,7 +3,6 @@ package uploader
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
@ -38,12 +37,6 @@ type Uploader struct {
containerResolver *resolver.ContainerResolver
}
type epochDurations struct {
currentEpoch uint64
msPerBlock int64
blockPerEpoch uint64
}
// Settings stores reloading parameters, so it has to provide atomic getters and setters.
type Settings struct {
defaultTimestamp atomic.Bool
@ -120,28 +113,20 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
response.Error(c, err.Error(), fasthttp.StatusBadRequest)
return
}
if needParseExpiration(filtered) {
epochDuration, err := getEpochDurations(c, u.pool)
if err != nil {
log.Error("could not get epoch durations from network info", zap.Error(err))
response.Error(c, "could not get epoch durations from network info: "+err.Error(), fasthttp.StatusBadRequest)
return
}
now := time.Now()
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
log.Warn("could not parse client time", zap.String("Date header", string(rawHeader)), zap.Error(err))
} else {
now = parsed
}
now := time.Now()
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
log.Warn("could not parse client time", zap.String("Date header", string(rawHeader)), zap.Error(err))
} else {
now = parsed
}
}
if err = prepareExpirationHeader(filtered, epochDuration, now); err != nil {
log.Error("could not parse expiration header", zap.Error(err))
response.Error(c, "could not parse expiration header: "+err.Error(), fasthttp.StatusBadRequest)
return
}
if err = utils.PrepareExpirationHeader(c, u.pool, filtered, now); err != nil {
log.Error("could not prepare expiration header", zap.Error(err))
response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
return
}
attributes := make([]object.Attribute, 0, len(filtered))
@ -246,28 +231,3 @@ func (pr *putResponse) encode(w io.Writer) error {
enc.SetIndent("", "\t")
return enc.Encode(pr)
}
func getEpochDurations(ctx context.Context, p *pool.Pool) (*epochDurations, error) {
networkInfo, err := p.NetworkInfo(ctx)
if err != nil {
return nil, err
}
res := &epochDurations{
currentEpoch: networkInfo.CurrentEpoch(),
msPerBlock: networkInfo.MsPerBlock(),
blockPerEpoch: networkInfo.EpochDuration(),
}
if res.blockPerEpoch == 0 {
return nil, fmt.Errorf("EpochDuration is empty")
}
return res, nil
}
func needParseExpiration(headers map[string]string) bool {
_, ok1 := headers[utils.ExpirationDurationAttr]
_, ok2 := headers[utils.ExpirationRFC3339Attr]
_, ok3 := headers[utils.ExpirationTimestampAttr]
return ok1 || ok2 || ok3
}