Merge pull request #134 from masterSplinter01/feature/102-add-versioning-for-benchmark

null-versioning and corresponding handler
This commit is contained in:
Alex Vanin 2021-07-06 15:02:15 +03:00 committed by GitHub
commit b3609fb337
6 changed files with 260 additions and 11 deletions

View file

@ -39,6 +39,19 @@ type ListMultipartUploadsResult struct {
var maxObjectList = 10000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse. var maxObjectList = 10000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse.
func (h *handler) registerAndSendError(w http.ResponseWriter, r *http.Request, err error, logText string) {
rid := api.GetRequestID(r.Context())
h.log.Error(logText,
zap.String("request_id", rid),
zap.Error(err))
api.WriteErrorResponse(r.Context(), w, api.Error{
Code: api.GetAPIError(api.ErrBadRequest).Code,
Description: err.Error(),
HTTPStatusCode: http.StatusBadRequest,
}, r.URL)
}
// ListBucketsHandler handles bucket listing requests. // ListBucketsHandler handles bucket listing requests.
func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
var ( var (
@ -345,3 +358,92 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req
}, r.URL) }, r.URL)
} }
} }
func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
p, err := parseListObjectVersionsRequest(r)
if err != nil {
h.registerAndSendError(w, r, err, "failed to parse request ")
return
}
info, err := h.obj.ListObjectVersions(r.Context(), p)
if err != nil {
h.registerAndSendError(w, r, err, "something went wrong")
return
}
response := encodeListObjectVersionsToResponse(info, p.Bucket)
if err := api.EncodeToResponse(w, response); err != nil {
h.registerAndSendError(w, r, err, "something went wrong")
}
}
func parseListObjectVersionsRequest(r *http.Request) (*layer.ListObjectVersionsParams, error) {
var (
err error
res layer.ListObjectVersionsParams
)
if r.URL.Query().Get("max-keys") == "" {
res.MaxKeys = maxObjectList
} else if res.MaxKeys, err = strconv.Atoi(r.URL.Query().Get("max-keys")); err != nil || res.MaxKeys <= 0 {
return nil, api.GetAPIError(api.ErrInvalidMaxKeys)
}
res.Prefix = r.URL.Query().Get("prefix")
res.KeyMarker = r.URL.Query().Get("marker")
res.Delimiter = r.URL.Query().Get("delimiter")
res.Encode = r.URL.Query().Get("encoding-type")
res.VersionIDMarker = r.URL.Query().Get("version-id-marker")
if info := api.GetReqInfo(r.Context()); info != nil {
res.Bucket = info.BucketName
}
return &res, nil
}
func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string) *ListObjectsVersionsResponse {
res := ListObjectsVersionsResponse{
Name: bucketName,
IsTruncated: info.IsTruncated,
KeyMarker: info.KeyMarker,
NextKeyMarker: info.NextKeyMarker,
NextVersionIDMarker: info.NextVersionIDMarker,
VersionIDMarker: info.VersionIDMarker,
}
for _, prefix := range info.CommonPrefixes {
res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: *prefix})
}
for _, ver := range info.Version {
res.Version = append(res.Version, ObjectVersionResponse{
IsLatest: ver.IsLatest,
Key: ver.Object.Name,
LastModified: ver.Object.Created.Format(time.RFC3339),
Owner: Owner{
ID: ver.Object.Owner.String(),
DisplayName: ver.Object.Owner.String(),
},
Size: ver.Object.Size,
VersionID: ver.VersionID,
ETag: ver.Object.HashSum,
})
}
// this loop is not starting till versioning is not implemented
for _, del := range info.DeleteMarker {
res.DeleteMarker = append(res.DeleteMarker, DeleteMarkerEntry{
IsLatest: del.IsLatest,
Key: del.Key,
LastModified: del.LastModified,
Owner: Owner{
ID: del.Owner.String(),
DisplayName: del.Owner.String(),
},
VersionID: del.VersionID,
})
}
return &res
}

View file

@ -109,6 +109,27 @@ type Object struct {
UserMetadata StringMap `xml:"UserMetadata,omitempty"` UserMetadata StringMap `xml:"UserMetadata,omitempty"`
} }
// ObjectVersionResponse container for object version in the response of ListBucketObjectVersionsHandler.
type ObjectVersionResponse struct {
ETag string `xml:"ETag"`
IsLatest bool `xml:"IsLatest"`
Key string `xml:"Key"`
LastModified string `xml:"LastModified"`
Owner Owner `xml:"Owner"`
Size int64 `xml:"Size"`
StorageClass string `xml:"StorageClass,omitempty"` // is empty!!
VersionID string `xml:"VersionId"`
}
// DeleteMarkerEntry container for deleted object's version in the response of ListBucketObjectVersionsHandler.
type DeleteMarkerEntry struct {
IsLatest bool `xml:"IsLatest"`
Key string `xml:"Key"`
LastModified string `xml:"LastModified"`
Owner Owner `xml:"Owner"`
VersionID string `xml:"VersionId"`
}
// StringMap is a map[string]string. // StringMap is a map[string]string.
type StringMap map[string]string type StringMap map[string]string
@ -125,6 +146,21 @@ type CopyObjectResponse struct {
ETag string // md5sum of the copied object. ETag string // md5sum of the copied object.
} }
// ListObjectsVersionsResponse is a response of ListBucketObjectVersionsHandler.
type ListObjectsVersionsResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult" json:"-"`
EncodingType string `xml:"EncodingType,omitempty"`
Name string `xml:"Name"`
IsTruncated bool `xml:"IsTruncated"`
KeyMarker string `xml:"KeyMarker"`
NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
NextVersionIDMarker string `xml:"NextVersionIdMarker,omitempty"`
VersionIDMarker string `xml:"VersionIdMarker"`
DeleteMarker []DeleteMarkerEntry `xml:"DeleteMarker"`
Version []ObjectVersionResponse `xml:"Version"`
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
}
// MarshalXML - StringMap marshals into XML. // MarshalXML - StringMap marshals into XML.
func (s StringMap) MarshalXML(e *xml.Encoder, start xml.StartElement) error { func (s StringMap) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
tokens := []xml.Token{start} tokens := []xml.Token{start}

View file

@ -279,14 +279,6 @@ func (h *handler) ListObjectsV2MHandler(w http.ResponseWriter, r *http.Request)
}, r.URL) }, r.URL)
} }
func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
api.WriteErrorResponse(r.Context(), w, api.Error{
Code: "XNeoFSUnimplemented",
Description: "implement me " + mux.CurrentRoute(r).GetName(),
HTTPStatusCode: http.StatusNotImplemented,
}, r.URL)
}
func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) { func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
api.WriteErrorResponse(r.Context(), w, api.Error{ api.WriteErrorResponse(r.Context(), w, api.Error{
Code: "XNeoFSUnimplemented", Code: "XNeoFSUnimplemented",

View file

@ -82,6 +82,16 @@ type (
DeleteBucketParams struct { DeleteBucketParams struct {
Name string Name string
} }
// ListObjectVersionsParams stores list objects versions parameters.
ListObjectVersionsParams struct {
Bucket string
Delimiter string
KeyMarker string
MaxKeys int
Prefix string
VersionIDMarker string
Encode string
}
// NeoFS provides basic NeoFS interface. // NeoFS provides basic NeoFS interface.
NeoFS interface { NeoFS interface {
@ -105,6 +115,7 @@ type (
CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInfo, error) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInfo, error)
ListObjects(ctx context.Context, p *ListObjectsParams) (*ListObjectsInfo, error) ListObjects(ctx context.Context, p *ListObjectsParams) (*ListObjectsInfo, error)
ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error)
DeleteObject(ctx context.Context, bucket, object string) error DeleteObject(ctx context.Context, bucket, object string) error
DeleteObjects(ctx context.Context, bucket string, objects []string) []error DeleteObjects(ctx context.Context, bucket string, objects []string) []error
@ -118,8 +129,11 @@ var (
ErrObjectNotExists = errors.New("object not exists") ErrObjectNotExists = errors.New("object not exists")
) )
// ETag (hex encoded md5sum) of empty string. const (
const emptyETag = "d41d8cd98f00b204e9800998ecf8427e" // ETag (hex encoded md5sum) of empty string.
emptyETag = "d41d8cd98f00b204e9800998ecf8427e"
unversionedObjectVersionID = "null"
)
// NewLayer creates instance of layer. It checks credentials // NewLayer creates instance of layer. It checks credentials
// and establishes gRPC connection with node. // and establishes gRPC connection with node.
@ -488,3 +502,73 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
return n.deleteContainer(ctx, bucketInfo.CID) return n.deleteContainer(ctx, bucketInfo.CID)
} }
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
var (
res = ListObjectVersionsInfo{}
err error
bkt *BucketInfo
ids []*object.ID
uniqNames = make(map[string]bool)
)
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
return nil, err
} else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID}); err != nil {
return nil, err
}
versions := make([]*ObjectVersionInfo, 0, len(ids))
// todo: deletemarkers is empty now, we will use it after proper realization of versioning
deleted := make([]*DeletedObjectInfo, 0, len(ids))
res.DeleteMarker = deleted
for _, id := range ids {
addr := object.NewAddress()
addr.SetObjectID(id)
addr.SetContainerID(bkt.CID)
meta, err := n.objectHead(ctx, addr)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
continue
}
if ov := objectVersionInfoFromMeta(bkt, meta, p.Prefix, p.Delimiter); ov != nil {
if _, ok := uniqNames[ov.Object.Name]; ok {
continue
}
if len(p.KeyMarker) > 0 && ov.Object.Name <= p.KeyMarker {
continue
}
uniqNames[ov.Object.Name] = ov.Object.isDir
versions = append(versions, ov)
}
}
sort.Slice(versions, func(i, j int) bool {
return versions[i].Object.Name < versions[j].Object.Name
})
if len(versions) > p.MaxKeys {
res.IsTruncated = true
lastVersion := versions[p.MaxKeys-1]
res.KeyMarker = lastVersion.Object.Name
res.VersionIDMarker = lastVersion.VersionID
nextVersion := versions[p.MaxKeys]
res.NextKeyMarker = nextVersion.Object.Name
res.NextVersionIDMarker = nextVersion.VersionID
versions = versions[:p.MaxKeys]
}
for _, ov := range versions {
if isDir := uniqNames[ov.Object.Name]; isDir {
res.CommonPrefixes = append(res.CommonPrefixes, &ov.Object.Name)
} else {
res.Version = append(res.Version, ov)
}
}
return &res, nil
}

View file

@ -53,6 +53,34 @@ type (
// List of prefixes for this request. // List of prefixes for this request.
Prefixes []string Prefixes []string
} }
// ObjectVersionInfo stores info about objects versions.
ObjectVersionInfo struct {
Object *ObjectInfo
IsLatest bool
VersionID string
}
// DeletedObjectInfo stores info about deleted versions of objects.
DeletedObjectInfo struct {
Owner *owner.ID
Key string
VersionID string
IsLatest bool
LastModified string
}
// ListObjectVersionsInfo stores info and list of objects' versions.
ListObjectVersionsInfo struct {
CommonPrefixes []*string
IsTruncated bool
KeyMarker string
NextKeyMarker string
NextVersionIDMarker string
Version []*ObjectVersionInfo
DeleteMarker []*DeletedObjectInfo
VersionIDMarker string
}
) )
// PathSeparator is a path components separator string. // PathSeparator is a path components separator string.
@ -123,6 +151,14 @@ func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter
} }
} }
func objectVersionInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectVersionInfo {
oi := objectInfoFromMeta(bkt, meta, prefix, delimiter)
if oi == nil {
return nil
}
return &ObjectVersionInfo{Object: oi, IsLatest: true, VersionID: unversionedObjectVersionID}
}
func filenameFromObject(o *object.Object) string { func filenameFromObject(o *object.Object) string {
var name = o.ID().String() var name = o.ID().String()
for _, attr := range o.Attributes() { for _, attr := range o.Attributes() {

1
go.sum
View file

@ -349,7 +349,6 @@ github.com/nspcc-dev/neo-go v0.95.3/go.mod h1:t15xRFDVhz5o/pstptdoW9N9JJBNn1hZ6A
github.com/nspcc-dev/neofs-api-go v1.22.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8= github.com/nspcc-dev/neofs-api-go v1.22.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
github.com/nspcc-dev/neofs-api-go v1.24.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8= github.com/nspcc-dev/neofs-api-go v1.24.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
github.com/nspcc-dev/neofs-api-go v1.27.0/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE= github.com/nspcc-dev/neofs-api-go v1.27.0/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
github.com/nspcc-dev/neofs-api-go v1.27.1 h1:ONdKOnm0/hK6m38VTUliCHY6RTxg+IpAzY4G+BeOZG4=
github.com/nspcc-dev/neofs-api-go v1.27.1/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE= github.com/nspcc-dev/neofs-api-go v1.27.1/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
github.com/nspcc-dev/neofs-api-go v1.28.0 h1:q3FNNz0as4XDqm/AJlAIWMX8YH0nLNbi+G9bONhnKnU= github.com/nspcc-dev/neofs-api-go v1.28.0 h1:q3FNNz0as4XDqm/AJlAIWMX8YH0nLNbi+G9bONhnKnU=
github.com/nspcc-dev/neofs-api-go v1.28.0/go.mod h1:YRIzUqBj/lGbmFm8mmCh54ZOzcJKkEIhv2s7ZvSLv3M= github.com/nspcc-dev/neofs-api-go v1.28.0/go.mod h1:YRIzUqBj/lGbmFm8mmCh54ZOzcJKkEIhv2s7ZvSLv3M=