forked from TrueCloudLab/frostfs-s3-gw
[#634] Add CopiesNumber in NeoFS requests
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
7a6d562c69
commit
5307211398
16 changed files with 146 additions and 91 deletions
|
@ -28,11 +28,16 @@ type (
|
|||
DefaultMaxAge int
|
||||
NotificatorEnabled bool
|
||||
TLSEnabled bool
|
||||
CopiesNumber uint32
|
||||
}
|
||||
)
|
||||
|
||||
// DefaultPolicy is a default policy of placing containers in NeoFS if it's not set at the request.
|
||||
const DefaultPolicy = "REP 3"
|
||||
const (
|
||||
// DefaultPolicy is a default policy of placing containers in NeoFS if it's not set at the request.
|
||||
DefaultPolicy = "REP 3"
|
||||
// DefaultCopiesNumber is a default number of object copies that is enough to consider put successful if it's not set in config.
|
||||
DefaultCopiesNumber uint32 = 0
|
||||
)
|
||||
|
||||
var _ api.Handler = (*handler)(nil)
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
p := &layer.PutCORSParams{
|
||||
BktInfo: bktInfo,
|
||||
Reader: r.Body,
|
||||
CopiesNumber: h.cfg.CopiesNumber,
|
||||
}
|
||||
|
||||
if err = h.obj.PutBucketCORS(r.Context(), p); err != nil {
|
||||
|
|
|
@ -129,18 +129,21 @@ func (h *handler) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Reque
|
|||
return
|
||||
}
|
||||
|
||||
p := &layer.ObjectVersion{
|
||||
p := &layer.PutLockInfoParams{
|
||||
ObjVersion: &layer.ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
ObjectName: reqInfo.ObjectName,
|
||||
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
|
||||
}
|
||||
lock := &data.ObjectLock{
|
||||
},
|
||||
NewLock: &data.ObjectLock{
|
||||
LegalHold: &data.LegalHoldLock{
|
||||
Enabled: legalHold.Status == legalHoldOn,
|
||||
},
|
||||
},
|
||||
CopiesNumber: h.cfg.CopiesNumber,
|
||||
}
|
||||
|
||||
if err = h.obj.PutLockInfo(r.Context(), p, lock); err != nil {
|
||||
if err = h.obj.PutLockInfo(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "couldn't head put legal hold", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -209,13 +212,17 @@ func (h *handler) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Reque
|
|||
return
|
||||
}
|
||||
|
||||
p := &layer.ObjectVersion{
|
||||
p := &layer.PutLockInfoParams{
|
||||
ObjVersion: &layer.ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
ObjectName: reqInfo.ObjectName,
|
||||
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
|
||||
},
|
||||
NewLock: lock,
|
||||
CopiesNumber: h.cfg.CopiesNumber,
|
||||
}
|
||||
|
||||
if err = h.obj.PutLockInfo(r.Context(), p, lock); err != nil {
|
||||
if err = h.obj.PutLockInfo(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "couldn't put legal hold", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -218,6 +218,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
|||
PartNumber: partNumber,
|
||||
Size: r.ContentLength,
|
||||
Reader: r.Body,
|
||||
CopiesNumber: h.cfg.CopiesNumber,
|
||||
}
|
||||
|
||||
p.Info.Encryption, err = h.formEncryptionParams(r.Header)
|
||||
|
@ -319,6 +320,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
|||
SrcBktInfo: srcBktInfo,
|
||||
PartNumber: partNumber,
|
||||
Range: srcRange,
|
||||
CopiesNumber: h.cfg.CopiesNumber,
|
||||
}
|
||||
|
||||
p.Info.Encryption, err = h.formEncryptionParams(r.Header)
|
||||
|
|
|
@ -116,6 +116,7 @@ func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Re
|
|||
RequestInfo: reqInfo,
|
||||
BktInfo: bktInfo,
|
||||
Configuration: conf,
|
||||
CopiesNumber: h.cfg.CopiesNumber,
|
||||
}
|
||||
|
||||
if err = h.obj.PutBucketNotificationConfiguration(r.Context(), p); err != nil {
|
||||
|
|
|
@ -225,6 +225,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Size: r.ContentLength,
|
||||
Header: metadata,
|
||||
Encryption: encryption,
|
||||
CopiesNumber: h.cfg.CopiesNumber,
|
||||
}
|
||||
|
||||
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
|
|
|
@ -41,6 +41,7 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
|||
Creator: p.BktInfo.Owner,
|
||||
Payload: p.Reader,
|
||||
Filename: p.BktInfo.CORSObjectName(),
|
||||
CopiesNumber: p.CopiesNumber,
|
||||
}
|
||||
|
||||
objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||
|
|
|
@ -117,6 +117,7 @@ type (
|
|||
Header map[string]string
|
||||
Lock *data.ObjectLock
|
||||
Encryption encryption.Params
|
||||
CopiesNumber uint32
|
||||
}
|
||||
|
||||
DeleteObjectParams struct {
|
||||
|
@ -135,6 +136,7 @@ type (
|
|||
PutCORSParams struct {
|
||||
BktInfo *data.BucketInfo
|
||||
Reader io.Reader
|
||||
CopiesNumber uint32
|
||||
}
|
||||
|
||||
// CopyObjectParams stores object copy request parameters.
|
||||
|
@ -215,7 +217,7 @@ type (
|
|||
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
GetLockInfo(ctx context.Context, obj *ObjectVersion) (*data.LockInfo, error)
|
||||
PutLockInfo(ctx context.Context, p *ObjectVersion, lock *data.ObjectLock) error
|
||||
PutLockInfo(ctx context.Context, p *PutLockInfoParams) error
|
||||
|
||||
GetBucketTagging(ctx context.Context, cnrID cid.ID) (map[string]string, error)
|
||||
PutBucketTagging(ctx context.Context, cnrID cid.ID, tagSet map[string]string) error
|
||||
|
|
|
@ -60,6 +60,7 @@ type (
|
|||
PartNumber int
|
||||
Size int64
|
||||
Reader io.Reader
|
||||
CopiesNumber uint32
|
||||
}
|
||||
|
||||
UploadCopyParams struct {
|
||||
|
@ -68,6 +69,7 @@ type (
|
|||
SrcBktInfo *data.BucketInfo
|
||||
PartNumber int
|
||||
Range *RangeParams
|
||||
CopiesNumber uint32
|
||||
}
|
||||
|
||||
CompleteMultipartParams struct {
|
||||
|
@ -203,6 +205,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
|||
Creator: bktInfo.Owner,
|
||||
Attributes: make([][2]string, 2),
|
||||
Payload: p.Reader,
|
||||
CopiesNumber: p.CopiesNumber,
|
||||
}
|
||||
|
||||
decSize := p.Size
|
||||
|
@ -302,6 +305,7 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
|||
PartNumber: p.PartNumber,
|
||||
Size: size,
|
||||
Reader: pr,
|
||||
CopiesNumber: p.CopiesNumber,
|
||||
}
|
||||
|
||||
return n.uploadPart(ctx, multipartInfo, params)
|
||||
|
|
|
@ -121,6 +121,9 @@ type PrmObjectCreate struct {
|
|||
|
||||
// Object payload encapsulated in io.Reader primitive.
|
||||
Payload io.Reader
|
||||
|
||||
// Number of object copies that is enough to consider put successful.
|
||||
CopiesNumber uint32
|
||||
}
|
||||
|
||||
// PrmObjectDelete groups parameters of NeoFS.DeleteObject operation.
|
||||
|
|
|
@ -16,6 +16,7 @@ type PutBucketNotificationConfigurationParams struct {
|
|||
RequestInfo *api.ReqInfo
|
||||
BktInfo *data.BucketInfo
|
||||
Configuration *data.NotificationConfiguration
|
||||
CopiesNumber uint32
|
||||
}
|
||||
|
||||
func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error {
|
||||
|
@ -31,6 +32,7 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
|||
Creator: p.BktInfo.Owner,
|
||||
Payload: bytes.NewReader(confXML),
|
||||
Filename: sysName,
|
||||
CopiesNumber: p.CopiesNumber,
|
||||
}
|
||||
|
||||
objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||
|
|
|
@ -234,6 +234,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
|||
PayloadSize: uint64(p.Size),
|
||||
Filename: p.Object,
|
||||
Payload: r,
|
||||
CopiesNumber: p.CopiesNumber,
|
||||
}
|
||||
|
||||
prm.Attributes = make([][2]string, 0, len(p.Header))
|
||||
|
@ -254,13 +255,17 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
|||
}
|
||||
|
||||
if p.Lock != nil && (p.Lock.Retention != nil || p.Lock.LegalHold != nil) {
|
||||
objVersion := &ObjectVersion{
|
||||
prm := &PutLockInfoParams{
|
||||
ObjVersion: &ObjectVersion{
|
||||
BktInfo: p.BktInfo,
|
||||
ObjectName: p.Object,
|
||||
VersionID: id.EncodeToString(),
|
||||
},
|
||||
NewLock: p.Lock,
|
||||
CopiesNumber: p.CopiesNumber,
|
||||
}
|
||||
|
||||
if err = n.PutLockInfo(ctx, objVersion, p.Lock); err != nil {
|
||||
if err = n.PutLockInfo(ctx, prm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,9 +19,18 @@ const (
|
|||
AttributeExpirationEpoch = "__NEOFS__EXPIRATION_EPOCH"
|
||||
)
|
||||
|
||||
func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newLock *data.ObjectLock) error {
|
||||
cnrID := objVersion.BktInfo.CID
|
||||
versionNode, err := n.getNodeVersion(ctx, objVersion)
|
||||
type PutLockInfoParams struct {
|
||||
ObjVersion *ObjectVersion
|
||||
NewLock *data.ObjectLock
|
||||
CopiesNumber uint32
|
||||
}
|
||||
|
||||
func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) error {
|
||||
var (
|
||||
cnrID = p.ObjVersion.BktInfo.CID
|
||||
newLock = p.NewLock
|
||||
)
|
||||
versionNode, err := n.getNodeVersion(ctx, p.ObjVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -56,7 +65,7 @@ func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newL
|
|||
}
|
||||
}
|
||||
lock := &data.ObjectLock{Retention: newLock.Retention}
|
||||
retentionOID, err := n.putLockObject(ctx, objVersion.BktInfo, versionNode.OID, lock)
|
||||
retentionOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumber)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -66,13 +75,13 @@ func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newL
|
|||
if newLock.LegalHold != nil {
|
||||
if newLock.LegalHold.Enabled && !lockInfo.IsLegalHoldSet() {
|
||||
lock := &data.ObjectLock{LegalHold: newLock.LegalHold}
|
||||
legalHoldOID, err := n.putLockObject(ctx, objVersion.BktInfo, versionNode.OID, lock)
|
||||
legalHoldOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumber)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lockInfo.SetLegalHold(legalHoldOID)
|
||||
} else if !newLock.LegalHold.Enabled && lockInfo.IsLegalHoldSet() {
|
||||
if err = n.objectDelete(ctx, objVersion.BktInfo, lockInfo.LegalHold()); err != nil {
|
||||
if err = n.objectDelete(ctx, p.ObjVersion.BktInfo, lockInfo.LegalHold()); err != nil {
|
||||
return fmt.Errorf("couldn't delete lock object '%s' to remove legal hold: %w", lockInfo.LegalHold().EncodeToString(), err)
|
||||
}
|
||||
lockInfo.ResetLegalHold()
|
||||
|
@ -83,18 +92,19 @@ func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newL
|
|||
return fmt.Errorf("couldn't put lock into tree: %w", err)
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutLockInfo(lockObjectKey(objVersion), lockInfo); err != nil {
|
||||
if err = n.systemCache.PutLockInfo(lockObjectKey(p.ObjVersion), lockInfo); err != nil {
|
||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock) (oid.ID, error) {
|
||||
func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock, copiesNumber uint32) (oid.ID, error) {
|
||||
prm := PrmObjectCreate{
|
||||
Container: bktInfo.CID,
|
||||
Creator: bktInfo.Owner,
|
||||
Locks: []oid.ID{objID},
|
||||
CopiesNumber: copiesNumber,
|
||||
}
|
||||
|
||||
var err error
|
||||
|
|
|
@ -396,6 +396,7 @@ func getHandlerOptions(v *viper.Viper, l *zap.Logger) *handler.Config {
|
|||
err error
|
||||
policyStr = handler.DefaultPolicy
|
||||
defaultMaxAge = handler.DefaultMaxAge
|
||||
setCopiesNumber = handler.DefaultCopiesNumber
|
||||
)
|
||||
|
||||
if v.IsSet(cfgDefaultPolicy) {
|
||||
|
@ -417,9 +418,14 @@ func getHandlerOptions(v *viper.Viper, l *zap.Logger) *handler.Config {
|
|||
}
|
||||
}
|
||||
|
||||
if val := v.GetUint32(cfgSetCopiesNumber); val > 0 {
|
||||
setCopiesNumber = val
|
||||
}
|
||||
|
||||
cfg.DefaultMaxAge = defaultMaxAge
|
||||
cfg.NotificatorEnabled = v.GetBool(cfgEnableNATS)
|
||||
cfg.TLSEnabled = v.IsSet(cfgTLSKeyFile) && v.IsSet(cfgTLSCertFile)
|
||||
cfg.CopiesNumber = setCopiesNumber
|
||||
|
||||
return &cfg
|
||||
}
|
||||
|
|
|
@ -112,6 +112,10 @@ const ( // Settings.
|
|||
cmdPProf = "pprof"
|
||||
cmdMetrics = "metrics"
|
||||
|
||||
// Configuration of parameters of requests to NeoFS.
|
||||
// Number of the object copies to consider PUT to NeoFS successful.
|
||||
cfgSetCopiesNumber = "neofs.set_copies_number"
|
||||
|
||||
// envPrefix is an environment variables prefix used for configuration.
|
||||
envPrefix = "S3_GW"
|
||||
)
|
||||
|
|
|
@ -259,6 +259,7 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
|
|||
var prmPut pool.PrmObjectPut
|
||||
prmPut.SetHeader(*obj)
|
||||
prmPut.SetPayload(prm.Payload)
|
||||
prmPut.SetCopiesNumber(prm.CopiesNumber)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmPut.UseBearer(*prm.BearerToken)
|
||||
|
|
Loading…
Reference in a new issue