[#195] Support enabling object locking for bucket

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-02-25 12:06:40 +03:00 committed by Angira Kekteeva
parent 40b6365afb
commit e98c663bd6
5 changed files with 41 additions and 17 deletions

View file

@ -25,6 +25,7 @@ type (
Created time.Time Created time.Time
BasicACL uint32 BasicACL uint32
LocationConstraint string LocationConstraint string
ObjectLockEnabled bool
} }
// ObjectInfo holds S3 object data. // ObjectInfo holds S3 object data.

View file

@ -565,18 +565,36 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
p.Policy = h.cfg.DefaultPolicy p.Policy = h.cfg.DefaultPolicy
} }
p.ObjectLockEnabled = isLockEnabled(r.Header)
cid, err := h.obj.CreateBucket(r.Context(), &p) cid, err := h.obj.CreateBucket(r.Context(), &p)
if err != nil { if err != nil {
h.logAndSendError(w, "could not create bucket", reqInfo, err) h.logAndSendError(w, "could not create bucket", reqInfo, err)
return return
} }
if p.ObjectLockEnabled {
vp := &layer.PutVersioningParams{
Bucket: reqInfo.BucketName,
Settings: &layer.BucketSettings{VersioningEnabled: true},
}
if _, err = h.obj.PutBucketVersioning(r.Context(), vp); err != nil {
h.log.Error("couldn't enable bucket versioning", zap.Stringer("container_id", cid), zap.Error(err))
}
}
h.log.Info("bucket is created", h.log.Info("bucket is created",
zap.String("container_id", cid.String())) zap.String("container_id", cid.String()))
api.WriteSuccessResponseHeadersOnly(w) api.WriteSuccessResponseHeadersOnly(w)
} }
func isLockEnabled(header http.Header) bool {
lockEnabledStr := header.Get(api.AmzBucketObjectLockEnabled)
lockEnabled, _ := strconv.ParseBool(lockEnabledStr)
return lockEnabled
}
func checkBucketName(bucketName string) error { func checkBucketName(bucketName string) error {
if len(bucketName) < 3 || len(bucketName) > 63 { if len(bucketName) < 3 || len(bucketName) > 63 {
return errors.GetAPIError(errors.ErrInvalidBucketName) return errors.GetAPIError(errors.ErrInvalidBucketName)

View file

@ -46,6 +46,7 @@ const (
AmzGrantWrite = "X-Amz-Grant-Write" AmzGrantWrite = "X-Amz-Grant-Write"
AmzExpectedBucketOwner = "X-Amz-Expected-Bucket-Owner" AmzExpectedBucketOwner = "X-Amz-Expected-Bucket-Owner"
AmzSourceExpectedBucketOwner = "X-Amz-Source-Expected-Bucket-Owner" AmzSourceExpectedBucketOwner = "X-Amz-Source-Expected-Bucket-Owner"
AmzBucketObjectLockEnabled = "X-Amz-Bucket-Object-Lock-Enabled"
ContainerID = "X-Container-Id" ContainerID = "X-Container-Id"

View file

@ -27,13 +27,17 @@ type (
} }
) )
const locationConstraintAttr = ".s3-location-constraint" const (
attributeLocationConstraint = ".s3-location-constraint"
attributeLockEnabled = "LockEnabled"
)
func (n *layer) containerInfo(ctx context.Context, idCnr *cid.ID) (*data.BucketInfo, error) { func (n *layer) containerInfo(ctx context.Context, idCnr *cid.ID) (*data.BucketInfo, error) {
var ( var (
err error err error
res *container.Container res *container.Container
rid = api.GetRequestID(ctx) rid = api.GetRequestID(ctx)
log = n.log.With(zap.Stringer("cid", idCnr), zap.String("request_id", rid))
info = &data.BucketInfo{ info = &data.BucketInfo{
CID: idCnr, CID: idCnr,
@ -42,10 +46,7 @@ func (n *layer) containerInfo(ctx context.Context, idCnr *cid.ID) (*data.BucketI
) )
res, err = n.neoFS.Container(ctx, *idCnr) res, err = n.neoFS.Container(ctx, *idCnr)
if err != nil { if err != nil {
n.log.Error("could not fetch container", log.Error("could not fetch container", zap.Error(err))
zap.Stringer("cid", idCnr),
zap.String("request_id", rid),
zap.Error(err))
if strings.Contains(err.Error(), "container not found") { if strings.Contains(err.Error(), "container not found") {
return nil, errors.GetAPIError(errors.ErrNoSuchBucket) return nil, errors.GetAPIError(errors.ErrNoSuchBucket)
@ -63,26 +64,27 @@ func (n *layer) containerInfo(ctx context.Context, idCnr *cid.ID) (*data.BucketI
case container.AttributeTimestamp: case container.AttributeTimestamp:
unix, err := strconv.ParseInt(attr.Value(), 10, 64) unix, err := strconv.ParseInt(attr.Value(), 10, 64)
if err != nil { if err != nil {
n.log.Error("could not parse container creation time", log.Error("could not parse container creation time",
zap.Stringer("cid", idCnr), zap.String("created_at", val), zap.Error(err))
zap.String("request_id", rid),
zap.String("created_at", val),
zap.Error(err))
continue continue
} }
info.Created = time.Unix(unix, 0) info.Created = time.Unix(unix, 0)
case locationConstraintAttr: case attributeLocationConstraint:
info.LocationConstraint = val info.LocationConstraint = val
case attributeLockEnabled:
info.ObjectLockEnabled, err = strconv.ParseBool(val)
if err != nil {
log.Error("could not parse container object lock enabled attribute",
zap.String("lock_enabled", val), zap.Error(err))
}
} }
} }
if err := n.bucketCache.Put(info); err != nil { if err = n.bucketCache.Put(info); err != nil {
n.log.Warn("could not put bucket info into cache", log.Warn("could not put bucket info into cache",
zap.Stringer("cid", idCnr), zap.String("bucket_name", info.Name), zap.Error(err))
zap.String("bucket_name", info.Name),
zap.Error(err))
} }
return info, nil return info, nil
@ -133,9 +135,10 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*ci
if p.LocationConstraint != "" { if p.LocationConstraint != "" {
locConstAttr = container.NewAttribute() locConstAttr = container.NewAttribute()
locConstAttr.SetKey(locationConstraintAttr) locConstAttr.SetKey(attributeLocationConstraint)
locConstAttr.SetValue(p.LocationConstraint) locConstAttr.SetValue(p.LocationConstraint)
} }
//todo add lock enabled attr
if bktInfo.CID, err = n.neoFS.CreateContainer(ctx, PrmContainerCreate{ if bktInfo.CID, err = n.neoFS.CreateContainer(ctx, PrmContainerCreate{
Creator: *bktInfo.Owner, Creator: *bktInfo.Owner,

View file

@ -336,6 +336,7 @@ type (
EACL *eacl.Table EACL *eacl.Table
SessionToken *session.Token SessionToken *session.Token
LocationConstraint string LocationConstraint string
ObjectLockEnabled bool
} }
// PutBucketACLParams stores put bucket acl request parameters. // PutBucketACLParams stores put bucket acl request parameters.
PutBucketACLParams struct { PutBucketACLParams struct {