forked from TrueCloudLab/frostfs-s3-gw
[#195] Set tick attribute to lock objects
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
4c3c4b6bee
commit
4a67e4b311
8 changed files with 122 additions and 43 deletions
|
@ -7,12 +7,11 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||||
apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -282,8 +281,6 @@ func (h *handler) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Reque
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//objectv2.ReadLock()
|
|
||||||
|
|
||||||
if err = checkLockInfo(lockInfo, r.Header); err != nil {
|
if err = checkLockInfo(lockInfo, r.Header); err != nil {
|
||||||
h.logAndSendError(w, "couldn't change lock mode", reqInfo, err)
|
h.logAndSendError(w, "couldn't change lock mode", reqInfo, err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -290,7 +290,7 @@ func (n *layer) Initialize(ctx context.Context, c Notificator) error {
|
||||||
return fmt.Errorf("already initialized")
|
return fmt.Errorf("already initialized")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.Subscribe(ctx, "lock", MsgHandlerFunc(n.handleLockTick)); err != nil {
|
if err := c.Subscribe(ctx, LockTopic, MsgHandlerFunc(n.handleLockTick)); err != nil {
|
||||||
return fmt.Errorf("couldn't initialize layer: %w", err)
|
return fmt.Errorf("couldn't initialize layer: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
47
api/layer/locking_test.go
Normal file
47
api/layer/locking_test.go
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
package layer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestObjectLockAttributes(t *testing.T) {
|
||||||
|
tc := prepareContext(t)
|
||||||
|
err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{
|
||||||
|
BktInfo: tc.bktInfo,
|
||||||
|
Settings: &data.BucketSettings{VersioningEnabled: true},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
obj := tc.putObject([]byte("content obj1 v1"))
|
||||||
|
|
||||||
|
_, err = tc.layer.PutSystemObject(tc.ctx, &PutSystemObjectParams{
|
||||||
|
BktInfo: tc.bktInfo,
|
||||||
|
ObjName: obj.RetentionObject(),
|
||||||
|
Metadata: make(map[string]string),
|
||||||
|
Lock: &data.ObjectLock{
|
||||||
|
Until: time.Now(),
|
||||||
|
Objects: []oid.ID{*obj.ID},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
lockObj := tc.getSystemObject(obj.RetentionObject())
|
||||||
|
require.NotNil(t, lockObj)
|
||||||
|
|
||||||
|
tickTopic, tickEpoch := false, false
|
||||||
|
for _, attr := range lockObj.Attributes() {
|
||||||
|
if attr.Key() == AttributeSysTickEpoch {
|
||||||
|
tickEpoch = true
|
||||||
|
} else if attr.Key() == AttributeSysTickTopic {
|
||||||
|
tickTopic = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Truef(t, tickTopic, "system header __NEOFS__TICK_TOPIC presence")
|
||||||
|
require.Truef(t, tickEpoch, "system header __NEOFS__TICK_EPOCH presence")
|
||||||
|
}
|
|
@ -223,4 +223,12 @@ type NeoFS interface {
|
||||||
//
|
//
|
||||||
// Returns any error encountered which prevented the removal request to be sent.
|
// Returns any error encountered which prevented the removal request to be sent.
|
||||||
DeleteObject(context.Context, PrmObjectDelete) error
|
DeleteObject(context.Context, PrmObjectDelete) error
|
||||||
|
|
||||||
|
// TimeToEpoch compute current epoch and epoch that corresponds provided time.
|
||||||
|
// Note:
|
||||||
|
// * time must be in the future
|
||||||
|
// * time will be ceil rounded to match epoch
|
||||||
|
//
|
||||||
|
// Returns any error encountered which prevented computing epochs.
|
||||||
|
TimeToEpoch(context.Context, time.Time) (uint64, uint64, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,9 @@ import (
|
||||||
const (
|
const (
|
||||||
AttributeComplianceMode = ".s3-compliance-mode"
|
AttributeComplianceMode = ".s3-compliance-mode"
|
||||||
AttributeRetainUntil = ".s3-retain-until"
|
AttributeRetainUntil = ".s3-retain-until"
|
||||||
|
AttributeSysTickEpoch = "__NEOFS__TICK_EPOCH"
|
||||||
|
AttributeSysTickTopic = "__NEOFS__TICK_TOPIC"
|
||||||
|
LockTopic = "lock"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n *layer) PutSystemObject(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) {
|
func (n *layer) PutSystemObject(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) {
|
||||||
|
@ -102,7 +105,13 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject
|
||||||
|
|
||||||
if p.Lock != nil && len(p.Lock.Objects) > 0 {
|
if p.Lock != nil && len(p.Lock.Objects) > 0 {
|
||||||
prm.Locks = p.Lock.Objects
|
prm.Locks = p.Lock.Objects
|
||||||
prm.Attributes = append(prm.Attributes, attributesFromLock(p.Lock)...)
|
|
||||||
|
attrs, err := n.attributesFromLock(ctx, p.Lock)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
prm.Attributes = append(prm.Attributes, attrs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||||
|
@ -278,22 +287,28 @@ func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) err
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func attributesFromLock(lock *data.ObjectLock) [][2]string {
|
func (n *layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) ([][2]string, error) {
|
||||||
var result [][2]string
|
var result [][2]string
|
||||||
if !lock.Until.IsZero() {
|
if !lock.Until.IsZero() {
|
||||||
attrRetainUntil := [2]string{
|
_, exp, err := n.neoFS.TimeToEpoch(ctx, lock.Until)
|
||||||
AttributeRetainUntil,
|
if err != nil {
|
||||||
lock.Until.Format(time.RFC3339),
|
return nil, err
|
||||||
}
|
}
|
||||||
result = append(result, attrRetainUntil)
|
|
||||||
|
attrs := [][2]string{
|
||||||
|
{AttributeSysTickEpoch, strconv.FormatUint(exp, 10)},
|
||||||
|
{AttributeSysTickTopic, LockTopic},
|
||||||
|
{AttributeRetainUntil, lock.Until.Format(time.RFC3339)},
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, attrs...)
|
||||||
if lock.IsCompliance {
|
if lock.IsCompliance {
|
||||||
attrCompliance := [2]string{
|
attrCompliance := [2]string{
|
||||||
AttributeComplianceMode,
|
AttributeComplianceMode, strconv.FormatBool(true),
|
||||||
strconv.FormatBool(true),
|
|
||||||
}
|
}
|
||||||
result = append(result, attrCompliance)
|
result = append(result, attrCompliance)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -68,12 +67,13 @@ type NeoFS interface {
|
||||||
// prevented the container to be created.
|
// prevented the container to be created.
|
||||||
CreateContainer(context.Context, PrmContainerCreate) (*cid.ID, error)
|
CreateContainer(context.Context, PrmContainerCreate) (*cid.ID, error)
|
||||||
|
|
||||||
// NetworkState returns current state of the NeoFS network.
|
// TimeToEpoch compute current epoch and epoch that corresponds provided time.
|
||||||
// Returns any error encountered which prevented state to be read.
|
// Note:
|
||||||
|
// * time must be in the future
|
||||||
|
// * time will be ceil rounded to match epoch
|
||||||
//
|
//
|
||||||
// Returns exactly one non-nil value. Returns any error encountered which
|
// Returns any error encountered which prevented computing epochs.
|
||||||
// prevented the state to be read.
|
TimeToEpoch(context.Context, time.Time) (uint64, uint64, error)
|
||||||
NetworkState(context.Context) (*NetworkState, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Agent contains client communicating with NeoFS and logger.
|
// Agent contains client communicating with NeoFS and logger.
|
||||||
|
@ -213,22 +213,10 @@ func (a *Agent) IssueSecret(ctx context.Context, w io.Writer, options *IssueSecr
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
netState, err := a.neoFS.NetworkState(ctx)
|
lifetime.Iat, lifetime.Exp, err = a.neoFS.TimeToEpoch(ctx, time.Now().Add(options.Lifetime))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
lifetime.Iat = netState.Epoch
|
|
||||||
msPerEpoch := netState.EpochDuration * uint64(netState.BlockDuration)
|
|
||||||
epochLifetime := uint64(options.Lifetime.Milliseconds()) / msPerEpoch
|
|
||||||
if uint64(options.Lifetime.Milliseconds())%msPerEpoch != 0 {
|
|
||||||
epochLifetime++
|
|
||||||
}
|
|
||||||
|
|
||||||
if epochLifetime >= math.MaxUint64-lifetime.Iat {
|
|
||||||
lifetime.Exp = math.MaxUint64
|
|
||||||
} else {
|
|
||||||
lifetime.Exp = lifetime.Iat + epochLifetime
|
|
||||||
}
|
|
||||||
|
|
||||||
gatesData, err := createTokens(options, lifetime)
|
gatesData, err := createTokens(options, lifetime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -44,16 +45,23 @@ func (x *NeoFS) SetConnectionPool(p *pool.Pool) {
|
||||||
x.pool = p
|
x.pool = p
|
||||||
}
|
}
|
||||||
|
|
||||||
// NetworkState implements authmate.NeoFS interface method.
|
// TimeToEpoch implements authmate.NeoFS interface method.
|
||||||
func (x *NeoFS) NetworkState(ctx context.Context) (*authmate.NetworkState, error) {
|
func (x *NeoFS) TimeToEpoch(ctx context.Context, futureTime time.Time) (uint64, uint64, error) {
|
||||||
|
now := time.Now()
|
||||||
|
dur := futureTime.Sub(now)
|
||||||
|
if dur < 0 {
|
||||||
|
return 0, 0, fmt.Errorf("time '%s' must be in the future (after %s)",
|
||||||
|
futureTime.Format(time.RFC3339), now.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
|
||||||
conn, _, err := x.pool.Connection()
|
conn, _, err := x.pool.Connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("get connection from pool: %w", err)
|
return 0, 0, fmt.Errorf("get connection from pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := conn.NetworkInfo(ctx, client.PrmNetworkInfo{})
|
res, err := conn.NetworkInfo(ctx, client.PrmNetworkInfo{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("get network info via client: %w", err)
|
return 0, 0, fmt.Errorf("get network info via client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
networkInfo := res.Info()
|
networkInfo := res.Info()
|
||||||
|
@ -74,14 +82,25 @@ func (x *NeoFS) NetworkState(ctx context.Context) (*authmate.NetworkState, error
|
||||||
})
|
})
|
||||||
|
|
||||||
if durEpoch == 0 {
|
if durEpoch == 0 {
|
||||||
return nil, errors.New("epoch duration is missing or zero")
|
return 0, 0, errors.New("epoch duration is missing or zero")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &authmate.NetworkState{
|
curr := networkInfo.CurrentEpoch()
|
||||||
Epoch: networkInfo.CurrentEpoch(),
|
msPerEpoch := durEpoch * uint64(networkInfo.MsPerBlock())
|
||||||
BlockDuration: networkInfo.MsPerBlock(),
|
|
||||||
EpochDuration: durEpoch,
|
epochLifetime := uint64(dur.Milliseconds()) / msPerEpoch
|
||||||
}, nil
|
if uint64(dur.Milliseconds())%msPerEpoch != 0 {
|
||||||
|
epochLifetime++
|
||||||
|
}
|
||||||
|
|
||||||
|
var epoch uint64
|
||||||
|
if epochLifetime >= math.MaxUint64-curr {
|
||||||
|
epoch = math.MaxUint64
|
||||||
|
} else {
|
||||||
|
epoch = curr + epochLifetime
|
||||||
|
}
|
||||||
|
|
||||||
|
return curr, epoch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Container reads container by ID using connection pool. Returns exact one non-nil value.
|
// Container reads container by ID using connection pool. Returns exact one non-nil value.
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
objectv2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
objectv2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer/neofs"
|
"github.com/nspcc-dev/neofs-s3-gw/api/layer/neofs"
|
||||||
|
@ -240,6 +241,10 @@ func (t *TestNeoFS) DeleteObject(_ context.Context, prm neofs.PrmObjectDelete) e
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestNeoFS) TimeToEpoch(ctx context.Context, futureTime time.Time) (uint64, uint64, error) {
|
||||||
|
return t.currentEpoch, t.currentEpoch + uint64(futureTime.Second()), nil
|
||||||
|
}
|
||||||
|
|
||||||
func isMatched(attributes []*object.Attribute, filter object.SearchFilter) bool {
|
func isMatched(attributes []*object.Attribute, filter object.SearchFilter) bool {
|
||||||
for _, attr := range attributes {
|
for _, attr := range attributes {
|
||||||
if attr.Key() == filter.Header() && attr.Value() == filter.Value() {
|
if attr.Key() == filter.Header() && attr.Value() == filter.Value() {
|
||||||
|
|
Loading…
Reference in a new issue