forked from TrueCloudLab/frostfs-s3-gw
[#122] Add getting specific object version
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
f463522f34
commit
3130784ee6
8 changed files with 111 additions and 86 deletions
|
@ -63,6 +63,11 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
h.logAndSendError(w, "could not parse request params", reqInfo, err)
|
||||
return
|
||||
}
|
||||
p := &layer.HeadObjectParams{
|
||||
Bucket: srcBucket,
|
||||
Object: srcObject,
|
||||
VersionID: reqInfo.URL.Query().Get("versionId"),
|
||||
}
|
||||
|
||||
if args.MetadataDirective == replaceMetadataDirective {
|
||||
metadata = parseMetadata(r)
|
||||
|
@ -80,7 +85,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if inf, err = h.obj.GetObjectInfo(r.Context(), srcBucket, srcObject); err != nil {
|
||||
if inf, err = h.obj.GetObjectInfo(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "could not find object", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -100,9 +105,8 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
params := &layer.CopyObjectParams{
|
||||
SrcBucket: srcBucket,
|
||||
SrcObject: inf,
|
||||
DstBucket: reqInfo.BucketName,
|
||||
SrcObject: srcObject,
|
||||
DstObject: reqInfo.ObjectName,
|
||||
SrcSize: inf.Size,
|
||||
Header: metadata,
|
||||
|
|
|
@ -72,6 +72,7 @@ func writeHeaders(h http.Header, info *layer.ObjectInfo) {
|
|||
h.Set(api.LastModified, info.Created.UTC().Format(http.TimeFormat))
|
||||
h.Set(api.ContentLength, strconv.FormatInt(info.Size, 10))
|
||||
h.Set(api.ETag, info.HashSum)
|
||||
h.Set(api.AmzVersionId, info.ID().String())
|
||||
|
||||
for key, val := range info.Headers {
|
||||
h[api.MetadataPrefix+key] = []string{val}
|
||||
|
@ -98,7 +99,13 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if inf, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil {
|
||||
p := &layer.HeadObjectParams{
|
||||
Bucket: reqInfo.BucketName,
|
||||
Object: reqInfo.ObjectName,
|
||||
VersionID: reqInfo.URL.Query().Get("versionId"),
|
||||
}
|
||||
|
||||
if inf, err = h.obj.GetObjectInfo(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "could not find object", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -118,10 +125,10 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
getParams := &layer.GetObjectParams{
|
||||
Bucket: inf.Bucket,
|
||||
Object: inf.Name,
|
||||
ObjectInfo: inf,
|
||||
Writer: w,
|
||||
Range: params,
|
||||
VersionID: p.VersionID,
|
||||
}
|
||||
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
|
||||
h.logAndSendError(w, "could not get object", reqInfo, err)
|
||||
|
|
|
@ -36,16 +36,22 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if inf, err = h.obj.GetObjectInfo(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil {
|
||||
p := &layer.HeadObjectParams{
|
||||
Bucket: reqInfo.BucketName,
|
||||
Object: reqInfo.ObjectName,
|
||||
VersionID: reqInfo.URL.Query().Get("versionId"),
|
||||
}
|
||||
|
||||
if inf, err = h.obj.GetObjectInfo(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "could not fetch object info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
|
||||
getParams := &layer.GetObjectParams{
|
||||
Bucket: inf.Bucket,
|
||||
Object: inf.Name,
|
||||
ObjectInfo: inf,
|
||||
Writer: buffer,
|
||||
Range: getRangeToDetectContentType(inf.Size),
|
||||
VersionID: reqInfo.URL.Query().Get("versionId"),
|
||||
}
|
||||
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
|
||||
h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", inf.ID()))
|
||||
|
|
|
@ -39,6 +39,7 @@ func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Requ
|
|||
zap.String("method", reqInfo.API),
|
||||
zap.String("object_name", reqInfo.ObjectName),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, formVersioningConfiguration(settings)); err != nil {
|
||||
|
|
|
@ -4,6 +4,7 @@ package api
|
|||
const (
|
||||
MetadataPrefix = "X-Amz-Meta-"
|
||||
AmzMetadataDirective = "X-Amz-Metadata-Directive"
|
||||
AmzVersionId = "X-Amz-Version-Id"
|
||||
|
||||
LastModified = "Last-Modified"
|
||||
Date = "Date"
|
||||
|
|
|
@ -50,11 +50,20 @@ type (
|
|||
// GetObjectParams stores object get request parameters.
|
||||
GetObjectParams struct {
|
||||
Range *RangeParams
|
||||
Bucket string
|
||||
Object string
|
||||
ObjectInfo *ObjectInfo
|
||||
//Bucket string
|
||||
//Object string
|
||||
Offset int64
|
||||
Length int64
|
||||
Writer io.Writer
|
||||
VersionID string
|
||||
}
|
||||
|
||||
// HeadObjectParams stores object head request parameters.
|
||||
HeadObjectParams struct {
|
||||
Bucket string
|
||||
Object string
|
||||
VersionID string
|
||||
}
|
||||
|
||||
// RangeParams stores range header request parameters.
|
||||
|
@ -85,9 +94,8 @@ type (
|
|||
|
||||
// CopyObjectParams stores object copy request parameters.
|
||||
CopyObjectParams struct {
|
||||
SrcBucket string
|
||||
SrcObject *ObjectInfo
|
||||
DstBucket string
|
||||
SrcObject string
|
||||
DstObject string
|
||||
SrcSize int64
|
||||
Header map[string]string
|
||||
|
@ -140,7 +148,7 @@ type (
|
|||
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
|
||||
|
||||
GetObject(ctx context.Context, p *GetObjectParams) error
|
||||
GetObjectInfo(ctx context.Context, bucketName, objectName string) (*ObjectInfo, error)
|
||||
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*ObjectInfo, error)
|
||||
|
||||
PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error)
|
||||
|
||||
|
@ -265,21 +273,17 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*BucketInfo, error) {
|
|||
|
||||
// GetObject from storage.
|
||||
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
||||
var (
|
||||
err error
|
||||
oid *object.ID
|
||||
bkt *BucketInfo
|
||||
)
|
||||
var err error
|
||||
|
||||
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
|
||||
return fmt.Errorf("couldn't find bucket: %s : %w", p.Bucket, err)
|
||||
} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: p.Object}); err != nil {
|
||||
return fmt.Errorf("search of the object failed: cid: %s, val: %s : %w", bkt.CID, p.Object, err)
|
||||
}
|
||||
//if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
|
||||
// return fmt.Errorf("couldn't find bucket: %s : %w", p.Bucket, err)
|
||||
//} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: p.Object}); err != nil {
|
||||
// return fmt.Errorf("search of the object failed: cid: %s, val: %s : %w", bkt.CID, p.Object, err)
|
||||
//}
|
||||
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(oid)
|
||||
addr.SetContainerID(bkt.CID)
|
||||
addr.SetObjectID(p.ObjectInfo.ID())
|
||||
addr.SetContainerID(p.ObjectInfo.CID())
|
||||
|
||||
params := &getParams{
|
||||
Writer: p.Writer,
|
||||
|
@ -301,7 +305,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
|||
|
||||
if err != nil {
|
||||
n.objCache.Delete(addr)
|
||||
return fmt.Errorf("couldn't get object, cid: %s : %w", bkt.CID, err)
|
||||
return fmt.Errorf("couldn't get object, cid: %s : %w", p.ObjectInfo.CID(), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -318,14 +322,18 @@ func (n *layer) checkObject(ctx context.Context, cid *cid.ID, filename string) e
|
|||
}
|
||||
|
||||
// GetObjectInfo returns meta information about the object.
|
||||
func (n *layer) GetObjectInfo(ctx context.Context, bucketName, filename string) (*ObjectInfo, error) {
|
||||
bkt, err := n.GetBucketInfo(ctx, bucketName)
|
||||
func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*ObjectInfo, error) {
|
||||
bkt, err := n.GetBucketInfo(ctx, p.Bucket)
|
||||
if err != nil {
|
||||
n.log.Error("could not fetch bucket info", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return n.headLastVersion(ctx, bkt, filename)
|
||||
if len(p.VersionID) == 0 {
|
||||
return n.headLastVersion(ctx, bkt, p.Object)
|
||||
}
|
||||
|
||||
return n.headVersion(ctx, bkt, p.Object, p.VersionID)
|
||||
}
|
||||
|
||||
func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *BucketInfo) (*ObjectInfo, error) {
|
||||
|
@ -344,7 +352,7 @@ func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *BucketInfo) (*Ob
|
|||
for get/head requests */
|
||||
meta := n.objCache.Get(addr)
|
||||
if meta == nil {
|
||||
meta, err = n.objectHead(ctx, addr)
|
||||
meta, err = n.objectHead(ctx, bkt.CID, oid)
|
||||
if err != nil {
|
||||
n.log.Error("could not fetch object head", zap.Error(err))
|
||||
return nil, err
|
||||
|
@ -353,6 +361,7 @@ func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *BucketInfo) (*Ob
|
|||
n.log.Error("couldn't cache an object", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return objectInfoFromMeta(bkt, meta, "", ""), nil
|
||||
}
|
||||
|
||||
|
@ -367,8 +376,7 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf
|
|||
|
||||
go func() {
|
||||
err := n.GetObject(ctx, &GetObjectParams{
|
||||
Bucket: p.SrcBucket,
|
||||
Object: p.SrcObject,
|
||||
ObjectInfo: p.SrcObject,
|
||||
Writer: pw,
|
||||
})
|
||||
|
||||
|
@ -477,11 +485,7 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
|
|||
res.DeleteMarker = deleted
|
||||
|
||||
for _, id := range ids {
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(id)
|
||||
addr.SetContainerID(bkt.CID)
|
||||
|
||||
meta, err := n.objectHead(ctx, addr)
|
||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||
if err != nil {
|
||||
n.log.Warn("could not fetch object meta", zap.Error(err))
|
||||
continue
|
||||
|
@ -571,10 +575,7 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(oid)
|
||||
addr.SetContainerID(bucketInfo.CID)
|
||||
meta, err := n.objectHead(ctx, addr)
|
||||
meta, err := n.objectHead(ctx, bucketInfo.CID, oid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -612,10 +613,6 @@ func (n *layer) getBucketSettings(ctx context.Context, bktInfo *BucketInfo) (*Bu
|
|||
func objectInfoToBucketSettings(info *ObjectInfo) *BucketSettings {
|
||||
res := &BucketSettings{}
|
||||
|
||||
if info == nil {
|
||||
return res
|
||||
}
|
||||
|
||||
enabled, ok := info.Headers["S3-Settings-Versioning-enabled"]
|
||||
if ok {
|
||||
if parsed, err := strconv.ParseBool(enabled); err == nil {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
|
@ -96,7 +97,10 @@ func (n *layer) objectFindID(ctx context.Context, p *findParams) (*object.ID, er
|
|||
}
|
||||
|
||||
// objectHead returns all object's headers.
|
||||
func (n *layer) objectHead(ctx context.Context, address *object.Address) (*object.Object, error) {
|
||||
func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) {
|
||||
address := object.NewAddress()
|
||||
address.SetContainerID(cid)
|
||||
address.SetObjectID(oid)
|
||||
ops := new(client.ObjectHeaderParams).WithAddress(address).WithAllFields()
|
||||
return n.pool.GetObjectHeader(ctx, ops, n.BearerOpt(ctx))
|
||||
}
|
||||
|
@ -213,10 +217,7 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(oid)
|
||||
addr.SetContainerID(bkt.CID)
|
||||
meta, err := n.objectHead(ctx, addr)
|
||||
meta, err := n.objectHead(ctx, bkt.CID, oid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -265,10 +266,7 @@ func (n *layer) headLastVersion(ctx context.Context, bkt *BucketInfo, objectName
|
|||
|
||||
infos := make([]*object.Object, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
addr := object.NewAddress()
|
||||
addr.SetContainerID(bkt.CID)
|
||||
addr.SetObjectID(id)
|
||||
meta, err := n.objectHead(ctx, addr)
|
||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head object",
|
||||
zap.Stringer("object id", id),
|
||||
|
@ -284,23 +282,31 @@ func (n *layer) headLastVersion(ctx context.Context, bkt *BucketInfo, objectName
|
|||
})
|
||||
|
||||
return objectInfoFromMeta(bkt, infos[len(infos)-1], "", ""), nil
|
||||
//versionsToDeleteStr, ok := lastVersionInfo.Headers[versionsAddAttr]
|
||||
//versionsDeletedStr := lastVersionInfo.Headers[versionsDelAttr]
|
||||
//idsToDeleteArr := []*object.ID{lastVersionInfo.ID()}
|
||||
//if ok {
|
||||
// // for versioning mode only
|
||||
// idsToDelete := strings.Split(versionsToDeleteStr, ",")
|
||||
// for _, idStr := range idsToDelete {
|
||||
// oid := object.NewID()
|
||||
// if err = oid.Parse(idStr); err != nil {
|
||||
// n.log.Warn("couldn't parse object id versions list",
|
||||
// zap.String("versions id", versionsToDeleteStr),
|
||||
// zap.Error(err))
|
||||
// break
|
||||
// }
|
||||
// idsToDeleteArr = append(idsToDeleteArr, oid)
|
||||
// }
|
||||
//}
|
||||
}
|
||||
|
||||
func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, objectName, versionID string) (*ObjectInfo, error) {
|
||||
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return nil, api.GetAPIError(api.ErrNoSuchVersion)
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
if id.String() == versionID {
|
||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, api.GetAPIError(api.ErrNoSuchVersion)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return objectInfoFromMeta(bkt, meta, "", ""), nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, api.GetAPIError(api.ErrNoSuchVersion)
|
||||
}
|
||||
|
||||
// objectDelete puts tombstone object into neofs.
|
||||
|
@ -399,11 +405,7 @@ func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParam
|
|||
objects := make([]*ObjectInfo, 0, len(ids))
|
||||
|
||||
for _, id := range ids {
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(id)
|
||||
addr.SetContainerID(p.Bucket.CID)
|
||||
|
||||
meta, err := n.objectHead(ctx, addr)
|
||||
meta, err := n.objectHead(ctx, p.Bucket.CID, id)
|
||||
if err != nil {
|
||||
n.log.Warn("could not fetch object meta", zap.Error(err))
|
||||
continue
|
||||
|
|
|
@ -8,6 +8,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
|
@ -21,6 +23,7 @@ type (
|
|||
isDir bool
|
||||
|
||||
Bucket string
|
||||
bucketID *cid.ID
|
||||
Name string
|
||||
Size int64
|
||||
ContentType string
|
||||
|
@ -137,6 +140,7 @@ func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter
|
|||
isDir: isDir,
|
||||
|
||||
Bucket: bkt.Name,
|
||||
bucketID: bkt.CID,
|
||||
Name: filename,
|
||||
Created: creation,
|
||||
ContentType: mimeType,
|
||||
|
@ -174,6 +178,9 @@ func NameFromString(name string) (string, string) {
|
|||
// ID returns object ID from ObjectInfo.
|
||||
func (o *ObjectInfo) ID() *object.ID { return o.id }
|
||||
|
||||
// CID returns bucket ID from ObjectInfo.
|
||||
func (o *ObjectInfo) CID() *cid.ID { return o.bucketID }
|
||||
|
||||
// IsDir allows to check if object is a directory.
|
||||
func (o *ObjectInfo) IsDir() bool { return o.isDir }
|
||||
|
||||
|
|
Loading…
Reference in a new issue