From 5307211398f5376f05f1e5d3801e9dff53901de1 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Fri, 12 Aug 2022 02:48:56 +0400 Subject: [PATCH] [#634] Add CopiesNumber in NeoFS requests Signed-off-by: Angira Kekteeva --- api/handler/api.go | 9 ++++++-- api/handler/cors.go | 5 +++-- api/handler/locking.go | 35 ++++++++++++++++++------------ api/handler/multipart_upload.go | 16 ++++++++------ api/handler/notifications.go | 1 + api/handler/put.go | 13 +++++------ api/layer/cors.go | 9 ++++---- api/layer/layer.go | 22 ++++++++++--------- api/layer/multipart_upload.go | 38 ++++++++++++++++++--------------- api/layer/neofs.go | 3 +++ api/layer/notifications.go | 10 +++++---- api/layer/object.go | 25 +++++++++++++--------- api/layer/system_object.go | 32 +++++++++++++++++---------- cmd/s3-gw/app.go | 14 ++++++++---- cmd/s3-gw/app_settings.go | 4 ++++ internal/neofs/neofs.go | 1 + 16 files changed, 146 insertions(+), 91 deletions(-) diff --git a/api/handler/api.go b/api/handler/api.go index d3aa3f56d..b32f46c3f 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -28,11 +28,16 @@ type ( DefaultMaxAge int NotificatorEnabled bool TLSEnabled bool + CopiesNumber uint32 } ) -// DefaultPolicy is a default policy of placing containers in NeoFS if it's not set at the request. -const DefaultPolicy = "REP 3" +const ( + // DefaultPolicy is a default policy of placing containers in NeoFS if it's not set at the request. + DefaultPolicy = "REP 3" + // DefaultCopiesNumber is a default number of object copies that is enough to consider put successful if it's not set in config. + DefaultCopiesNumber uint32 = 0 +) var _ api.Handler = (*handler)(nil) diff --git a/api/handler/cors.go b/api/handler/cors.go index 12a041690..8a57ac40f 100644 --- a/api/handler/cors.go +++ b/api/handler/cors.go @@ -48,8 +48,9 @@ func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) { } p := &layer.PutCORSParams{ - BktInfo: bktInfo, - Reader: r.Body, + BktInfo: bktInfo, + Reader: r.Body, + CopiesNumber: h.cfg.CopiesNumber, } if err = h.obj.PutBucketCORS(r.Context(), p); err != nil { diff --git a/api/handler/locking.go b/api/handler/locking.go index 2dd817d29..0bcff7afc 100644 --- a/api/handler/locking.go +++ b/api/handler/locking.go @@ -129,18 +129,21 @@ func (h *handler) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Reque return } - p := &layer.ObjectVersion{ - BktInfo: bktInfo, - ObjectName: reqInfo.ObjectName, - VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), - } - lock := &data.ObjectLock{ - LegalHold: &data.LegalHoldLock{ - Enabled: legalHold.Status == legalHoldOn, + p := &layer.PutLockInfoParams{ + ObjVersion: &layer.ObjectVersion{ + BktInfo: bktInfo, + ObjectName: reqInfo.ObjectName, + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), }, + NewLock: &data.ObjectLock{ + LegalHold: &data.LegalHoldLock{ + Enabled: legalHold.Status == legalHoldOn, + }, + }, + CopiesNumber: h.cfg.CopiesNumber, } - if err = h.obj.PutLockInfo(r.Context(), p, lock); err != nil { + if err = h.obj.PutLockInfo(r.Context(), p); err != nil { h.logAndSendError(w, "couldn't head put legal hold", reqInfo, err) return } @@ -209,13 +212,17 @@ func (h *handler) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Reque return } - p := &layer.ObjectVersion{ - BktInfo: bktInfo, - ObjectName: reqInfo.ObjectName, - VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), + p := &layer.PutLockInfoParams{ + ObjVersion: &layer.ObjectVersion{ + BktInfo: bktInfo, + ObjectName: reqInfo.ObjectName, + VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), + }, + NewLock: lock, + CopiesNumber: h.cfg.CopiesNumber, } - if err = h.obj.PutLockInfo(r.Context(), p, lock); err != nil { + if err = h.obj.PutLockInfo(r.Context(), p); err != nil { h.logAndSendError(w, "couldn't put legal hold", reqInfo, err) return } diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 050d56a0f..89c906e49 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -215,9 +215,10 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) { Bkt: bktInfo, Key: reqInfo.ObjectName, }, - PartNumber: partNumber, - Size: r.ContentLength, - Reader: r.Body, + PartNumber: partNumber, + Size: r.ContentLength, + Reader: r.Body, + CopiesNumber: h.cfg.CopiesNumber, } p.Info.Encryption, err = h.formEncryptionParams(r.Header) @@ -315,10 +316,11 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) { Bkt: bktInfo, Key: reqInfo.ObjectName, }, - SrcObjInfo: srcInfo, - SrcBktInfo: srcBktInfo, - PartNumber: partNumber, - Range: srcRange, + SrcObjInfo: srcInfo, + SrcBktInfo: srcBktInfo, + PartNumber: partNumber, + Range: srcRange, + CopiesNumber: h.cfg.CopiesNumber, } p.Info.Encryption, err = h.formEncryptionParams(r.Header) diff --git a/api/handler/notifications.go b/api/handler/notifications.go index 29529adcb..8a1c01a40 100644 --- a/api/handler/notifications.go +++ b/api/handler/notifications.go @@ -116,6 +116,7 @@ func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Re RequestInfo: reqInfo, BktInfo: bktInfo, Configuration: conf, + CopiesNumber: h.cfg.CopiesNumber, } if err = h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil { diff --git a/api/handler/put.go b/api/handler/put.go index c154e31ff..71eba1b48 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -219,12 +219,13 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) { } params := &layer.PutObjectParams{ - BktInfo: bktInfo, - Object: reqInfo.ObjectName, - Reader: r.Body, - Size: r.ContentLength, - Header: metadata, - Encryption: encryption, + BktInfo: bktInfo, + Object: reqInfo.ObjectName, + Reader: r.Body, + Size: r.ContentLength, + Header: metadata, + Encryption: encryption, + CopiesNumber: h.cfg.CopiesNumber, } settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) diff --git a/api/layer/cors.go b/api/layer/cors.go index 895abda65..e819f2d43 100644 --- a/api/layer/cors.go +++ b/api/layer/cors.go @@ -37,10 +37,11 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error { } prm := PrmObjectCreate{ - Container: p.BktInfo.CID, - Creator: p.BktInfo.Owner, - Payload: p.Reader, - Filename: p.BktInfo.CORSObjectName(), + Container: p.BktInfo.CID, + Creator: p.BktInfo.Owner, + Payload: p.Reader, + Filename: p.BktInfo.CORSObjectName(), + CopiesNumber: p.CopiesNumber, } objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo) diff --git a/api/layer/layer.go b/api/layer/layer.go index 3e1098a0d..45820098f 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -110,13 +110,14 @@ type ( // PutObjectParams stores object put request parameters. PutObjectParams struct { - BktInfo *data.BucketInfo - Object string - Size int64 - Reader io.Reader - Header map[string]string - Lock *data.ObjectLock - Encryption encryption.Params + BktInfo *data.BucketInfo + Object string + Size int64 + Reader io.Reader + Header map[string]string + Lock *data.ObjectLock + Encryption encryption.Params + CopiesNumber uint32 } DeleteObjectParams struct { @@ -133,8 +134,9 @@ type ( // PutCORSParams stores PutCORS request parameters. PutCORSParams struct { - BktInfo *data.BucketInfo - Reader io.Reader + BktInfo *data.BucketInfo + Reader io.Reader + CopiesNumber uint32 } // CopyObjectParams stores object copy request parameters. @@ -215,7 +217,7 @@ type ( GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) GetLockInfo(ctx context.Context, obj *ObjectVersion) (*data.LockInfo, error) - PutLockInfo(ctx context.Context, p *ObjectVersion, lock *data.ObjectLock) error + PutLockInfo(ctx context.Context, p *PutLockInfoParams) error GetBucketTagging(ctx context.Context, cnrID cid.ID) (map[string]string, error) PutBucketTagging(ctx context.Context, cnrID cid.ID, tagSet map[string]string) error diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index f2d5cf7d8..4737a298e 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -56,18 +56,20 @@ type ( } UploadPartParams struct { - Info *UploadInfoParams - PartNumber int - Size int64 - Reader io.Reader + Info *UploadInfoParams + PartNumber int + Size int64 + Reader io.Reader + CopiesNumber uint32 } UploadCopyParams struct { - Info *UploadInfoParams - SrcObjInfo *data.ObjectInfo - SrcBktInfo *data.BucketInfo - PartNumber int - Range *RangeParams + Info *UploadInfoParams + SrcObjInfo *data.ObjectInfo + SrcBktInfo *data.BucketInfo + PartNumber int + Range *RangeParams + CopiesNumber uint32 } CompleteMultipartParams struct { @@ -199,10 +201,11 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf bktInfo := p.Info.Bkt prm := PrmObjectCreate{ - Container: bktInfo.CID, - Creator: bktInfo.Owner, - Attributes: make([][2]string, 2), - Payload: p.Reader, + Container: bktInfo.CID, + Creator: bktInfo.Owner, + Attributes: make([][2]string, 2), + Payload: p.Reader, + CopiesNumber: p.CopiesNumber, } decSize := p.Size @@ -298,10 +301,11 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. }() params := &UploadPartParams{ - Info: p.Info, - PartNumber: p.PartNumber, - Size: size, - Reader: pr, + Info: p.Info, + PartNumber: p.PartNumber, + Size: size, + Reader: pr, + CopiesNumber: p.CopiesNumber, } return n.uploadPart(ctx, multipartInfo, params) diff --git a/api/layer/neofs.go b/api/layer/neofs.go index 0d1655228..369265a84 100644 --- a/api/layer/neofs.go +++ b/api/layer/neofs.go @@ -121,6 +121,9 @@ type PrmObjectCreate struct { // Object payload encapsulated in io.Reader primitive. Payload io.Reader + + // Number of object copies that is enough to consider put successful. + CopiesNumber uint32 } // PrmObjectDelete groups parameters of NeoFS.DeleteObject operation. diff --git a/api/layer/notifications.go b/api/layer/notifications.go index 0ecaadd6c..9cb0af00e 100644 --- a/api/layer/notifications.go +++ b/api/layer/notifications.go @@ -16,6 +16,7 @@ type PutBucketNotificationConfigurationParams struct { RequestInfo *api.ReqInfo BktInfo *data.BucketInfo Configuration *data.NotificationConfiguration + CopiesNumber uint32 } func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error { @@ -27,10 +28,11 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu sysName := p.BktInfo.NotificationConfigurationObjectName() prm := PrmObjectCreate{ - Container: p.BktInfo.CID, - Creator: p.BktInfo.Owner, - Payload: bytes.NewReader(confXML), - Filename: sysName, + Container: p.BktInfo.CID, + Creator: p.BktInfo.Owner, + Payload: bytes.NewReader(confXML), + Filename: sysName, + CopiesNumber: p.CopiesNumber, } objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo) diff --git a/api/layer/object.go b/api/layer/object.go index 2b294bf4c..0ac1dbceb 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -229,11 +229,12 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } prm := PrmObjectCreate{ - Container: p.BktInfo.CID, - Creator: own, - PayloadSize: uint64(p.Size), - Filename: p.Object, - Payload: r, + Container: p.BktInfo.CID, + Creator: own, + PayloadSize: uint64(p.Size), + Filename: p.Object, + Payload: r, + CopiesNumber: p.CopiesNumber, } prm.Attributes = make([][2]string, 0, len(p.Header)) @@ -254,13 +255,17 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } if p.Lock != nil && (p.Lock.Retention != nil || p.Lock.LegalHold != nil) { - objVersion := &ObjectVersion{ - BktInfo: p.BktInfo, - ObjectName: p.Object, - VersionID: id.EncodeToString(), + prm := &PutLockInfoParams{ + ObjVersion: &ObjectVersion{ + BktInfo: p.BktInfo, + ObjectName: p.Object, + VersionID: id.EncodeToString(), + }, + NewLock: p.Lock, + CopiesNumber: p.CopiesNumber, } - if err = n.PutLockInfo(ctx, objVersion, p.Lock); err != nil { + if err = n.PutLockInfo(ctx, prm); err != nil { return nil, err } } diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 238c4dd80..019397a40 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -19,9 +19,18 @@ const ( AttributeExpirationEpoch = "__NEOFS__EXPIRATION_EPOCH" ) -func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newLock *data.ObjectLock) error { - cnrID := objVersion.BktInfo.CID - versionNode, err := n.getNodeVersion(ctx, objVersion) +type PutLockInfoParams struct { + ObjVersion *ObjectVersion + NewLock *data.ObjectLock + CopiesNumber uint32 +} + +func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) error { + var ( + cnrID = p.ObjVersion.BktInfo.CID + newLock = p.NewLock + ) + versionNode, err := n.getNodeVersion(ctx, p.ObjVersion) if err != nil { return err } @@ -56,7 +65,7 @@ func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newL } } lock := &data.ObjectLock{Retention: newLock.Retention} - retentionOID, err := n.putLockObject(ctx, objVersion.BktInfo, versionNode.OID, lock) + retentionOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumber) if err != nil { return err } @@ -66,13 +75,13 @@ func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newL if newLock.LegalHold != nil { if newLock.LegalHold.Enabled && !lockInfo.IsLegalHoldSet() { lock := &data.ObjectLock{LegalHold: newLock.LegalHold} - legalHoldOID, err := n.putLockObject(ctx, objVersion.BktInfo, versionNode.OID, lock) + legalHoldOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumber) if err != nil { return err } lockInfo.SetLegalHold(legalHoldOID) } else if !newLock.LegalHold.Enabled && lockInfo.IsLegalHoldSet() { - if err = n.objectDelete(ctx, objVersion.BktInfo, lockInfo.LegalHold()); err != nil { + if err = n.objectDelete(ctx, p.ObjVersion.BktInfo, lockInfo.LegalHold()); err != nil { return fmt.Errorf("couldn't delete lock object '%s' to remove legal hold: %w", lockInfo.LegalHold().EncodeToString(), err) } lockInfo.ResetLegalHold() @@ -83,18 +92,19 @@ func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newL return fmt.Errorf("couldn't put lock into tree: %w", err) } - if err = n.systemCache.PutLockInfo(lockObjectKey(objVersion), lockInfo); err != nil { + if err = n.systemCache.PutLockInfo(lockObjectKey(p.ObjVersion), lockInfo); err != nil { n.log.Error("couldn't cache system object", zap.Error(err)) } return nil } -func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock) (oid.ID, error) { +func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock, copiesNumber uint32) (oid.ID, error) { prm := PrmObjectCreate{ - Container: bktInfo.CID, - Creator: bktInfo.Owner, - Locks: []oid.ID{objID}, + Container: bktInfo.CID, + Creator: bktInfo.Owner, + Locks: []oid.ID{objID}, + CopiesNumber: copiesNumber, } var err error diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index fe93bc240..bce3f1455 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -392,10 +392,11 @@ func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config { func getHandlerOptions(v *viper.Viper, l *zap.Logger) *handler.Config { var ( - cfg handler.Config - err error - policyStr = handler.DefaultPolicy - defaultMaxAge = handler.DefaultMaxAge + cfg handler.Config + err error + policyStr = handler.DefaultPolicy + defaultMaxAge = handler.DefaultMaxAge + setCopiesNumber = handler.DefaultCopiesNumber ) if v.IsSet(cfgDefaultPolicy) { @@ -417,9 +418,14 @@ func getHandlerOptions(v *viper.Viper, l *zap.Logger) *handler.Config { } } + if val := v.GetUint32(cfgSetCopiesNumber); val > 0 { + setCopiesNumber = val + } + cfg.DefaultMaxAge = defaultMaxAge cfg.NotificatorEnabled = v.GetBool(cfgEnableNATS) cfg.TLSEnabled = v.IsSet(cfgTLSKeyFile) && v.IsSet(cfgTLSCertFile) + cfg.CopiesNumber = setCopiesNumber return &cfg } diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 29cf8c365..3909e6978 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -112,6 +112,10 @@ const ( // Settings. cmdPProf = "pprof" cmdMetrics = "metrics" + // Configuration of parameters of requests to NeoFS. + // Number of the object copies to consider PUT to NeoFS successful. + cfgSetCopiesNumber = "neofs.set_copies_number" + // envPrefix is an environment variables prefix used for configuration. envPrefix = "S3_GW" ) diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index f74210d75..98d952ce3 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -259,6 +259,7 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi var prmPut pool.PrmObjectPut prmPut.SetHeader(*obj) prmPut.SetPayload(prm.Payload) + prmPut.SetCopiesNumber(prm.CopiesNumber) if prm.BearerToken != nil { prmPut.UseBearer(*prm.BearerToken)