From 749b095acd66f16b0a3a78688e277b4c516d04e0 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 10 Mar 2022 12:25:10 +0300 Subject: [PATCH] [#192] Add handling expiration lifecycle Signed-off-by: Denis Kirillov --- api/data/info.go | 15 ++- api/data/lifecycle.go | 140 +++++++++++++++++++--- api/handler/get.go | 5 + api/handler/head.go | 30 +++++ api/handler/lifecycle.go | 59 ++++++---- api/handler/lifecycle_test.go | 22 ++-- api/headers.go | 1 + api/layer/layer.go | 6 +- api/layer/lifecycle.go | 211 ++++++++++++++++++++++++++++++++++ api/layer/lifecycle_test.go | 159 +++++++++++++++++++++++++ api/layer/object.go | 7 ++ api/layer/system_object.go | 50 +++++++- 12 files changed, 650 insertions(+), 55 deletions(-) create mode 100644 api/layer/lifecycle.go create mode 100644 api/layer/lifecycle_test.go diff --git a/api/data/info.go b/api/data/info.go index 09130b59..6072ca01 100644 --- a/api/data/info.go +++ b/api/data/info.go @@ -57,9 +57,15 @@ type ( // BucketSettings stores settings such as versioning. BucketSettings struct { - Versioning string `json:"versioning"` - LockConfiguration *ObjectLockConfiguration `json:"lock_configuration"` - LifecycleConfiguration *LifecycleConfiguration `json:"lifecycle_configuration"` + Versioning string `json:"versioning"` + LockConfiguration *ObjectLockConfiguration `json:"lock_configuration"` + LifecycleConfig *LifecycleConfig `json:"lifecycle_configuration"` + } + + // LifecycleConfig stores lifecycle config old and current settings. + LifecycleConfig struct { + OldConfigurationID string `json:"old_id"` + CurrentConfiguration *LifecycleConfiguration } // CORSConfiguration stores CORS configuration of a request. @@ -125,3 +131,6 @@ func (b BucketSettings) VersioningEnabled() bool { func (b BucketSettings) VersioningSuspended() bool { return b.Versioning == VersioningSuspended } + +// ExpirationObject returns name of system object for expiration tick object. +func (o *ObjectInfo) ExpirationObject() string { return ".expiration." + o.Name } diff --git a/api/data/lifecycle.go b/api/data/lifecycle.go index df62908c..89d5a5c8 100644 --- a/api/data/lifecycle.go +++ b/api/data/lifecycle.go @@ -1,6 +1,9 @@ package data -import "encoding/xml" +import ( + "encoding/xml" + "strings" +) type ( LifecycleConfiguration struct { @@ -15,7 +18,7 @@ type ( ID string `xml:"ID" json:"ID"` NoncurrentVersionExpiration *NoncurrentVersionExpiration `xml:"NoncurrentVersionExpiration" json:"NoncurrentVersionExpiration"` NoncurrentVersionTransitions []NoncurrentVersionTransition `xml:"NoncurrentVersionTransition" json:"NoncurrentVersionTransition"` - Prefix string `xml:"Prefix" json:"Prefix"` + Prefix *string `xml:"Prefix" json:"Prefix"` Status string `xml:"Status" json:"Status"` Transitions []Transition `xml:"Transition" json:"Transition"` } @@ -25,24 +28,24 @@ type ( } Expiration struct { - Date string `xml:"Date" json:"Date"` - Days int64 `xml:"Days" json:"Days"` - ExpiredObjectDeleteMarker bool `xml:"ExpiredObjectDeleteMarker" json:"ExpiredObjectDeleteMarker"` + Date *string `xml:"Date" json:"Date"` + Days *int64 `xml:"Days" json:"Days"` + ExpiredObjectDeleteMarker bool `xml:"ExpiredObjectDeleteMarker" json:"ExpiredObjectDeleteMarker"` } LifecycleRuleFilter struct { And *LifecycleRuleAndOperator `xml:"And" json:"And"` - ObjectSizeGreaterThan int64 `xml:"ObjectSizeGreaterThan" json:"ObjectSizeGreaterThan"` - ObjectSizeLessThan int64 `xml:"ObjectSizeLessThan" json:"ObjectSizeLessThan"` - Prefix string `xml:"Prefix" json:"Prefix"` + ObjectSizeGreaterThan *int64 `xml:"ObjectSizeGreaterThan" json:"ObjectSizeGreaterThan"` + ObjectSizeLessThan *int64 `xml:"ObjectSizeLessThan" json:"ObjectSizeLessThan"` + Prefix *string `xml:"Prefix" json:"Prefix"` Tag *Tag `xml:"Tag" json:"Tag"` } LifecycleRuleAndOperator struct { - ObjectSizeGreaterThan int64 `xml:"ObjectSizeGreaterThan" json:"ObjectSizeGreaterThan"` - ObjectSizeLessThan int64 `xml:"ObjectSizeLessThan" json:"ObjectSizeLessThan"` - Prefix string `xml:"Prefix" json:"Prefix"` - Tags []Tag `xml:"Tags" json:"Tags"` + ObjectSizeGreaterThan *int64 `xml:"ObjectSizeGreaterThan" json:"ObjectSizeGreaterThan"` + ObjectSizeLessThan *int64 `xml:"ObjectSizeLessThan" json:"ObjectSizeLessThan"` + Prefix *string `xml:"Prefix" json:"Prefix"` + Tags []Tag `xml:"Tags" json:"Tags"` } Tag struct { @@ -51,19 +54,118 @@ type ( } NoncurrentVersionExpiration struct { - NewerNoncurrentVersions int64 `xml:"NewerNoncurrentVersions" json:"NewerNoncurrentVersions"` - NoncurrentDays int64 `xml:"NoncurrentDays" json:"NoncurrentDays"` + NewerNoncurrentVersions *int64 `xml:"NewerNoncurrentVersions" json:"NewerNoncurrentVersions"` + NoncurrentDays *int64 `xml:"NoncurrentDays" json:"NoncurrentDays"` } NoncurrentVersionTransition struct { - NewerNoncurrentVersions int64 `xml:"NewerNoncurrentVersions" json:"NewerNoncurrentVersions"` - NoncurrentDays int64 `xml:"NoncurrentDays" json:"NoncurrentDays"` + NewerNoncurrentVersions *int64 `xml:"NewerNoncurrentVersions" json:"NewerNoncurrentVersions"` + NoncurrentDays *int64 `xml:"NoncurrentDays" json:"NoncurrentDays"` StorageClass string `xml:"StorageClass" json:"StorageClass"` } Transition struct { - Date string `xml:"Date" json:"Date"` - Days int64 `xml:"Days" json:"Days"` - StorageClass string `xml:"StorageClass" json:"StorageClass"` + Date *string `xml:"Date" json:"Date"` + Days *int64 `xml:"Days" json:"Days"` + StorageClass string `xml:"StorageClass" json:"StorageClass"` + } + + ExpirationObject struct { + Expiration *Expiration + RuleID string + LifecycleConfigID string } ) + +func (r *Rule) RealPrefix() string { + if r.Filter == nil { + if r.Prefix != nil { + return *r.Prefix + } + return "" + } + + if r.Filter.And == nil { + if r.Filter.Prefix != nil { + return *r.Filter.Prefix + } + return "" + } + + if r.Filter.And.Prefix != nil { + return *r.Filter.And.Prefix + } + return "" +} + +func (r *Rule) NeedTags() bool { + if r.Filter == nil { + return false + } + + if r.Filter.And == nil { + return r.Filter.Tag != nil + } + + return len(r.Filter.And.Tags) != 0 +} + +func (r *Rule) MatchObject(obj *ObjectInfo, tags map[string]string) bool { + if r.Filter == nil { + if r.Prefix != nil { + return strings.HasPrefix(obj.Name, *r.Prefix) + } + return true + } + + if r.Filter.And == nil { + if r.Filter.Prefix != nil && !strings.HasPrefix(obj.Name, *r.Filter.Prefix) { + return false + } + + if r.Filter.Tag != nil { + if tags == nil { + return false + } + if tagVal := tags[r.Filter.Tag.Key]; tagVal != r.Filter.Tag.Value { + return false + } + } + + if r.Filter.ObjectSizeLessThan != nil && *r.Filter.ObjectSizeLessThan > 0 && obj.Size >= *r.Filter.ObjectSizeLessThan { + return false + } + + if r.Filter.ObjectSizeGreaterThan != nil && obj.Size <= *r.Filter.ObjectSizeGreaterThan { + return false + } + + return true + } + + if r.Filter.And.Prefix != nil && !strings.HasPrefix(obj.Name, *r.Filter.And.Prefix) { + return false + } + + if len(r.Filter.And.Tags) != 0 { + if tags == nil { + return false + } + + for _, tag := range r.Filter.And.Tags { + if tagVal := tags[tag.Key]; tagVal != tag.Value { + return false + } + } + } + + if r.Filter.And.ObjectSizeLessThan != nil && obj.Size >= *r.Filter.And.ObjectSizeLessThan { + return false + } + + if r.Filter.And.ObjectSizeGreaterThan != nil && obj.Size <= *r.Filter.And.ObjectSizeGreaterThan { + return false + } + + return true +} diff --git a/api/handler/get.go b/api/handler/get.go index 735ba13d..6201aa11 100644 --- a/api/handler/get.go +++ b/api/handler/get.go @@ -195,6 +195,11 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { return } + if err = h.setExpirationHeader(r.Context(), bktInfo, info, w.Header()); err != nil { + h.logAndSendError(w, "could not get expiration info", reqInfo, err) + return + } + bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) if err != nil { h.logAndSendError(w, "could not get bucket settings", reqInfo, err) diff --git a/api/handler/head.go b/api/handler/head.go index 2bfce705..9575b963 100644 --- a/api/handler/head.go +++ b/api/handler/head.go @@ -2,7 +2,11 @@ package handler import ( "bytes" + "context" + "fmt" "net/http" + "net/url" + "time" "github.com/TrueCloudLab/frostfs-s3-gw/api" "github.com/TrueCloudLab/frostfs-s3-gw/api/data" @@ -103,6 +107,11 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { return } + if err = h.setExpirationHeader(r.Context(), bktInfo, info, w.Header()); err != nil { + h.logAndSendError(w, "could not get expiration info", reqInfo, err) + return + } + bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) if err != nil { h.logAndSendError(w, "could not get bucket settings", reqInfo, err) @@ -157,3 +166,24 @@ func writeLockHeaders(h http.Header, legalHold *data.LegalHold, retention *data. h.Set(api.AmzObjectLockMode, retention.Mode) } } + +func (h *handler) setExpirationHeader(ctx context.Context, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo, header http.Header) error { + var expirationObjInfo data.ObjectInfo + + // todo get expiration object info + + ruleID := expirationObjInfo.Headers[layer.AttributeExpireRuleID] + + expDate, err := time.Parse(time.RFC3339, expirationObjInfo.Headers[layer.AttributeExpireDate]) + if err != nil { + return fmt.Errorf("couldn't parse ivalid expiration time: %w", err) + } + + writeExpirationHeader(header, ruleID, expDate) + return nil +} + +func writeExpirationHeader(h http.Header, ruleID string, expDate time.Time) { + header := "expiry-date=\"%s\", rule-id=\"%s\"" + h.Set(api.AmzExpiration, fmt.Sprintf(header, expDate.UTC().Format(http.TimeFormat), url.QueryEscape(ruleID))) +} diff --git a/api/handler/lifecycle.go b/api/handler/lifecycle.go index 660ab7a3..8021261d 100644 --- a/api/handler/lifecycle.go +++ b/api/handler/lifecycle.go @@ -9,7 +9,6 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors" - "github.com/nspcc-dev/neofs-s3-gw/api/layer" ) func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) { @@ -61,13 +60,13 @@ func (h *handler) GetBucketLifecycleHandler(w http.ResponseWriter, r *http.Reque return } - if settings.LifecycleConfiguration == nil { + if settings.LifecycleConfig == nil || settings.LifecycleConfig.CurrentConfiguration == nil { h.logAndSendError(w, "lifecycle configuration doesn't exist", reqInfo, apiErrors.GetAPIError(apiErrors.ErrNoSuchLifecycleConfiguration)) return } - if err = api.EncodeToResponse(w, settings.LifecycleConfiguration); err != nil { + if err = api.EncodeToResponse(w, settings.LifecycleConfig.CurrentConfiguration); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err) } } @@ -93,27 +92,19 @@ func (h *handler) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Re } func (h *handler) updateLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, lifecycleConf *data.LifecycleConfiguration) error { - settings, err := h.obj.GetBucketSettings(ctx, bktInfo) - if err != nil { - return fmt.Errorf("couldn't get bucket settings: %w", err) - } - - settings.LifecycleConfiguration = lifecycleConf - sp := &layer.PutSettingsParams{ - BktInfo: bktInfo, - Settings: settings, - } - - if err = h.obj.PutBucketSettings(ctx, sp); err != nil { - return fmt.Errorf("couldn't put bucket settings: %w", err) + // todo consider run as separate goroutine + if err := h.obj.ScheduleLifecycle(ctx, bktInfo, lifecycleConf); err != nil { + return fmt.Errorf("couldn't apply lifecycle: %w", err) } return nil } func checkLifecycleConfiguration(conf *data.LifecycleConfiguration) error { + err := apiErrors.GetAPIError(apiErrors.ErrMalformedXML) + if len(conf.Rules) == 0 { - return apiErrors.GetAPIError(apiErrors.ErrMalformedXML) + return err } if len(conf.Rules) > 1000 { return fmt.Errorf("you cannot have more than 1000 rules") @@ -121,17 +112,35 @@ func checkLifecycleConfiguration(conf *data.LifecycleConfiguration) error { for _, rule := range conf.Rules { if rule.Status != enabledValue && rule.Status != disabledValue { - return apiErrors.GetAPIError(apiErrors.ErrMalformedXML) + return err } + if rule.Prefix != nil && rule.Filter != nil { + return err + } + if rule.Filter != nil { - if rule.Filter.ObjectSizeGreaterThan < 0 || rule.Filter.ObjectSizeLessThan < 0 { - return apiErrors.GetAPIError(apiErrors.ErrMalformedXML) + if rule.Filter.ObjectSizeGreaterThan != nil && *rule.Filter.ObjectSizeGreaterThan < 0 || + rule.Filter.ObjectSizeLessThan != nil && *rule.Filter.ObjectSizeLessThan < 0 { + return err } if !filterContainsOneOption(rule.Filter) { - return apiErrors.GetAPIError(apiErrors.ErrMalformedXML) + return err } } + + if !ruleHasAction(rule) { + return err + } + + // currently only expiration action is supported + if rule.Expiration == nil { + return err + } + if rule.Expiration.Days != nil && rule.Expiration.Date != nil || + rule.Expiration.Days == nil && rule.Expiration.Date == nil { + return err + } } return nil @@ -139,7 +148,7 @@ func checkLifecycleConfiguration(conf *data.LifecycleConfiguration) error { func filterContainsOneOption(filter *data.LifecycleRuleFilter) bool { exactlyOneOption := 0 - if filter.Prefix != "" { + if filter.Prefix != nil { exactlyOneOption++ } if filter.And != nil { @@ -151,3 +160,9 @@ func filterContainsOneOption(filter *data.LifecycleRuleFilter) bool { return exactlyOneOption == 1 } + +func ruleHasAction(rule data.Rule) bool { + return rule.AbortIncompleteMultipartUpload != nil || rule.Expiration != nil || + rule.NoncurrentVersionExpiration != nil || len(rule.Transitions) != 0 || + len(rule.NoncurrentVersionTransitions) != 0 +} diff --git a/api/handler/lifecycle_test.go b/api/handler/lifecycle_test.go index 65f44590..a3dbf6e0 100644 --- a/api/handler/lifecycle_test.go +++ b/api/handler/lifecycle_test.go @@ -20,6 +20,10 @@ func TestCheckLifecycleConfiguration(t *testing.T) { rules[i] = data.Rule{ID: strconv.Itoa(i), Status: disabledValue} } + prefix := "prefix" + invalidSize := int64(-1) + days := int64(1) + for _, tc := range []struct { name string configuration *data.LifecycleConfiguration @@ -28,8 +32,9 @@ func TestCheckLifecycleConfiguration(t *testing.T) { { name: "basic", configuration: &data.LifecycleConfiguration{Rules: []data.Rule{{ - ID: "Some ID", - Status: "Disabled", + ID: "Some ID", + Status: "Disabled", + Expiration: &data.Expiration{Days: &days}, }}}, noError: true, }, @@ -60,7 +65,7 @@ func TestCheckLifecycleConfiguration(t *testing.T) { configuration: &data.LifecycleConfiguration{Rules: []data.Rule{{ Status: enabledValue, Filter: &data.LifecycleRuleFilter{ - Prefix: "prefix", + Prefix: &prefix, Tag: &data.Tag{}, }, }}}, @@ -70,7 +75,7 @@ func TestCheckLifecycleConfiguration(t *testing.T) { configuration: &data.LifecycleConfiguration{Rules: []data.Rule{{ Status: enabledValue, Filter: &data.LifecycleRuleFilter{ - ObjectSizeGreaterThan: -1, + ObjectSizeGreaterThan: &invalidSize, }, }}}, }, @@ -79,7 +84,7 @@ func TestCheckLifecycleConfiguration(t *testing.T) { configuration: &data.LifecycleConfiguration{Rules: []data.Rule{{ Status: enabledValue, Filter: &data.LifecycleRuleFilter{ - ObjectSizeLessThan: -1, + ObjectSizeLessThan: &invalidSize, }, }}}, }, @@ -106,13 +111,14 @@ func TestBucketLifecycleConfiguration(t *testing.T) { hc.Handler().GetBucketLifecycleHandler(w, r) assertS3Error(t, w, apiErrors.GetAPIError(apiErrors.ErrNoSuchLifecycleConfiguration)) + days := int64(1) lifecycleConf := &data.LifecycleConfiguration{ XMLName: xmlName("LifecycleConfiguration"), Rules: []data.Rule{ { - AbortIncompleteMultipartUpload: &data.AbortIncompleteMultipartUpload{}, - ID: "Test", - Status: "Disabled", + Expiration: &data.Expiration{Days: &days}, + ID: "Test", + Status: "Disabled", }, }} w, r = prepareTestRequest(t, bktName, "", lifecycleConf) diff --git a/api/headers.go b/api/headers.go index 2e0b9a8a..050eaed7 100644 --- a/api/headers.go +++ b/api/headers.go @@ -57,6 +57,7 @@ const ( AmzObjectAttributes = "X-Amz-Object-Attributes" AmzMaxParts = "X-Amz-Max-Parts" AmzPartNumberMarker = "X-Amz-Part-Number-Marker" + AmzExpiration = "X-Amz-Expiration" AmzServerSideEncryptionCustomerAlgorithm = "x-amz-server-side-encryption-customer-algorithm" AmzServerSideEncryptionCustomerKey = "x-amz-server-side-encryption-customer-key" diff --git a/api/layer/layer.go b/api/layer/layer.go index 4d7aee69..3e230fe2 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -191,6 +191,8 @@ type ( GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) PutBucketSettings(ctx context.Context, p *PutSettingsParams) error + ScheduleLifecycle(ctx context.Context, bktInfo *data.BucketInfo, new *data.LifecycleConfiguration) error + PutBucketCORS(ctx context.Context, p *PutCORSParams) error GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error @@ -288,7 +290,9 @@ func (n *layer) Initialize(ctx context.Context, c EventListener) error { return fmt.Errorf("already initialized") } - // todo add notification handlers (e.g. for lifecycles) + if err := c.Subscribe(ctx, ExpireTopic, MsgHandlerFunc(n.handleExpireTick)); err != nil { + return fmt.Errorf("couldn't initialize layer: %w", err) + } c.Listen(ctx) diff --git a/api/layer/lifecycle.go b/api/layer/lifecycle.go new file mode 100644 index 00000000..1a54b9ea --- /dev/null +++ b/api/layer/lifecycle.go @@ -0,0 +1,211 @@ +package layer + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/xml" + "fmt" + + "github.com/nats-io/nats.go" + "github.com/nspcc-dev/neofs-s3-gw/api/data" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + AttributeExpirationEpoch = "__NEOFS__EXPIRATION_EPOCH" + AttributeSysTickEpoch = "__NEOFS__TICK_EPOCH" + AttributeSysTickTopic = "__NEOFS__TICK_TOPIC" + AttributeParentObject = ".s3-expire-parent-object" + AttributeParentBucket = ".s3-expire-parent-bucket" + AttributeExpireDate = ".s3-expire-date" + AttributeExpireRuleID = ".s3-expire-rule-id" + AttributeLifecycleConfigID = ".s3-lifecycle-config" + ExpireTopic = "expire" +) + +func (n *layer) handleExpireTick(ctx context.Context, msg *nats.Msg) error { + var addr oid.Address + if err := addr.DecodeString(string(msg.Data)); err != nil { + return fmt.Errorf("invalid msg, address expected: %w", err) + } + + n.log.Debug("handling expiration tick", zap.String("address", string(msg.Data))) + + // and make sure having right access + + //todo redo + bktInfo := &data.BucketInfo{CID: addr.Container()} + + obj, err := n.objectHead(ctx, bktInfo, addr.Object()) + if err != nil { + return fmt.Errorf("couldn't head expiration object: %w", err) + } + + header := userHeaders(obj.Attributes()) + objName := header[AttributeParentObject] + bktName := header[AttributeParentBucket] + if objName == "" || bktName == "" { + return fmt.Errorf("couldn't know bucket/object to expire") + } + + p := &DeleteObjectParams{ + BktInfo: bktInfo, + Objects: []*VersionedObject{{Name: objName}}, + } + + res := n.DeleteObjects(ctx, p) + if res[0].Error != nil { + return fmt.Errorf("couldn't delete expired object: %w", res[0].Error) + } + + return n.objectDelete(ctx, bktInfo, addr.Object()) +} + +func (n *layer) ScheduleLifecycle(ctx context.Context, bktInfo *data.BucketInfo, newConf *data.LifecycleConfiguration) error { + if newConf == nil { + return nil + } + + lifecycleID, err := computeLifecycleID(newConf) + if err != nil { + return fmt.Errorf("couldn't compute lifecycle id: %w", err) + } + + // We want to be able to revert partly applied lifecycle if something goes wrong. + if err = n.updateLifecycle(ctx, bktInfo, &data.LifecycleConfig{ + OldConfigurationID: lifecycleID, + }); err != nil { + return err + } + + if err = n.applyLifecycle(ctx, bktInfo, lifecycleID, newConf); err != nil { + return err + } + + return n.updateLifecycle(ctx, bktInfo, &data.LifecycleConfig{ + OldConfigurationID: lifecycleID, + CurrentConfiguration: newConf, + }) +} + +func (n *layer) updateLifecycle(ctx context.Context, bktInfo *data.BucketInfo, lifecycleConfig *data.LifecycleConfig) error { + settings, err := n.GetBucketSettings(ctx, bktInfo) + if err != nil { + return fmt.Errorf("couldn't get bucket settings: %w", err) + } + + settings.LifecycleConfig = lifecycleConfig + sp := &PutSettingsParams{ + BktInfo: bktInfo, + Settings: settings, + } + + if err = n.PutBucketSettings(ctx, sp); err != nil { + return fmt.Errorf("couldn't put bucket settings: %w", err) + } + return nil +} + +func (n *layer) applyLifecycle(ctx context.Context, bktInfo *data.BucketInfo, lifecycleID string, conf *data.LifecycleConfiguration) error { + for _, rule := range conf.Rules { + if rule.Status == "Disabled" { + continue + } + + listParam := allObjectParams{ + Bucket: bktInfo, + Prefix: rule.RealPrefix(), + } + + objects, _, err := n.getLatestObjectsVersions(ctx, listParam) + if err != nil { + return err + } + + if err = n.applyLifecycleToObjects(ctx, bktInfo, lifecycleID, rule, objects); err != nil { + return err + } + } + + return nil +} + +func (n *layer) applyLifecycleToObjects(ctx context.Context, bktInfo *data.BucketInfo, lifecycleID string, rule data.Rule, objects []*data.ObjectInfo) error { + var tags []map[string]string + var err error + if rule.NeedTags() { + tags = make([]map[string]string, len(objects)) + p := &ObjectVersion{ + BktInfo: bktInfo, + } + for i, obj := range objects { + p.ObjectName = obj.Name + p.VersionID = obj.VersionID() + if _, tags[i], err = n.GetObjectTagging(ctx, p); err != nil { + return fmt.Errorf("couldn't get object tags: %w", err) + } + } + } + + for i, obj := range objects { + var objTags map[string]string + if len(tags) != 0 { + objTags = tags[i] + } + if !rule.MatchObject(obj, objTags) { + continue + } + + expObj := &data.ExpirationObject{ + Expiration: rule.Expiration, + RuleID: rule.ID, + LifecycleConfigID: lifecycleID, + } + + if _, err = n.putExpirationObject(ctx, bktInfo, obj, expObj); err != nil { + return fmt.Errorf("couldn't put expiration object: %w", err) + } + } + + return nil +} + +func (n *layer) putLifecycleObjects(ctx context.Context, bktInfo *data.BucketInfo, obj *data.ObjectInfo, lifecycle *data.LifecycleConfig) error { + if lifecycle == nil || lifecycle.CurrentConfiguration == nil { + return nil + } + + for _, rule := range lifecycle.CurrentConfiguration.Rules { + if rule.Status == "Disabled" { + continue + } + + // at this time lifecycle.OldConfigurationID is the same as lifecycle.CurrentConfiguration id + if err := n.applyLifecycleToObjects(ctx, bktInfo, lifecycle.OldConfigurationID, rule, []*data.ObjectInfo{obj}); err != nil { + return err + } + } + + return nil +} + +func computeLifecycleID(conf *data.LifecycleConfiguration) (string, error) { + raw, err := xml.Marshal(conf) + if err != nil { + return "", fmt.Errorf("couldn't marshall new lifecycle configuration: %w", err) + } + + sha := sha256.New() + sha.Write(raw) + sum := sha.Sum(nil) + + id := hex.EncodeToString(sum) + + if id == "" { + return "", fmt.Errorf("computed id is empty") + } + + return id, nil +} diff --git a/api/layer/lifecycle_test.go b/api/layer/lifecycle_test.go new file mode 100644 index 00000000..6b9ffca8 --- /dev/null +++ b/api/layer/lifecycle_test.go @@ -0,0 +1,159 @@ +package layer + +import ( + "testing" + + "github.com/nspcc-dev/neofs-s3-gw/api/data" + "github.com/stretchr/testify/require" +) + +func TestComputeLifecycleID(t *testing.T) { + conf := &data.LifecycleConfiguration{Rules: []data.Rule{ + { + ID: "id", + Status: "Enabled", + }, + }} + + id, err := computeLifecycleID(conf) + require.NoError(t, err) + require.Equal(t, "51ff619dc848622287764fc7c4aec06b7c1a5936c25b8eee48a0dbcb4eeac9f4", id) +} + +func TestRuleMatchObject(t *testing.T) { + prefix, suffix := "prefix", "suffix" + objSizeMin, objSizeMax := int64(512), int64(1024) + + for _, tc := range []struct { + name string + rule data.Rule + obj *data.ObjectInfo + tags map[string]string + expected bool + }{ + { + name: "basic match", + rule: data.Rule{Prefix: &prefix}, + obj: &data.ObjectInfo{Name: prefix + suffix}, + expected: true, + }, + { + name: "basic no match", + rule: data.Rule{Prefix: &prefix}, + obj: &data.ObjectInfo{Name: suffix + prefix}, + expected: false, + }, + { + name: "filter and sizes", + rule: data.Rule{Filter: &data.LifecycleRuleFilter{ + And: &data.LifecycleRuleAndOperator{ + ObjectSizeGreaterThan: &objSizeMin, + ObjectSizeLessThan: &objSizeMax, + }, + }}, + obj: &data.ObjectInfo{Name: suffix, Size: 768}, + expected: true, + }, + { + name: "filter prefix", + rule: data.Rule{Filter: &data.LifecycleRuleFilter{ + Prefix: &prefix, + }}, + obj: &data.ObjectInfo{Name: prefix + suffix}, + expected: true, + }, + { + name: "filter prefix no match", + rule: data.Rule{Filter: &data.LifecycleRuleFilter{ + Prefix: &prefix, + }}, + obj: &data.ObjectInfo{Name: suffix}, + expected: false, + }, + { + name: "filter tags", + rule: data.Rule{Filter: &data.LifecycleRuleFilter{ + Tag: &data.Tag{ + Key: "key", + Value: "val", + }, + }}, + tags: map[string]string{"key": "val"}, + obj: &data.ObjectInfo{}, + expected: true, + }, + { + name: "filter and tags no match", + rule: data.Rule{Filter: &data.LifecycleRuleFilter{ + And: &data.LifecycleRuleAndOperator{ + Tags: []data.Tag{{ + Key: "key", + Value: "val", + }}, + }, + }}, + tags: map[string]string{"key": "val2"}, + obj: &data.ObjectInfo{}, + expected: false, + }, + { + name: "filter size no match", + rule: data.Rule{Filter: &data.LifecycleRuleFilter{ + ObjectSizeGreaterThan: &objSizeMax, + }}, + obj: &data.ObjectInfo{Size: objSizeMin}, + expected: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, tc.rule.MatchObject(tc.obj, tc.tags)) + }) + } +} + +func TestScheduleLifecycle(t *testing.T) { + tc := prepareContext(t) + + obj1 := tc.putObject([]byte("content")) + + date := "2022-03-14T09:59:03Z" + date2 := "2022-03-15T09:59:03Z" + prefix := "prefix" + tc.obj = prefix + obj2 := tc.putObject([]byte("content2")) + + conf := &data.LifecycleConfiguration{ + Rules: []data.Rule{{ + Filter: &data.LifecycleRuleFilter{ + Prefix: &prefix, + }, + Expiration: &data.Expiration{ + Date: &date, + }}, + }, + } + + err := tc.layer.ScheduleLifecycle(tc.ctx, tc.bktInfo, conf) + require.NoError(t, err) + + expObj1, _ := tc.getObject(obj1.ExpirationObject(), "", false) + require.Nil(t, expObj1) + expObj2, _ := tc.getObject(obj2.ExpirationObject(), "", false) + require.NotNil(t, expObj2) + assertExpirationObject(t, expObj2, date) + + conf.Rules[0].Expiration.Date = &date2 + err = tc.layer.ScheduleLifecycle(tc.ctx, tc.bktInfo, conf) + require.NoError(t, err) + + expObj2, _ = tc.getObject(obj2.ExpirationObject(), "", false) + require.NotNil(t, expObj2) + assertExpirationObject(t, expObj2, date2) +} + +func assertExpirationObject(t *testing.T, expObjInfo *data.ObjectInfo, date string) { + require.Equal(t, expObjInfo.Headers[AttributeExpireDate], date) + require.Contains(t, expObjInfo.Headers, AttributeSysTickEpoch) + require.Contains(t, expObjInfo.Headers, AttributeSysTickTopic) + require.Contains(t, expObjInfo.Headers, AttributeLifecycleConfigID) +} diff --git a/api/layer/object.go b/api/layer/object.go index 5e5e7057..71c18a01 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -298,6 +298,13 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend NodeVersion: newVersion, } + // todo filling api.AmzExpiration header + if err = n.putLifecycleObjects(ctx, p.BktInfo, objInfo, bktSettings.LifecycleConfig); err != nil { + return nil, fmt.Errorf("couldn't put expiration system objects: %w", err) + } + + n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID) + n.cache.PutObjectWithName(owner, extendedObjInfo) return extendedObjInfo, nil diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 154ed821..7fdb77ed 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -15,8 +15,7 @@ import ( ) const ( - AttributeComplianceMode = ".s3-compliance-mode" - AttributeExpirationEpoch = "__NEOFS__EXPIRATION_EPOCH" + AttributeComplianceMode = ".s3-compliance-mode" ) type PutLockInfoParams struct { @@ -130,6 +129,53 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj return id, err } +func (n *layer) putExpirationObject(ctx context.Context, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo, expObj *data.ExpirationObject) (oid.ID, error) { + prm := PrmObjectCreate{ + Container: bktInfo.CID, + Creator: bktInfo.Owner, + Filepath: objInfo.ExpirationObject(), + } + + var ( + err error + exp uint64 + expTime time.Time + ) + + if expObj.Expiration.Days != nil { + expTime = objInfo.Created.Add(time.Duration(*expObj.Expiration.Days) * 24 * time.Hour).UTC() + // emulate rounding the resulting time to the next day midnight UTC + toMidnight := 24 - expTime.UTC().Hour() + expTime = expTime.Add(time.Duration(toMidnight) * time.Hour) + } else { + expTime, err = time.Parse(time.RFC3339, *expObj.Expiration.Date) + if err != nil { + return oid.ID{}, fmt.Errorf("couldn't parse expiration date '%s': %w", *expObj.Expiration.Date, err) + } + } + + if expTime.After(time.Now()) { + _, exp, err = n.neoFS.TimeToEpoch(ctx, expTime) + if err != nil { + return oid.ID{}, fmt.Errorf("couldn't compute expiration epoch: %w", err) + } + } + + prm.Attributes = [][2]string{ + {AttributeExpirationEpoch, strconv.FormatUint(exp+4, 10)}, + {AttributeSysTickEpoch, strconv.FormatUint(exp, 10)}, + {AttributeSysTickTopic, ExpireTopic}, + {AttributeParentObject, objInfo.Name}, + {AttributeParentBucket, bktInfo.Name}, + {AttributeExpireDate, expTime.Format(time.RFC3339)}, + {AttributeExpireRuleID, expObj.RuleID}, + {AttributeLifecycleConfigID, expObj.LifecycleConfigID}, + } + + id, _, err := n.objectPutAndHash(ctx, prm, bktInfo) + return id, err +} + func (n *layer) GetLockInfo(ctx context.Context, objVersion *ObjectVersion) (*data.LockInfo, error) { owner := n.Owner(ctx) if lockInfo := n.cache.GetLockInfo(owner, lockObjectKey(objVersion)); lockInfo != nil {