forked from TrueCloudLab/frostfs-s3-gw
[#192] Add handling expiration lifecycle
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
5bfb8fd291
commit
749b095acd
12 changed files with 650 additions and 55 deletions
|
@ -57,9 +57,15 @@ type (
|
|||
|
||||
// BucketSettings stores settings such as versioning.
|
||||
BucketSettings struct {
|
||||
Versioning string `json:"versioning"`
|
||||
LockConfiguration *ObjectLockConfiguration `json:"lock_configuration"`
|
||||
LifecycleConfiguration *LifecycleConfiguration `json:"lifecycle_configuration"`
|
||||
Versioning string `json:"versioning"`
|
||||
LockConfiguration *ObjectLockConfiguration `json:"lock_configuration"`
|
||||
LifecycleConfig *LifecycleConfig `json:"lifecycle_configuration"`
|
||||
}
|
||||
|
||||
// LifecycleConfig stores lifecycle config old and current settings.
|
||||
LifecycleConfig struct {
|
||||
OldConfigurationID string `json:"old_id"`
|
||||
CurrentConfiguration *LifecycleConfiguration
|
||||
}
|
||||
|
||||
// CORSConfiguration stores CORS configuration of a request.
|
||||
|
@ -125,3 +131,6 @@ func (b BucketSettings) VersioningEnabled() bool {
|
|||
func (b BucketSettings) VersioningSuspended() bool {
|
||||
return b.Versioning == VersioningSuspended
|
||||
}
|
||||
|
||||
// ExpirationObject returns name of system object for expiration tick object.
|
||||
func (o *ObjectInfo) ExpirationObject() string { return ".expiration." + o.Name }
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package data
|
||||
|
||||
import "encoding/xml"
|
||||
import (
|
||||
"encoding/xml"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type (
|
||||
LifecycleConfiguration struct {
|
||||
|
@ -15,7 +18,7 @@ type (
|
|||
ID string `xml:"ID" json:"ID"`
|
||||
NoncurrentVersionExpiration *NoncurrentVersionExpiration `xml:"NoncurrentVersionExpiration" json:"NoncurrentVersionExpiration"`
|
||||
NoncurrentVersionTransitions []NoncurrentVersionTransition `xml:"NoncurrentVersionTransition" json:"NoncurrentVersionTransition"`
|
||||
Prefix string `xml:"Prefix" json:"Prefix"`
|
||||
Prefix *string `xml:"Prefix" json:"Prefix"`
|
||||
Status string `xml:"Status" json:"Status"`
|
||||
Transitions []Transition `xml:"Transition" json:"Transition"`
|
||||
}
|
||||
|
@ -25,24 +28,24 @@ type (
|
|||
}
|
||||
|
||||
Expiration struct {
|
||||
Date string `xml:"Date" json:"Date"`
|
||||
Days int64 `xml:"Days" json:"Days"`
|
||||
ExpiredObjectDeleteMarker bool `xml:"ExpiredObjectDeleteMarker" json:"ExpiredObjectDeleteMarker"`
|
||||
Date *string `xml:"Date" json:"Date"`
|
||||
Days *int64 `xml:"Days" json:"Days"`
|
||||
ExpiredObjectDeleteMarker bool `xml:"ExpiredObjectDeleteMarker" json:"ExpiredObjectDeleteMarker"`
|
||||
}
|
||||
|
||||
LifecycleRuleFilter struct {
|
||||
And *LifecycleRuleAndOperator `xml:"And" json:"And"`
|
||||
ObjectSizeGreaterThan int64 `xml:"ObjectSizeGreaterThan" json:"ObjectSizeGreaterThan"`
|
||||
ObjectSizeLessThan int64 `xml:"ObjectSizeLessThan" json:"ObjectSizeLessThan"`
|
||||
Prefix string `xml:"Prefix" json:"Prefix"`
|
||||
ObjectSizeGreaterThan *int64 `xml:"ObjectSizeGreaterThan" json:"ObjectSizeGreaterThan"`
|
||||
ObjectSizeLessThan *int64 `xml:"ObjectSizeLessThan" json:"ObjectSizeLessThan"`
|
||||
Prefix *string `xml:"Prefix" json:"Prefix"`
|
||||
Tag *Tag `xml:"Tag" json:"Tag"`
|
||||
}
|
||||
|
||||
LifecycleRuleAndOperator struct {
|
||||
ObjectSizeGreaterThan int64 `xml:"ObjectSizeGreaterThan" json:"ObjectSizeGreaterThan"`
|
||||
ObjectSizeLessThan int64 `xml:"ObjectSizeLessThan" json:"ObjectSizeLessThan"`
|
||||
Prefix string `xml:"Prefix" json:"Prefix"`
|
||||
Tags []Tag `xml:"Tags" json:"Tags"`
|
||||
ObjectSizeGreaterThan *int64 `xml:"ObjectSizeGreaterThan" json:"ObjectSizeGreaterThan"`
|
||||
ObjectSizeLessThan *int64 `xml:"ObjectSizeLessThan" json:"ObjectSizeLessThan"`
|
||||
Prefix *string `xml:"Prefix" json:"Prefix"`
|
||||
Tags []Tag `xml:"Tags" json:"Tags"`
|
||||
}
|
||||
|
||||
Tag struct {
|
||||
|
@ -51,19 +54,118 @@ type (
|
|||
}
|
||||
|
||||
NoncurrentVersionExpiration struct {
|
||||
NewerNoncurrentVersions int64 `xml:"NewerNoncurrentVersions" json:"NewerNoncurrentVersions"`
|
||||
NoncurrentDays int64 `xml:"NoncurrentDays" json:"NoncurrentDays"`
|
||||
NewerNoncurrentVersions *int64 `xml:"NewerNoncurrentVersions" json:"NewerNoncurrentVersions"`
|
||||
NoncurrentDays *int64 `xml:"NoncurrentDays" json:"NoncurrentDays"`
|
||||
}
|
||||
|
||||
NoncurrentVersionTransition struct {
|
||||
NewerNoncurrentVersions int64 `xml:"NewerNoncurrentVersions" json:"NewerNoncurrentVersions"`
|
||||
NoncurrentDays int64 `xml:"NoncurrentDays" json:"NoncurrentDays"`
|
||||
NewerNoncurrentVersions *int64 `xml:"NewerNoncurrentVersions" json:"NewerNoncurrentVersions"`
|
||||
NoncurrentDays *int64 `xml:"NoncurrentDays" json:"NoncurrentDays"`
|
||||
StorageClass string `xml:"StorageClass" json:"StorageClass"`
|
||||
}
|
||||
|
||||
Transition struct {
|
||||
Date string `xml:"Date" json:"Date"`
|
||||
Days int64 `xml:"Days" json:"Days"`
|
||||
StorageClass string `xml:"StorageClass" json:"StorageClass"`
|
||||
Date *string `xml:"Date" json:"Date"`
|
||||
Days *int64 `xml:"Days" json:"Days"`
|
||||
StorageClass string `xml:"StorageClass" json:"StorageClass"`
|
||||
}
|
||||
|
||||
ExpirationObject struct {
|
||||
Expiration *Expiration
|
||||
RuleID string
|
||||
LifecycleConfigID string
|
||||
}
|
||||
)
|
||||
|
||||
func (r *Rule) RealPrefix() string {
|
||||
if r.Filter == nil {
|
||||
if r.Prefix != nil {
|
||||
return *r.Prefix
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
if r.Filter.And == nil {
|
||||
if r.Filter.Prefix != nil {
|
||||
return *r.Filter.Prefix
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
if r.Filter.And.Prefix != nil {
|
||||
return *r.Filter.And.Prefix
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (r *Rule) NeedTags() bool {
|
||||
if r.Filter == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if r.Filter.And == nil {
|
||||
return r.Filter.Tag != nil
|
||||
}
|
||||
|
||||
return len(r.Filter.And.Tags) != 0
|
||||
}
|
||||
|
||||
func (r *Rule) MatchObject(obj *ObjectInfo, tags map[string]string) bool {
|
||||
if r.Filter == nil {
|
||||
if r.Prefix != nil {
|
||||
return strings.HasPrefix(obj.Name, *r.Prefix)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
if r.Filter.And == nil {
|
||||
if r.Filter.Prefix != nil && !strings.HasPrefix(obj.Name, *r.Filter.Prefix) {
|
||||
return false
|
||||
}
|
||||
|
||||
if r.Filter.Tag != nil {
|
||||
if tags == nil {
|
||||
return false
|
||||
}
|
||||
if tagVal := tags[r.Filter.Tag.Key]; tagVal != r.Filter.Tag.Value {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if r.Filter.ObjectSizeLessThan != nil && *r.Filter.ObjectSizeLessThan > 0 && obj.Size >= *r.Filter.ObjectSizeLessThan {
|
||||
return false
|
||||
}
|
||||
|
||||
if r.Filter.ObjectSizeGreaterThan != nil && obj.Size <= *r.Filter.ObjectSizeGreaterThan {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
if r.Filter.And.Prefix != nil && !strings.HasPrefix(obj.Name, *r.Filter.And.Prefix) {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(r.Filter.And.Tags) != 0 {
|
||||
if tags == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, tag := range r.Filter.And.Tags {
|
||||
if tagVal := tags[tag.Key]; tagVal != tag.Value {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if r.Filter.And.ObjectSizeLessThan != nil && obj.Size >= *r.Filter.And.ObjectSizeLessThan {
|
||||
return false
|
||||
}
|
||||
|
||||
if r.Filter.And.ObjectSizeGreaterThan != nil && obj.Size <= *r.Filter.And.ObjectSizeGreaterThan {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -195,6 +195,11 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if err = h.setExpirationHeader(r.Context(), bktInfo, info, w.Header()); err != nil {
|
||||
h.logAndSendError(w, "could not get expiration info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
|
|
|
@ -2,7 +2,11 @@ package handler
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"github.com/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
|
@ -103,6 +107,11 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if err = h.setExpirationHeader(r.Context(), bktInfo, info, w.Header()); err != nil {
|
||||
h.logAndSendError(w, "could not get expiration info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
|
@ -157,3 +166,24 @@ func writeLockHeaders(h http.Header, legalHold *data.LegalHold, retention *data.
|
|||
h.Set(api.AmzObjectLockMode, retention.Mode)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) setExpirationHeader(ctx context.Context, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo, header http.Header) error {
|
||||
var expirationObjInfo data.ObjectInfo
|
||||
|
||||
// todo get expiration object info
|
||||
|
||||
ruleID := expirationObjInfo.Headers[layer.AttributeExpireRuleID]
|
||||
|
||||
expDate, err := time.Parse(time.RFC3339, expirationObjInfo.Headers[layer.AttributeExpireDate])
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't parse ivalid expiration time: %w", err)
|
||||
}
|
||||
|
||||
writeExpirationHeader(header, ruleID, expDate)
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeExpirationHeader(h http.Header, ruleID string, expDate time.Time) {
|
||||
header := "expiry-date=\"%s\", rule-id=\"%s\""
|
||||
h.Set(api.AmzExpiration, fmt.Sprintf(header, expDate.UTC().Format(http.TimeFormat), url.QueryEscape(ruleID)))
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
)
|
||||
|
||||
func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -61,13 +60,13 @@ func (h *handler) GetBucketLifecycleHandler(w http.ResponseWriter, r *http.Reque
|
|||
return
|
||||
}
|
||||
|
||||
if settings.LifecycleConfiguration == nil {
|
||||
if settings.LifecycleConfig == nil || settings.LifecycleConfig.CurrentConfiguration == nil {
|
||||
h.logAndSendError(w, "lifecycle configuration doesn't exist", reqInfo,
|
||||
apiErrors.GetAPIError(apiErrors.ErrNoSuchLifecycleConfiguration))
|
||||
return
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, settings.LifecycleConfiguration); err != nil {
|
||||
if err = api.EncodeToResponse(w, settings.LifecycleConfig.CurrentConfiguration); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
@ -93,27 +92,19 @@ func (h *handler) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Re
|
|||
}
|
||||
|
||||
func (h *handler) updateLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, lifecycleConf *data.LifecycleConfiguration) error {
|
||||
settings, err := h.obj.GetBucketSettings(ctx, bktInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get bucket settings: %w", err)
|
||||
}
|
||||
|
||||
settings.LifecycleConfiguration = lifecycleConf
|
||||
sp := &layer.PutSettingsParams{
|
||||
BktInfo: bktInfo,
|
||||
Settings: settings,
|
||||
}
|
||||
|
||||
if err = h.obj.PutBucketSettings(ctx, sp); err != nil {
|
||||
return fmt.Errorf("couldn't put bucket settings: %w", err)
|
||||
// todo consider run as separate goroutine
|
||||
if err := h.obj.ScheduleLifecycle(ctx, bktInfo, lifecycleConf); err != nil {
|
||||
return fmt.Errorf("couldn't apply lifecycle: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkLifecycleConfiguration(conf *data.LifecycleConfiguration) error {
|
||||
err := apiErrors.GetAPIError(apiErrors.ErrMalformedXML)
|
||||
|
||||
if len(conf.Rules) == 0 {
|
||||
return apiErrors.GetAPIError(apiErrors.ErrMalformedXML)
|
||||
return err
|
||||
}
|
||||
if len(conf.Rules) > 1000 {
|
||||
return fmt.Errorf("you cannot have more than 1000 rules")
|
||||
|
@ -121,17 +112,35 @@ func checkLifecycleConfiguration(conf *data.LifecycleConfiguration) error {
|
|||
|
||||
for _, rule := range conf.Rules {
|
||||
if rule.Status != enabledValue && rule.Status != disabledValue {
|
||||
return apiErrors.GetAPIError(apiErrors.ErrMalformedXML)
|
||||
return err
|
||||
}
|
||||
if rule.Prefix != nil && rule.Filter != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if rule.Filter != nil {
|
||||
if rule.Filter.ObjectSizeGreaterThan < 0 || rule.Filter.ObjectSizeLessThan < 0 {
|
||||
return apiErrors.GetAPIError(apiErrors.ErrMalformedXML)
|
||||
if rule.Filter.ObjectSizeGreaterThan != nil && *rule.Filter.ObjectSizeGreaterThan < 0 ||
|
||||
rule.Filter.ObjectSizeLessThan != nil && *rule.Filter.ObjectSizeLessThan < 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
if !filterContainsOneOption(rule.Filter) {
|
||||
return apiErrors.GetAPIError(apiErrors.ErrMalformedXML)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !ruleHasAction(rule) {
|
||||
return err
|
||||
}
|
||||
|
||||
// currently only expiration action is supported
|
||||
if rule.Expiration == nil {
|
||||
return err
|
||||
}
|
||||
if rule.Expiration.Days != nil && rule.Expiration.Date != nil ||
|
||||
rule.Expiration.Days == nil && rule.Expiration.Date == nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -139,7 +148,7 @@ func checkLifecycleConfiguration(conf *data.LifecycleConfiguration) error {
|
|||
|
||||
func filterContainsOneOption(filter *data.LifecycleRuleFilter) bool {
|
||||
exactlyOneOption := 0
|
||||
if filter.Prefix != "" {
|
||||
if filter.Prefix != nil {
|
||||
exactlyOneOption++
|
||||
}
|
||||
if filter.And != nil {
|
||||
|
@ -151,3 +160,9 @@ func filterContainsOneOption(filter *data.LifecycleRuleFilter) bool {
|
|||
|
||||
return exactlyOneOption == 1
|
||||
}
|
||||
|
||||
func ruleHasAction(rule data.Rule) bool {
|
||||
return rule.AbortIncompleteMultipartUpload != nil || rule.Expiration != nil ||
|
||||
rule.NoncurrentVersionExpiration != nil || len(rule.Transitions) != 0 ||
|
||||
len(rule.NoncurrentVersionTransitions) != 0
|
||||
}
|
||||
|
|
|
@ -20,6 +20,10 @@ func TestCheckLifecycleConfiguration(t *testing.T) {
|
|||
rules[i] = data.Rule{ID: strconv.Itoa(i), Status: disabledValue}
|
||||
}
|
||||
|
||||
prefix := "prefix"
|
||||
invalidSize := int64(-1)
|
||||
days := int64(1)
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
configuration *data.LifecycleConfiguration
|
||||
|
@ -28,8 +32,9 @@ func TestCheckLifecycleConfiguration(t *testing.T) {
|
|||
{
|
||||
name: "basic",
|
||||
configuration: &data.LifecycleConfiguration{Rules: []data.Rule{{
|
||||
ID: "Some ID",
|
||||
Status: "Disabled",
|
||||
ID: "Some ID",
|
||||
Status: "Disabled",
|
||||
Expiration: &data.Expiration{Days: &days},
|
||||
}}},
|
||||
noError: true,
|
||||
},
|
||||
|
@ -60,7 +65,7 @@ func TestCheckLifecycleConfiguration(t *testing.T) {
|
|||
configuration: &data.LifecycleConfiguration{Rules: []data.Rule{{
|
||||
Status: enabledValue,
|
||||
Filter: &data.LifecycleRuleFilter{
|
||||
Prefix: "prefix",
|
||||
Prefix: &prefix,
|
||||
Tag: &data.Tag{},
|
||||
},
|
||||
}}},
|
||||
|
@ -70,7 +75,7 @@ func TestCheckLifecycleConfiguration(t *testing.T) {
|
|||
configuration: &data.LifecycleConfiguration{Rules: []data.Rule{{
|
||||
Status: enabledValue,
|
||||
Filter: &data.LifecycleRuleFilter{
|
||||
ObjectSizeGreaterThan: -1,
|
||||
ObjectSizeGreaterThan: &invalidSize,
|
||||
},
|
||||
}}},
|
||||
},
|
||||
|
@ -79,7 +84,7 @@ func TestCheckLifecycleConfiguration(t *testing.T) {
|
|||
configuration: &data.LifecycleConfiguration{Rules: []data.Rule{{
|
||||
Status: enabledValue,
|
||||
Filter: &data.LifecycleRuleFilter{
|
||||
ObjectSizeLessThan: -1,
|
||||
ObjectSizeLessThan: &invalidSize,
|
||||
},
|
||||
}}},
|
||||
},
|
||||
|
@ -106,13 +111,14 @@ func TestBucketLifecycleConfiguration(t *testing.T) {
|
|||
hc.Handler().GetBucketLifecycleHandler(w, r)
|
||||
assertS3Error(t, w, apiErrors.GetAPIError(apiErrors.ErrNoSuchLifecycleConfiguration))
|
||||
|
||||
days := int64(1)
|
||||
lifecycleConf := &data.LifecycleConfiguration{
|
||||
XMLName: xmlName("LifecycleConfiguration"),
|
||||
Rules: []data.Rule{
|
||||
{
|
||||
AbortIncompleteMultipartUpload: &data.AbortIncompleteMultipartUpload{},
|
||||
ID: "Test",
|
||||
Status: "Disabled",
|
||||
Expiration: &data.Expiration{Days: &days},
|
||||
ID: "Test",
|
||||
Status: "Disabled",
|
||||
},
|
||||
}}
|
||||
w, r = prepareTestRequest(t, bktName, "", lifecycleConf)
|
||||
|
|
|
@ -57,6 +57,7 @@ const (
|
|||
AmzObjectAttributes = "X-Amz-Object-Attributes"
|
||||
AmzMaxParts = "X-Amz-Max-Parts"
|
||||
AmzPartNumberMarker = "X-Amz-Part-Number-Marker"
|
||||
AmzExpiration = "X-Amz-Expiration"
|
||||
|
||||
AmzServerSideEncryptionCustomerAlgorithm = "x-amz-server-side-encryption-customer-algorithm"
|
||||
AmzServerSideEncryptionCustomerKey = "x-amz-server-side-encryption-customer-key"
|
||||
|
|
|
@ -191,6 +191,8 @@ type (
|
|||
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
|
||||
PutBucketSettings(ctx context.Context, p *PutSettingsParams) error
|
||||
|
||||
ScheduleLifecycle(ctx context.Context, bktInfo *data.BucketInfo, new *data.LifecycleConfiguration) error
|
||||
|
||||
PutBucketCORS(ctx context.Context, p *PutCORSParams) error
|
||||
GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error)
|
||||
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error
|
||||
|
@ -288,7 +290,9 @@ func (n *layer) Initialize(ctx context.Context, c EventListener) error {
|
|||
return fmt.Errorf("already initialized")
|
||||
}
|
||||
|
||||
// todo add notification handlers (e.g. for lifecycles)
|
||||
if err := c.Subscribe(ctx, ExpireTopic, MsgHandlerFunc(n.handleExpireTick)); err != nil {
|
||||
return fmt.Errorf("couldn't initialize layer: %w", err)
|
||||
}
|
||||
|
||||
c.Listen(ctx)
|
||||
|
||||
|
|
211
api/layer/lifecycle.go
Normal file
211
api/layer/lifecycle.go
Normal file
|
@ -0,0 +1,211 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
AttributeExpirationEpoch = "__NEOFS__EXPIRATION_EPOCH"
|
||||
AttributeSysTickEpoch = "__NEOFS__TICK_EPOCH"
|
||||
AttributeSysTickTopic = "__NEOFS__TICK_TOPIC"
|
||||
AttributeParentObject = ".s3-expire-parent-object"
|
||||
AttributeParentBucket = ".s3-expire-parent-bucket"
|
||||
AttributeExpireDate = ".s3-expire-date"
|
||||
AttributeExpireRuleID = ".s3-expire-rule-id"
|
||||
AttributeLifecycleConfigID = ".s3-lifecycle-config"
|
||||
ExpireTopic = "expire"
|
||||
)
|
||||
|
||||
func (n *layer) handleExpireTick(ctx context.Context, msg *nats.Msg) error {
|
||||
var addr oid.Address
|
||||
if err := addr.DecodeString(string(msg.Data)); err != nil {
|
||||
return fmt.Errorf("invalid msg, address expected: %w", err)
|
||||
}
|
||||
|
||||
n.log.Debug("handling expiration tick", zap.String("address", string(msg.Data)))
|
||||
|
||||
// and make sure having right access
|
||||
|
||||
//todo redo
|
||||
bktInfo := &data.BucketInfo{CID: addr.Container()}
|
||||
|
||||
obj, err := n.objectHead(ctx, bktInfo, addr.Object())
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't head expiration object: %w", err)
|
||||
}
|
||||
|
||||
header := userHeaders(obj.Attributes())
|
||||
objName := header[AttributeParentObject]
|
||||
bktName := header[AttributeParentBucket]
|
||||
if objName == "" || bktName == "" {
|
||||
return fmt.Errorf("couldn't know bucket/object to expire")
|
||||
}
|
||||
|
||||
p := &DeleteObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
Objects: []*VersionedObject{{Name: objName}},
|
||||
}
|
||||
|
||||
res := n.DeleteObjects(ctx, p)
|
||||
if res[0].Error != nil {
|
||||
return fmt.Errorf("couldn't delete expired object: %w", res[0].Error)
|
||||
}
|
||||
|
||||
return n.objectDelete(ctx, bktInfo, addr.Object())
|
||||
}
|
||||
|
||||
func (n *layer) ScheduleLifecycle(ctx context.Context, bktInfo *data.BucketInfo, newConf *data.LifecycleConfiguration) error {
|
||||
if newConf == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
lifecycleID, err := computeLifecycleID(newConf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't compute lifecycle id: %w", err)
|
||||
}
|
||||
|
||||
// We want to be able to revert partly applied lifecycle if something goes wrong.
|
||||
if err = n.updateLifecycle(ctx, bktInfo, &data.LifecycleConfig{
|
||||
OldConfigurationID: lifecycleID,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = n.applyLifecycle(ctx, bktInfo, lifecycleID, newConf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return n.updateLifecycle(ctx, bktInfo, &data.LifecycleConfig{
|
||||
OldConfigurationID: lifecycleID,
|
||||
CurrentConfiguration: newConf,
|
||||
})
|
||||
}
|
||||
|
||||
func (n *layer) updateLifecycle(ctx context.Context, bktInfo *data.BucketInfo, lifecycleConfig *data.LifecycleConfig) error {
|
||||
settings, err := n.GetBucketSettings(ctx, bktInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get bucket settings: %w", err)
|
||||
}
|
||||
|
||||
settings.LifecycleConfig = lifecycleConfig
|
||||
sp := &PutSettingsParams{
|
||||
BktInfo: bktInfo,
|
||||
Settings: settings,
|
||||
}
|
||||
|
||||
if err = n.PutBucketSettings(ctx, sp); err != nil {
|
||||
return fmt.Errorf("couldn't put bucket settings: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) applyLifecycle(ctx context.Context, bktInfo *data.BucketInfo, lifecycleID string, conf *data.LifecycleConfiguration) error {
|
||||
for _, rule := range conf.Rules {
|
||||
if rule.Status == "Disabled" {
|
||||
continue
|
||||
}
|
||||
|
||||
listParam := allObjectParams{
|
||||
Bucket: bktInfo,
|
||||
Prefix: rule.RealPrefix(),
|
||||
}
|
||||
|
||||
objects, _, err := n.getLatestObjectsVersions(ctx, listParam)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = n.applyLifecycleToObjects(ctx, bktInfo, lifecycleID, rule, objects); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) applyLifecycleToObjects(ctx context.Context, bktInfo *data.BucketInfo, lifecycleID string, rule data.Rule, objects []*data.ObjectInfo) error {
|
||||
var tags []map[string]string
|
||||
var err error
|
||||
if rule.NeedTags() {
|
||||
tags = make([]map[string]string, len(objects))
|
||||
p := &ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
}
|
||||
for i, obj := range objects {
|
||||
p.ObjectName = obj.Name
|
||||
p.VersionID = obj.VersionID()
|
||||
if _, tags[i], err = n.GetObjectTagging(ctx, p); err != nil {
|
||||
return fmt.Errorf("couldn't get object tags: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i, obj := range objects {
|
||||
var objTags map[string]string
|
||||
if len(tags) != 0 {
|
||||
objTags = tags[i]
|
||||
}
|
||||
if !rule.MatchObject(obj, objTags) {
|
||||
continue
|
||||
}
|
||||
|
||||
expObj := &data.ExpirationObject{
|
||||
Expiration: rule.Expiration,
|
||||
RuleID: rule.ID,
|
||||
LifecycleConfigID: lifecycleID,
|
||||
}
|
||||
|
||||
if _, err = n.putExpirationObject(ctx, bktInfo, obj, expObj); err != nil {
|
||||
return fmt.Errorf("couldn't put expiration object: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) putLifecycleObjects(ctx context.Context, bktInfo *data.BucketInfo, obj *data.ObjectInfo, lifecycle *data.LifecycleConfig) error {
|
||||
if lifecycle == nil || lifecycle.CurrentConfiguration == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, rule := range lifecycle.CurrentConfiguration.Rules {
|
||||
if rule.Status == "Disabled" {
|
||||
continue
|
||||
}
|
||||
|
||||
// at this time lifecycle.OldConfigurationID is the same as lifecycle.CurrentConfiguration id
|
||||
if err := n.applyLifecycleToObjects(ctx, bktInfo, lifecycle.OldConfigurationID, rule, []*data.ObjectInfo{obj}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func computeLifecycleID(conf *data.LifecycleConfiguration) (string, error) {
|
||||
raw, err := xml.Marshal(conf)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("couldn't marshall new lifecycle configuration: %w", err)
|
||||
}
|
||||
|
||||
sha := sha256.New()
|
||||
sha.Write(raw)
|
||||
sum := sha.Sum(nil)
|
||||
|
||||
id := hex.EncodeToString(sum)
|
||||
|
||||
if id == "" {
|
||||
return "", fmt.Errorf("computed id is empty")
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
159
api/layer/lifecycle_test.go
Normal file
159
api/layer/lifecycle_test.go
Normal file
|
@ -0,0 +1,159 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestComputeLifecycleID(t *testing.T) {
|
||||
conf := &data.LifecycleConfiguration{Rules: []data.Rule{
|
||||
{
|
||||
ID: "id",
|
||||
Status: "Enabled",
|
||||
},
|
||||
}}
|
||||
|
||||
id, err := computeLifecycleID(conf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "51ff619dc848622287764fc7c4aec06b7c1a5936c25b8eee48a0dbcb4eeac9f4", id)
|
||||
}
|
||||
|
||||
func TestRuleMatchObject(t *testing.T) {
|
||||
prefix, suffix := "prefix", "suffix"
|
||||
objSizeMin, objSizeMax := int64(512), int64(1024)
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
rule data.Rule
|
||||
obj *data.ObjectInfo
|
||||
tags map[string]string
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "basic match",
|
||||
rule: data.Rule{Prefix: &prefix},
|
||||
obj: &data.ObjectInfo{Name: prefix + suffix},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "basic no match",
|
||||
rule: data.Rule{Prefix: &prefix},
|
||||
obj: &data.ObjectInfo{Name: suffix + prefix},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "filter and sizes",
|
||||
rule: data.Rule{Filter: &data.LifecycleRuleFilter{
|
||||
And: &data.LifecycleRuleAndOperator{
|
||||
ObjectSizeGreaterThan: &objSizeMin,
|
||||
ObjectSizeLessThan: &objSizeMax,
|
||||
},
|
||||
}},
|
||||
obj: &data.ObjectInfo{Name: suffix, Size: 768},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "filter prefix",
|
||||
rule: data.Rule{Filter: &data.LifecycleRuleFilter{
|
||||
Prefix: &prefix,
|
||||
}},
|
||||
obj: &data.ObjectInfo{Name: prefix + suffix},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "filter prefix no match",
|
||||
rule: data.Rule{Filter: &data.LifecycleRuleFilter{
|
||||
Prefix: &prefix,
|
||||
}},
|
||||
obj: &data.ObjectInfo{Name: suffix},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "filter tags",
|
||||
rule: data.Rule{Filter: &data.LifecycleRuleFilter{
|
||||
Tag: &data.Tag{
|
||||
Key: "key",
|
||||
Value: "val",
|
||||
},
|
||||
}},
|
||||
tags: map[string]string{"key": "val"},
|
||||
obj: &data.ObjectInfo{},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "filter and tags no match",
|
||||
rule: data.Rule{Filter: &data.LifecycleRuleFilter{
|
||||
And: &data.LifecycleRuleAndOperator{
|
||||
Tags: []data.Tag{{
|
||||
Key: "key",
|
||||
Value: "val",
|
||||
}},
|
||||
},
|
||||
}},
|
||||
tags: map[string]string{"key": "val2"},
|
||||
obj: &data.ObjectInfo{},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "filter size no match",
|
||||
rule: data.Rule{Filter: &data.LifecycleRuleFilter{
|
||||
ObjectSizeGreaterThan: &objSizeMax,
|
||||
}},
|
||||
obj: &data.ObjectInfo{Size: objSizeMin},
|
||||
expected: false,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
require.Equal(t, tc.expected, tc.rule.MatchObject(tc.obj, tc.tags))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestScheduleLifecycle(t *testing.T) {
|
||||
tc := prepareContext(t)
|
||||
|
||||
obj1 := tc.putObject([]byte("content"))
|
||||
|
||||
date := "2022-03-14T09:59:03Z"
|
||||
date2 := "2022-03-15T09:59:03Z"
|
||||
prefix := "prefix"
|
||||
tc.obj = prefix
|
||||
obj2 := tc.putObject([]byte("content2"))
|
||||
|
||||
conf := &data.LifecycleConfiguration{
|
||||
Rules: []data.Rule{{
|
||||
Filter: &data.LifecycleRuleFilter{
|
||||
Prefix: &prefix,
|
||||
},
|
||||
Expiration: &data.Expiration{
|
||||
Date: &date,
|
||||
}},
|
||||
},
|
||||
}
|
||||
|
||||
err := tc.layer.ScheduleLifecycle(tc.ctx, tc.bktInfo, conf)
|
||||
require.NoError(t, err)
|
||||
|
||||
expObj1, _ := tc.getObject(obj1.ExpirationObject(), "", false)
|
||||
require.Nil(t, expObj1)
|
||||
expObj2, _ := tc.getObject(obj2.ExpirationObject(), "", false)
|
||||
require.NotNil(t, expObj2)
|
||||
assertExpirationObject(t, expObj2, date)
|
||||
|
||||
conf.Rules[0].Expiration.Date = &date2
|
||||
err = tc.layer.ScheduleLifecycle(tc.ctx, tc.bktInfo, conf)
|
||||
require.NoError(t, err)
|
||||
|
||||
expObj2, _ = tc.getObject(obj2.ExpirationObject(), "", false)
|
||||
require.NotNil(t, expObj2)
|
||||
assertExpirationObject(t, expObj2, date2)
|
||||
}
|
||||
|
||||
func assertExpirationObject(t *testing.T, expObjInfo *data.ObjectInfo, date string) {
|
||||
require.Equal(t, expObjInfo.Headers[AttributeExpireDate], date)
|
||||
require.Contains(t, expObjInfo.Headers, AttributeSysTickEpoch)
|
||||
require.Contains(t, expObjInfo.Headers, AttributeSysTickTopic)
|
||||
require.Contains(t, expObjInfo.Headers, AttributeLifecycleConfigID)
|
||||
}
|
|
@ -298,6 +298,13 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
|||
NodeVersion: newVersion,
|
||||
}
|
||||
|
||||
// todo filling api.AmzExpiration header
|
||||
if err = n.putLifecycleObjects(ctx, p.BktInfo, objInfo, bktSettings.LifecycleConfig); err != nil {
|
||||
return nil, fmt.Errorf("couldn't put expiration system objects: %w", err)
|
||||
}
|
||||
|
||||
n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID)
|
||||
|
||||
n.cache.PutObjectWithName(owner, extendedObjInfo)
|
||||
|
||||
return extendedObjInfo, nil
|
||||
|
|
|
@ -15,8 +15,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
AttributeComplianceMode = ".s3-compliance-mode"
|
||||
AttributeExpirationEpoch = "__NEOFS__EXPIRATION_EPOCH"
|
||||
AttributeComplianceMode = ".s3-compliance-mode"
|
||||
)
|
||||
|
||||
type PutLockInfoParams struct {
|
||||
|
@ -130,6 +129,53 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
|
|||
return id, err
|
||||
}
|
||||
|
||||
func (n *layer) putExpirationObject(ctx context.Context, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo, expObj *data.ExpirationObject) (oid.ID, error) {
|
||||
prm := PrmObjectCreate{
|
||||
Container: bktInfo.CID,
|
||||
Creator: bktInfo.Owner,
|
||||
Filepath: objInfo.ExpirationObject(),
|
||||
}
|
||||
|
||||
var (
|
||||
err error
|
||||
exp uint64
|
||||
expTime time.Time
|
||||
)
|
||||
|
||||
if expObj.Expiration.Days != nil {
|
||||
expTime = objInfo.Created.Add(time.Duration(*expObj.Expiration.Days) * 24 * time.Hour).UTC()
|
||||
// emulate rounding the resulting time to the next day midnight UTC
|
||||
toMidnight := 24 - expTime.UTC().Hour()
|
||||
expTime = expTime.Add(time.Duration(toMidnight) * time.Hour)
|
||||
} else {
|
||||
expTime, err = time.Parse(time.RFC3339, *expObj.Expiration.Date)
|
||||
if err != nil {
|
||||
return oid.ID{}, fmt.Errorf("couldn't parse expiration date '%s': %w", *expObj.Expiration.Date, err)
|
||||
}
|
||||
}
|
||||
|
||||
if expTime.After(time.Now()) {
|
||||
_, exp, err = n.neoFS.TimeToEpoch(ctx, expTime)
|
||||
if err != nil {
|
||||
return oid.ID{}, fmt.Errorf("couldn't compute expiration epoch: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
prm.Attributes = [][2]string{
|
||||
{AttributeExpirationEpoch, strconv.FormatUint(exp+4, 10)},
|
||||
{AttributeSysTickEpoch, strconv.FormatUint(exp, 10)},
|
||||
{AttributeSysTickTopic, ExpireTopic},
|
||||
{AttributeParentObject, objInfo.Name},
|
||||
{AttributeParentBucket, bktInfo.Name},
|
||||
{AttributeExpireDate, expTime.Format(time.RFC3339)},
|
||||
{AttributeExpireRuleID, expObj.RuleID},
|
||||
{AttributeLifecycleConfigID, expObj.LifecycleConfigID},
|
||||
}
|
||||
|
||||
id, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func (n *layer) GetLockInfo(ctx context.Context, objVersion *ObjectVersion) (*data.LockInfo, error) {
|
||||
owner := n.Owner(ctx)
|
||||
if lockInfo := n.cache.GetLockInfo(owner, lockObjectKey(objVersion)); lockInfo != nil {
|
||||
|
|
Loading…
Reference in a new issue