forked from TrueCloudLab/frostfs-s3-gw
[#122] Add enabling versioning
New handlers: PutBucketVersioning, GetBucketVersioning Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
d81a3d7b45
commit
c50a16a5e3
5 changed files with 163 additions and 35 deletions
|
@ -9,13 +9,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
)
|
||||
|
||||
// VersioningConfiguration contains VersioningConfiguration XML representation.
|
||||
type VersioningConfiguration struct {
|
||||
XMLName xml.Name `xml:"VersioningConfiguration"`
|
||||
Text string `xml:",chardata"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
}
|
||||
|
||||
// ListMultipartUploadsResult contains ListMultipartUploadsResult XML representation.
|
||||
type ListMultipartUploadsResult struct {
|
||||
XMLName xml.Name `xml:"ListMultipartUploadsResult"`
|
||||
|
@ -62,20 +55,6 @@ func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
// GetBucketVersioningHandler implements bucket versioning getter handler.
|
||||
func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
reqInfo = api.GetReqInfo(r.Context())
|
||||
res = new(VersioningConfiguration)
|
||||
)
|
||||
|
||||
res.Xmlns = "http://s3.amazonaws.com/doc/2006-03-01/"
|
||||
|
||||
if err := api.EncodeToResponse(w, res); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
// ListMultipartUploadsHandler implements multipart uploads listing handler.
|
||||
func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
|
|
|
@ -23,10 +23,6 @@ func (h *handler) PutBucketTaggingHandler(w http.ResponseWriter, r *http.Request
|
|||
h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported))
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported))
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not supported", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotSupported))
|
||||
}
|
||||
|
|
|
@ -164,6 +164,13 @@ type ListObjectsVersionsResponse struct {
|
|||
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
|
||||
}
|
||||
|
||||
// VersioningConfiguration contains VersioningConfiguration XML representation.
|
||||
type VersioningConfiguration struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ VersioningConfiguration"`
|
||||
Status string `xml:"Status"`
|
||||
MfaDelete string `xml:"MfaDelete,omitempty"`
|
||||
}
|
||||
|
||||
// MarshalXML - StringMap marshals into XML.
|
||||
func (s StringMap) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
|
||||
tokens := []xml.Token{start}
|
||||
|
|
64
api/handler/versioning.go
Normal file
64
api/handler/versioning.go
Normal file
|
@ -0,0 +1,64 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
configuration := new(VersioningConfiguration)
|
||||
if err := xml.NewDecoder(r.Body).Decode(configuration); err != nil {
|
||||
h.logAndSendError(w, "couldn't decode versioning configuration", reqInfo, api.GetAPIError(api.ErrIllegalVersioningConfigurationException))
|
||||
return
|
||||
}
|
||||
|
||||
p := &layer.PutVersioningParams{
|
||||
Bucket: reqInfo.BucketName,
|
||||
VersioningEnabled: configuration.Status == "Enabled",
|
||||
}
|
||||
|
||||
if _, err := h.obj.PutBucketVersioning(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "couldn't put update versioning settings", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
// GetBucketVersioningHandler implements bucket versioning getter handler.
|
||||
func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
objInfo, err := h.obj.GetBucketVersioning(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.log.Warn("couldn't get version settings object: default version settings will be used",
|
||||
zap.String("request_id", reqInfo.RequestID),
|
||||
zap.String("method", reqInfo.API),
|
||||
zap.String("object_name", reqInfo.ObjectName),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, formVersioningConfiguration(objInfo)); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
func formVersioningConfiguration(inf *layer.ObjectInfo) *VersioningConfiguration {
|
||||
res := &VersioningConfiguration{Status: "Suspended"}
|
||||
|
||||
if inf == nil {
|
||||
return res
|
||||
}
|
||||
|
||||
enabled, ok := inf.Headers["S3-Settings-Versioning-enabled"]
|
||||
if ok {
|
||||
if parsed, err := strconv.ParseBool(enabled); err == nil && parsed {
|
||||
res.Status = "Enabled"
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
|
||||
|
@ -71,6 +72,12 @@ type (
|
|||
Header map[string]string
|
||||
}
|
||||
|
||||
// PutVersioningParams stores object copy request parameters.
|
||||
PutVersioningParams struct {
|
||||
Bucket string
|
||||
VersioningEnabled bool
|
||||
}
|
||||
|
||||
// CopyObjectParams stores object copy request parameters.
|
||||
CopyObjectParams struct {
|
||||
SrcBucket string
|
||||
|
@ -117,6 +124,9 @@ type (
|
|||
Client interface {
|
||||
NeoFS
|
||||
|
||||
PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error)
|
||||
GetBucketVersioning(ctx context.Context, name string) (*ObjectInfo, error)
|
||||
|
||||
ListBuckets(ctx context.Context) ([]*BucketInfo, error)
|
||||
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
|
||||
GetBucketACL(ctx context.Context, name string) (*BucketACL, error)
|
||||
|
@ -142,6 +152,7 @@ type (
|
|||
|
||||
const (
|
||||
unversionedObjectVersionID = "null"
|
||||
bktVersionSettingsObject = ".s3-versioning-settings"
|
||||
)
|
||||
|
||||
// NewLayer creates instance of layer. It checks credentials
|
||||
|
@ -303,17 +314,18 @@ func (n *layer) checkObject(ctx context.Context, cid *cid.ID, filename string) e
|
|||
|
||||
// GetObjectInfo returns meta information about the object.
|
||||
func (n *layer) GetObjectInfo(ctx context.Context, bucketName, filename string) (*ObjectInfo, error) {
|
||||
var (
|
||||
err error
|
||||
oid *object.ID
|
||||
bkt *BucketInfo
|
||||
meta *object.Object
|
||||
)
|
||||
|
||||
if bkt, err = n.GetBucketInfo(ctx, bucketName); err != nil {
|
||||
bkt, err := n.GetBucketInfo(ctx, bucketName)
|
||||
if err != nil {
|
||||
n.log.Error("could not fetch bucket info", zap.Error(err))
|
||||
return nil, err
|
||||
} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: filename}); err != nil {
|
||||
}
|
||||
|
||||
return n.getObjectInfo(ctx, bkt, filename)
|
||||
}
|
||||
|
||||
func (n *layer) getObjectInfo(ctx context.Context, bkt *BucketInfo, objectName string) (*ObjectInfo, error) {
|
||||
oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, val: objectName})
|
||||
if err != nil {
|
||||
n.log.Error("could not find object id", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
@ -325,7 +337,7 @@ func (n *layer) GetObjectInfo(ctx context.Context, bucketName, filename string)
|
|||
/* todo: now we get an address via request to NeoFS and try to find the object with the address in cache
|
||||
but it will be resolved after implementation of local cache with nicenames and address of objects
|
||||
for get/head requests */
|
||||
meta = n.objCache.Get(addr)
|
||||
meta := n.objCache.Get(addr)
|
||||
if meta == nil {
|
||||
meta, err = n.objectHead(ctx, addr)
|
||||
if err != nil {
|
||||
|
@ -508,3 +520,73 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
|
|||
}
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error) {
|
||||
bucketInfo, err := n.GetBucketInfo(ctx, p.Bucket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objectInfo, err := n.getObjectInfo(ctx, bucketInfo, bktVersionSettingsObject)
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't get bucket version settings object, new one will be created",
|
||||
zap.String("bucket_name", bucketInfo.Name),
|
||||
zap.Stringer("cid", bucketInfo.CID),
|
||||
zap.String("object_name", bktVersionSettingsObject),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
attributes := make([]*object.Attribute, 0, 3)
|
||||
|
||||
filename := object.NewAttribute()
|
||||
filename.SetKey(object.AttributeFileName)
|
||||
filename.SetValue(bktVersionSettingsObject)
|
||||
|
||||
createdAt := object.NewAttribute()
|
||||
createdAt.SetKey(object.AttributeTimestamp)
|
||||
createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10))
|
||||
|
||||
versioningIgnore := object.NewAttribute()
|
||||
versioningIgnore.SetKey("S3-Versions-ignore")
|
||||
versioningIgnore.SetValue(strconv.FormatBool(true))
|
||||
|
||||
settingsVersioningEnabled := object.NewAttribute()
|
||||
settingsVersioningEnabled.SetKey("S3-Settings-Versioning-enabled")
|
||||
settingsVersioningEnabled.SetValue(strconv.FormatBool(p.VersioningEnabled))
|
||||
|
||||
attributes = append(attributes, filename, createdAt, versioningIgnore, settingsVersioningEnabled)
|
||||
|
||||
raw := object.NewRaw()
|
||||
raw.SetOwnerID(bucketInfo.Owner)
|
||||
raw.SetContainerID(bucketInfo.CID)
|
||||
raw.SetAttributes(attributes...)
|
||||
|
||||
ops := new(client.PutObjectParams).WithObject(raw.Object())
|
||||
oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(oid)
|
||||
addr.SetContainerID(bucketInfo.CID)
|
||||
meta, err := n.objectHead(ctx, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if objectInfo != nil {
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(objectInfo.ID())
|
||||
addr.SetContainerID(bucketInfo.CID)
|
||||
if err = n.objectDelete(ctx, addr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return objectInfoFromMeta(bucketInfo, meta, "", ""), nil
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*ObjectInfo, error) {
|
||||
return n.GetObjectInfo(ctx, bucketName, bktVersionSettingsObject)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue