diff --git a/README.md b/README.md index 37a8866..697dcf5 100644 --- a/README.md +++ b/README.md @@ -382,6 +382,19 @@ You can also add some attributes to your file using the following rules: HTTP_GW_UPLOAD_HEADER_USE_DEFAULT_TIMESTAMP option and if request doesn't provide `X-Attribute-Timestamp` header of its own +--- +**NOTE** + +There are some reserved headers type of `X-Attribute-NEOFS-*` (headers are arranged in descending order of priority): +1. `X-Attribute-Neofs-Expiration-Epoch: 100` +2. `X-Attribute-Neofs-Expiration-Duration: 24h30m` +3. `X-Attribute-Neofs-Expiration-Timestamp: 1637574797` +4. `X-Attribute-Neofs-Expiration-RFC3339: 2021-11-22T09:55:49Z` + +which transforms to `X-Attribute-Neofs-Expiration-Epoch`. So you can provide expiration any convenient way. + +--- + For successful uploads you get JSON data in reply body with container and object ID, like this: ``` diff --git a/go.mod b/go.mod index 7a9c769..4a4d121 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/klauspost/compress v1.13.1 // indirect github.com/nspcc-dev/neo-go v0.97.3 + github.com/nspcc-dev/neofs-api-go v1.30.0 github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211115110427-df6a622c20e8 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.29.0 diff --git a/uploader/filter.go b/uploader/filter.go index 0fb853f..9e4eca5 100644 --- a/uploader/filter.go +++ b/uploader/filter.go @@ -2,7 +2,11 @@ package uploader import ( "bytes" + "fmt" + "strconv" + "time" + "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/valyala/fasthttp" "go.uber.org/zap" ) @@ -10,6 +14,10 @@ import ( const ( userAttributeHeaderPrefix = "X-Attribute-" systemAttributePrefix = "__NEOFS__" + + expirationDurationAttr = systemAttributePrefix + "EXPIRATION_DURATION" + expirationTimestampAttr = systemAttributePrefix + "EXPIRATION_TIMESTAMP" + expirationRFC3339Attr = systemAttributePrefix + "EXPIRATION_RFC3339" ) var neofsAttributeHeaderPrefixes = [...][]byte{[]byte("Neofs-"), []byte("NEOFS-"), []byte("neofs-")} @@ -68,3 +76,60 @@ func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) map[string]str return result } + +func prepareExpirationHeader(headers map[string]string, epochDurations *epochDurations) error { + expirationInEpoch := headers[object.SysAttributeExpEpoch] + + if timeRFC3339, ok := headers[expirationRFC3339Attr]; ok { + expTime, err := time.Parse(time.RFC3339, timeRFC3339) + if err != nil { + return fmt.Errorf("couldn't parse value %s of header %s", timeRFC3339, expirationRFC3339Attr) + } + + now := time.Now().UTC() + if expTime.Before(now) { + return fmt.Errorf("value %s of header %s must be in the future", timeRFC3339, expirationRFC3339Attr) + } + updateExpirationHeader(headers, epochDurations, expTime.Sub(now)) + delete(headers, expirationRFC3339Attr) + } + + if timestamp, ok := headers[expirationTimestampAttr]; ok { + value, err := strconv.ParseInt(timestamp, 10, 64) + if err != nil { + return fmt.Errorf("couldn't parse value %s of header %s", timestamp, expirationTimestampAttr) + } + expTime := time.Unix(value, 0) + + now := time.Now() + if expTime.Before(now) { + return fmt.Errorf("value %s of header %s must be in the future", timestamp, expirationTimestampAttr) + } + updateExpirationHeader(headers, epochDurations, expTime.Sub(now)) + delete(headers, expirationTimestampAttr) + } + + if duration, ok := headers[expirationDurationAttr]; ok { + expDuration, err := time.ParseDuration(duration) + if err != nil { + return fmt.Errorf("couldn't parse value %s of header %s", duration, expirationDurationAttr) + } + if expDuration <= 0 { + return fmt.Errorf("value %s of header %s must be positive", expDuration, expirationDurationAttr) + } + updateExpirationHeader(headers, epochDurations, expDuration) + delete(headers, expirationDurationAttr) + } + + if expirationInEpoch != "" { + headers[object.SysAttributeExpEpoch] = expirationInEpoch + } + + return nil +} + +func updateExpirationHeader(headers map[string]string, durations *epochDurations, expDuration time.Duration) { + epochDuration := durations.msPerBlock * int64(durations.blockPerEpoch) + numEpoch := expDuration.Milliseconds() / epochDuration + headers[object.SysAttributeExpEpoch] = strconv.FormatInt(int64(durations.currentEpoch)+numEpoch, 10) +} diff --git a/uploader/filter_test.go b/uploader/filter_test.go index 9cfb435..a59bcf5 100644 --- a/uploader/filter_test.go +++ b/uploader/filter_test.go @@ -1,7 +1,11 @@ package uploader import ( + "strconv" "testing" + "time" + + "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-sdk-go/logger" "github.com/stretchr/testify/require" @@ -30,3 +34,142 @@ func TestFilter(t *testing.T) { require.Equal(t, expected, result) } + +func TestPrepareExpirationHeader(t *testing.T) { + tomorrow := time.Now().Add(24 * time.Hour) + tomorrowUnix := tomorrow.Unix() + tomorrowUnixNano := tomorrow.UnixNano() + tomorrowUnixMilli := tomorrowUnixNano / 1e6 + + epoch := "100" + duration := "24h" + timestampSec := strconv.FormatInt(tomorrowUnix, 10) + timestampMilli := strconv.FormatInt(tomorrowUnixMilli, 10) + timestampNano := strconv.FormatInt(tomorrowUnixNano, 10) + + defaultDurations := &epochDurations{ + currentEpoch: 10, + msPerBlock: 1000, + blockPerEpoch: 101, + } + + epochPerDay := (24 * time.Hour).Milliseconds() / int64(defaultDurations.blockPerEpoch) / defaultDurations.msPerBlock + defaultExpEpoch := strconv.FormatInt(int64(defaultDurations.currentEpoch)+epochPerDay, 10) + + for _, tc := range []struct { + name string + headers map[string]string + durations *epochDurations + err bool + expected map[string]string + }{ + { + name: "valid epoch", + headers: map[string]string{object.SysAttributeExpEpoch: epoch}, + expected: map[string]string{object.SysAttributeExpEpoch: epoch}, + }, + { + name: "valid epoch, valid duration", + headers: map[string]string{ + object.SysAttributeExpEpoch: epoch, + expirationDurationAttr: duration, + }, + durations: defaultDurations, + expected: map[string]string{object.SysAttributeExpEpoch: epoch}, + }, + { + name: "valid epoch, valid rfc3339", + headers: map[string]string{ + object.SysAttributeExpEpoch: epoch, + expirationRFC3339Attr: tomorrow.Format(time.RFC3339), + }, + durations: defaultDurations, + expected: map[string]string{object.SysAttributeExpEpoch: epoch}, + }, + { + name: "valid epoch, valid timestamp sec", + headers: map[string]string{ + object.SysAttributeExpEpoch: epoch, + expirationTimestampAttr: timestampSec, + }, + durations: defaultDurations, + expected: map[string]string{object.SysAttributeExpEpoch: epoch}, + }, + { + name: "valid epoch, valid timestamp milli", + headers: map[string]string{ + object.SysAttributeExpEpoch: epoch, + expirationTimestampAttr: timestampMilli, + }, + durations: defaultDurations, + expected: map[string]string{object.SysAttributeExpEpoch: epoch}, + }, + { + name: "valid epoch, valid timestamp nano", + headers: map[string]string{ + object.SysAttributeExpEpoch: epoch, + expirationTimestampAttr: timestampNano, + }, + durations: defaultDurations, + expected: map[string]string{object.SysAttributeExpEpoch: epoch}, + }, + { + name: "valid timestamp sec", + headers: map[string]string{expirationTimestampAttr: timestampSec}, + durations: defaultDurations, + expected: map[string]string{object.SysAttributeExpEpoch: defaultExpEpoch}, + }, + { + name: "valid duration", + headers: map[string]string{expirationDurationAttr: duration}, + durations: defaultDurations, + expected: map[string]string{object.SysAttributeExpEpoch: defaultExpEpoch}, + }, + { + name: "valid rfc3339", + headers: map[string]string{expirationRFC3339Attr: tomorrow.Format(time.RFC3339)}, + durations: defaultDurations, + expected: map[string]string{object.SysAttributeExpEpoch: defaultExpEpoch}, + }, + { + name: "invalid timestamp sec", + headers: map[string]string{expirationTimestampAttr: "abc"}, + err: true, + }, + { + name: "invalid timestamp sec zero", + headers: map[string]string{expirationTimestampAttr: "0"}, + err: true, + }, + { + name: "invalid duration", + headers: map[string]string{expirationDurationAttr: "1d"}, + err: true, + }, + { + name: "invalid duration negative", + headers: map[string]string{expirationDurationAttr: "-5h"}, + err: true, + }, + { + name: "invalid rfc3339", + headers: map[string]string{expirationRFC3339Attr: "abc"}, + err: true, + }, + { + name: "invalid rfc3339 zero", + headers: map[string]string{expirationRFC3339Attr: time.RFC3339}, + err: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := prepareExpirationHeader(tc.headers, tc.durations) + if tc.err { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expected, tc.headers) + } + }) + } +} diff --git a/uploader/upload.go b/uploader/upload.go index 35ef30c..4b81d83 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -2,7 +2,9 @@ package uploader import ( "context" + "encoding/binary" "encoding/json" + "fmt" "io" "strconv" "time" @@ -11,6 +13,7 @@ import ( "github.com/nspcc-dev/neofs-http-gw/tokens" "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/pool" @@ -31,6 +34,12 @@ type Uploader struct { enableDefaultTimestamp bool } +type epochDurations struct { + currentEpoch uint64 + msPerBlock int64 + blockPerEpoch uint64 +} + // New creates a new Uploader using specified logger, connection pool and // other options. func New(log *zap.Logger, conns pool.Pool, enableDefaultTimestamp bool) *Uploader { @@ -80,6 +89,20 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { return } filtered := filterHeaders(u.log, &c.Request.Header) + if needParseExpiration(filtered) { + epochDuration, err := getEpochDurations(c, u.pool) + if err != nil { + log.Error("could not get epoch durations from network info", zap.Error(err)) + response.Error(c, "could parse expiration header, try expiration in epoch", fasthttp.StatusBadRequest) + return + } + if err = prepareExpirationHeader(filtered, epochDuration); err != nil { + log.Error("could not prepare expiration header", zap.Error(err)) + response.Error(c, "could parse expiration header, try expiration in epoch", fasthttp.StatusBadRequest) + return + } + } + attributes := make([]*object.Attribute, 0, len(filtered)) // prepares attributes from filtered headers for key, val := range filtered { @@ -168,3 +191,37 @@ func (pr *putResponse) encode(w io.Writer) error { enc.SetIndent("", "\t") return enc.Encode(pr) } + +func getEpochDurations(ctx context.Context, p pool.Pool) (*epochDurations, error) { + if conn, _, err := p.Connection(); err != nil { + return nil, err + } else if networkInfo, err := conn.NetworkInfo(ctx); err != nil { + return nil, err + } else { + res := &epochDurations{ + currentEpoch: networkInfo.CurrentEpoch(), + msPerBlock: networkInfo.MsPerBlock(), + } + + networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { + if string(parameter.Key()) == "EpochDuration" { + data := make([]byte, 8) + copy(data, parameter.Value()) + res.blockPerEpoch = binary.LittleEndian.Uint64(data) + return true + } + return false + }) + if res.blockPerEpoch == 0 { + return nil, fmt.Errorf("not found param: EpochDuration") + } + return res, nil + } +} + +func needParseExpiration(headers map[string]string) bool { + _, ok1 := headers[expirationDurationAttr] + _, ok2 := headers[expirationRFC3339Attr] + _, ok3 := headers[expirationTimestampAttr] + return ok1 || ok2 || ok3 +}