forked from TrueCloudLab/frostfs-s3-gw
[#25] Refactoring and make fixes
closes #25 closes #33 Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
This commit is contained in:
parent
4d605d1113
commit
256850b8fe
10 changed files with 125 additions and 64 deletions
|
@ -49,6 +49,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Description: "",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}, r.URL)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -78,6 +79,8 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Description: err.Error(),
|
||||
HTTPStatusCode: http.StatusInternalServerError,
|
||||
}, r.URL)
|
||||
|
||||
return
|
||||
} else if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: inf.Created.Format(time.RFC3339)}); err != nil {
|
||||
h.log.Error("something went wrong",
|
||||
zap.String("request_id", rid),
|
||||
|
@ -92,5 +95,12 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Description: err.Error(),
|
||||
HTTPStatusCode: http.StatusInternalServerError,
|
||||
}, r.URL)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
h.log.Info("object is copied",
|
||||
zap.String("bucket", inf.Bucket),
|
||||
zap.String("object", inf.Name),
|
||||
zap.Stringer("object_id", inf.ID()))
|
||||
}
|
||||
|
|
|
@ -153,5 +153,7 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
|
|||
Description: err.Error(),
|
||||
HTTPStatusCode: http.StatusInternalServerError,
|
||||
}, r.URL)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,12 +21,6 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
rid = api.GetRequestID(r.Context())
|
||||
)
|
||||
|
||||
params := &layer.GetObjectParams{
|
||||
Bucket: bkt,
|
||||
Object: obj,
|
||||
Writer: w,
|
||||
}
|
||||
|
||||
if inf, err = h.obj.GetObjectInfo(r.Context(), bkt, obj); err != nil {
|
||||
h.log.Error("could not find object",
|
||||
zap.String("request_id", rid),
|
||||
|
@ -43,7 +37,13 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
params.Length = inf.Size
|
||||
params := &layer.GetObjectParams{
|
||||
Bucket: inf.Bucket,
|
||||
Object: inf.Name,
|
||||
Writer: w,
|
||||
}
|
||||
|
||||
// params.Length = inf.Size
|
||||
|
||||
if err = h.obj.GetObject(r.Context(), params); err != nil {
|
||||
h.log.Error("could not get object",
|
||||
|
@ -62,8 +62,8 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
w.Header().Set("Content-Type", inf.ContentType)
|
||||
w.Header().Set("Last-Modified", inf.Created.Format(http.TimeFormat))
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(inf.Size, 10))
|
||||
|
||||
w.Header().Set("Last-Modified", inf.Created.Format(http.TimeFormat))
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
|
|
@ -39,13 +39,11 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
w.Header().Set("Content-Type", inf.ContentType)
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(inf.Size, 10))
|
||||
|
||||
w.Header().Set("Last-Modified", inf.Created.Format(http.TimeFormat))
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *handler) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
|
@ -36,5 +36,7 @@ func (h *handler) GetBucketLocationHandler(w http.ResponseWriter, r *http.Reques
|
|||
Description: err.Error(),
|
||||
HTTPStatusCode: http.StatusInternalServerError,
|
||||
}, r.URL)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,19 +3,19 @@ package layer
|
|||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"io"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api/pool"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
@ -122,6 +122,11 @@ func (n *layer) Get(ctx context.Context, address *object.Address) (*object.Objec
|
|||
|
||||
// GetBucketInfo returns bucket name.
|
||||
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) {
|
||||
name, err := url.QueryUnescape(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
list, err := n.containerList(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -213,9 +218,11 @@ func (n *layer) ListObjects(ctx context.Context, p *ListObjectsParams) (*ListObj
|
|||
)
|
||||
|
||||
if ind < 0 { // if there are not sub-entities in tail - file
|
||||
oi = objectInfoFromMeta(meta)
|
||||
oi = objectInfoFromMeta(bkt, meta)
|
||||
} else { // if there are sub-entities in tail - dir
|
||||
oi = &ObjectInfo{
|
||||
id: meta.GetID(),
|
||||
|
||||
Owner: meta.GetOwnerID(),
|
||||
Bucket: bkt.Name,
|
||||
Name: tail[:ind+1], // dir MUST have slash symbol in the end
|
||||
|
@ -244,7 +251,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
|||
)
|
||||
|
||||
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
|
||||
return err
|
||||
return errors.Wrapf(err, "bucket = %s", p.Bucket)
|
||||
} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: p.Object}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -268,28 +275,27 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
|||
// GetObjectInfo returns meta information about the object.
|
||||
func (n *layer) GetObjectInfo(ctx context.Context, bucketName, filename string) (*ObjectInfo, error) {
|
||||
var (
|
||||
err error
|
||||
oid *object.ID
|
||||
cid = container.NewID()
|
||||
|
||||
err error
|
||||
oid *object.ID
|
||||
bkt *BucketInfo
|
||||
meta *object.Object
|
||||
)
|
||||
|
||||
if err = cid.Parse(bucketName); err != nil {
|
||||
if bkt, err = n.GetBucketInfo(ctx, bucketName); err != nil {
|
||||
return nil, err
|
||||
} else if oid, err = n.objectFindID(ctx, &findParams{cid: cid, val: filename}); err != nil {
|
||||
} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: filename}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(oid)
|
||||
addr.SetContainerID(cid)
|
||||
addr.SetContainerID(bkt.CID)
|
||||
|
||||
if meta, err = n.objectHead(ctx, addr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return objectInfoFromMeta(meta), nil
|
||||
return objectInfoFromMeta(bkt, meta), nil
|
||||
}
|
||||
|
||||
func GetOwnerID(tkn *token.BearerToken) (*owner.ID, error) {
|
||||
|
@ -312,7 +318,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
|
|||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInfo, error) {
|
||||
info, err := n.GetObjectInfo(ctx, p.SrcBucket, p.SrcObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "get-object-info")
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
|
@ -324,7 +330,9 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf
|
|||
Writer: pw,
|
||||
})
|
||||
|
||||
_ = pw.CloseWithError(err)
|
||||
if err = pw.CloseWithError(err); err != nil {
|
||||
n.log.Error("could not get object", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
// set custom headers
|
||||
|
@ -346,15 +354,15 @@ func (n *layer) DeleteObject(ctx context.Context, bucket, filename string) error
|
|||
var (
|
||||
err error
|
||||
ids []*object.ID
|
||||
cid = container.NewID()
|
||||
bkt *BucketInfo
|
||||
)
|
||||
|
||||
if err = cid.Parse(bucket); err != nil {
|
||||
if bkt, err = n.GetBucketInfo(ctx, bucket); err != nil {
|
||||
return &api.DeleteError{
|
||||
Err: err,
|
||||
Object: filename,
|
||||
}
|
||||
} else if ids, err = n.objectSearch(ctx, &findParams{cid: cid, val: filename}); err != nil {
|
||||
} else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: filename}); err != nil {
|
||||
return &api.DeleteError{
|
||||
Err: err,
|
||||
Object: filename,
|
||||
|
@ -364,7 +372,7 @@ func (n *layer) DeleteObject(ctx context.Context, bucket, filename string) error
|
|||
for _, id := range ids {
|
||||
addr := object.NewAddress()
|
||||
addr.SetObjectID(id)
|
||||
addr.SetContainerID(cid)
|
||||
addr.SetContainerID(bkt.CID)
|
||||
|
||||
if err = n.objectDelete(ctx, addr); err != nil {
|
||||
return &api.DeleteError{
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -14,7 +16,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/auth"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
@ -57,6 +58,11 @@ func (n *layer) prepareClient(ctx context.Context) (*client.Client, *token.Sessi
|
|||
|
||||
// objectSearch returns all available objects by search params.
|
||||
func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID, error) {
|
||||
filename, err := url.QueryUnescape(p.val)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cli, tkn, err := n.prepareClient(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -69,7 +75,7 @@ func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID,
|
|||
sop.WithContainerID(p.cid)
|
||||
|
||||
if p.val != "" {
|
||||
filter.AddFilter(object.AttributeFileName, p.val, object.MatchStringEqual)
|
||||
filter.AddFilter(object.AttributeFileName, filename, object.MatchStringEqual)
|
||||
}
|
||||
|
||||
sop.WithSearchFilters(filter)
|
||||
|
@ -113,7 +119,9 @@ func (n *layer) objectGet(ctx context.Context, p *getParams) (*object.Object, er
|
|||
}
|
||||
|
||||
// prepare length/offset writer
|
||||
writer := newWriter(p.Writer, p.offset, p.length)
|
||||
b := bufio.NewWriter(p.Writer)
|
||||
w := newWriter(b, p.offset, p.length)
|
||||
writer := newWriter(w, p.offset, p.length)
|
||||
|
||||
gop := new(client.GetObjectParams)
|
||||
gop.WithAddress(p.addr)
|
||||
|
@ -126,20 +134,26 @@ func (n *layer) objectGet(ctx context.Context, p *getParams) (*object.Object, er
|
|||
func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) {
|
||||
var (
|
||||
err error
|
||||
obj string
|
||||
own *owner.ID
|
||||
oid *object.ID
|
||||
bkt *BucketInfo
|
||||
brt *token.BearerToken
|
||||
// brt *token.BearerToken
|
||||
)
|
||||
|
||||
if brt, err = auth.GetBearerToken(ctx); err != nil {
|
||||
return nil, err
|
||||
} else if own, err = GetOwnerID(brt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// if brt, err = auth.GetBearerToken(ctx); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// else if own, err = GetOwnerID(brt); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
_ = own
|
||||
|
||||
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
|
||||
if obj, err = url.QueryUnescape(p.Object); err != nil {
|
||||
return nil, err
|
||||
} else if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
|
||||
return nil, err
|
||||
} else if _, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: p.Object}); err == nil {
|
||||
return nil, &api.ObjectAlreadyExists{
|
||||
|
@ -155,11 +169,11 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
|
|||
|
||||
attributes := make([]*object.Attribute, 0, len(p.Header)+1)
|
||||
|
||||
unix := strconv.FormatInt(time.Now().UTC().Unix(), 64)
|
||||
unix := strconv.FormatInt(time.Now().UTC().Unix(), 10)
|
||||
|
||||
filename := object.NewAttribute()
|
||||
filename.SetKey(object.AttributeFileName)
|
||||
filename.SetValue(p.Object)
|
||||
filename.SetValue(obj)
|
||||
|
||||
createdAt := object.NewAttribute()
|
||||
createdAt.SetKey(object.AttributeTimestamp)
|
||||
|
@ -187,11 +201,13 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo,
|
|||
pop.WithPayloadReader(r)
|
||||
pop.WithObject(raw.Object())
|
||||
|
||||
if _, err = cli.PutObject(ctx, pop, client.WithSession(tkn)); err != nil {
|
||||
if oid, err = cli.PutObject(ctx, pop, client.WithSession(tkn)); err != nil {
|
||||
return nil, errors.Wrapf(err, "owner_id = %s", tkn.OwnerID())
|
||||
}
|
||||
|
||||
return &ObjectInfo{
|
||||
id: oid,
|
||||
|
||||
Bucket: p.Bucket,
|
||||
Name: p.Object,
|
||||
Size: p.Size,
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -13,6 +13,8 @@ import (
|
|||
|
||||
type (
|
||||
ObjectInfo struct {
|
||||
id *object.ID
|
||||
|
||||
Bucket string
|
||||
Name string
|
||||
Size int64
|
||||
|
@ -59,35 +61,43 @@ func userHeaders(attrs []*object.Attribute) map[string]string {
|
|||
return result
|
||||
}
|
||||
|
||||
func objectInfoFromMeta(meta *object.Object) *ObjectInfo {
|
||||
aws3name := meta.GetID().String()
|
||||
func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object) *ObjectInfo {
|
||||
var (
|
||||
creation time.Time
|
||||
filename = meta.GetID().String()
|
||||
)
|
||||
|
||||
userHeaders := userHeaders(meta.GetAttributes())
|
||||
if name, ok := userHeaders[object.AttributeFileName]; ok {
|
||||
aws3name = name
|
||||
delete(userHeaders, name)
|
||||
if val, ok := userHeaders[object.AttributeFileName]; ok {
|
||||
filename = val
|
||||
delete(userHeaders, object.AttributeFileName)
|
||||
}
|
||||
|
||||
if val, ok := userHeaders[object.AttributeTimestamp]; !ok {
|
||||
// ignore empty value
|
||||
} else if dt, err := strconv.ParseInt(val, 10, 64); err == nil {
|
||||
creation = time.Unix(dt, 0)
|
||||
delete(userHeaders, object.AttributeTimestamp)
|
||||
}
|
||||
|
||||
mimeType := http.DetectContentType(meta.GetPayload())
|
||||
|
||||
return &ObjectInfo{
|
||||
Bucket: meta.GetContainerID().String(),
|
||||
Name: aws3name,
|
||||
id: meta.GetID(),
|
||||
|
||||
Bucket: bkt.Name,
|
||||
Name: filename,
|
||||
Created: creation,
|
||||
ContentType: mimeType,
|
||||
Headers: userHeaders,
|
||||
Size: int64(meta.GetPayloadSize()),
|
||||
Created: time.Now(), // time.Unix(meta.GetCreationEpoch(), 0),
|
||||
}
|
||||
}
|
||||
|
||||
func nameFromObject(o *object.Object) (string, string) {
|
||||
var name = o.GetID().String()
|
||||
|
||||
fmt.Printf("OID: %s\n", name)
|
||||
fmt.Println("Attributes:")
|
||||
for _, attr := range o.GetAttributes() {
|
||||
fmt.Printf("\t%s = %s\n", attr.GetKey(), attr.GetValue())
|
||||
|
||||
if attr.GetKey() == object.AttributeFileName {
|
||||
name = attr.GetValue()
|
||||
|
||||
|
@ -99,3 +109,5 @@ func nameFromObject(o *object.Object) (string, string) {
|
|||
|
||||
return name[ind+1:], name[:ind+1]
|
||||
}
|
||||
|
||||
func (o *ObjectInfo) ID() *object.ID { return o.id }
|
||||
|
|
|
@ -122,6 +122,8 @@ func WriteErrorResponse(ctx context.Context, w http.ResponseWriter, err error, r
|
|||
code := http.StatusBadRequest
|
||||
|
||||
if e, ok := err.(Error); ok {
|
||||
code = e.HTTPStatusCode
|
||||
|
||||
switch e.Code {
|
||||
case "SlowDown", "XNeoFSServerNotInitialized", "XNeoFSReadQuorum", "XNeoFSWriteQuorum":
|
||||
// Set retry-after header to indicate user-agents to retry request after 120secs.
|
||||
|
@ -191,17 +193,22 @@ func EncodeResponse(response interface{}) []byte {
|
|||
|
||||
// EncodeToResponse encodes the response into ResponseWriter.
|
||||
func EncodeToResponse(w http.ResponseWriter, response interface{}) error {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
if _, err := w.Write(xmlHeader); err != nil {
|
||||
return err
|
||||
} else if err = xml.NewEncoder(w).Encode(response); err != nil {
|
||||
return err
|
||||
}
|
||||
return xml.NewEncoder(w).Encode(response)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteSuccessResponseXML writes success headers and response if any,
|
||||
// with content-type set to `application/xml`.
|
||||
func WriteSuccessResponseXML(w http.ResponseWriter, response []byte) {
|
||||
WriteResponse(w, http.StatusOK, response, MimeXML)
|
||||
}
|
||||
// // WriteSuccessResponseXML writes success headers and response if any,
|
||||
// // with content-type set to `application/xml`.
|
||||
// func WriteSuccessResponseXML(w http.ResponseWriter, response []byte) {
|
||||
// WriteResponse(w, http.StatusOK, response, MimeXML)
|
||||
// }
|
||||
|
||||
func WriteSuccessResponseHeadersOnly(w http.ResponseWriter) {
|
||||
WriteResponse(w, http.StatusOK, nil, MimeNone)
|
||||
|
|
|
@ -137,7 +137,13 @@ func logErrorResponse(l *zap.Logger) mux.MiddlewareFunc {
|
|||
if lw.statusCode >= http.StatusMultipleChoices {
|
||||
l.Error("something went wrong",
|
||||
zap.Int("status", lw.statusCode),
|
||||
zap.String("method", mux.CurrentRoute(r).GetName()))
|
||||
zap.String("method", mux.CurrentRoute(r).GetName()),
|
||||
zap.String("description", http.StatusText(lw.statusCode)))
|
||||
} else {
|
||||
l.Info("call method",
|
||||
zap.Int("status", lw.statusCode),
|
||||
zap.String("method", mux.CurrentRoute(r).GetName()),
|
||||
zap.String("description", http.StatusText(lw.statusCode)))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -162,7 +168,7 @@ func Attach(r *mux.Router, m MaxClients, h Handler, center *auth.Center, log *za
|
|||
setRequestID,
|
||||
|
||||
// -- logging error requests
|
||||
// logErrorResponse(log),
|
||||
logErrorResponse(log),
|
||||
)
|
||||
|
||||
// Attach user authentication for all S3 routes.
|
||||
|
|
Loading…
Reference in a new issue