frostfs-s3-gw/api/layer/object.go
Denis Kirillov 9b1ccd39be [#475] Add extra attributes to partInfo
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
2022-07-22 15:19:16 +03:00

656 lines
17 KiB
Go

package layer
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"mime"
"path/filepath"
"sort"
"strings"
"time"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/internal/misc"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
type (
getParams struct {
// payload range
off, ln uint64
oid oid.ID
bktInfo *data.BucketInfo
}
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
ListObjectsParamsCommon struct {
BktInfo *data.BucketInfo
Delimiter string
Encode string
MaxKeys int
Prefix string
}
// ListObjectsParamsV1 contains params for ListObjectsV1.
ListObjectsParamsV1 struct {
ListObjectsParamsCommon
Marker string
}
// ListObjectsParamsV2 contains params for ListObjectsV2.
ListObjectsParamsV2 struct {
ListObjectsParamsCommon
ContinuationToken string
StartAfter string
FetchOwner bool
}
allObjectParams struct {
Bucket *data.BucketInfo
Delimiter string
Prefix string
}
)
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(obj)
return addr
}
// objectHead returns all object's headers.
func (n *layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) {
prm := PrmObjectRead{
Container: bktInfo.CID,
Object: idObj,
WithHeader: true,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
res, err := n.neoFS.ReadObject(ctx, prm)
if err != nil {
return nil, n.transformNeofsError(ctx, err)
}
return res.Head, nil
}
// initializes payload reader of the NeoFS object.
// Zero range corresponds to full payload (panics if only offset is set).
func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) {
prm := PrmObjectRead{
Container: p.bktInfo.CID,
Object: p.oid,
WithPayload: true,
PayloadRange: [2]uint64{p.off, p.ln},
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, p.bktInfo.Owner)
res, err := n.neoFS.ReadObject(ctx, prm)
if err != nil {
return nil, n.transformNeofsError(ctx, err)
}
return res.Payload, nil
}
// objectGet returns an object with payload in the object.
func (n *layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*object.Object, error) {
prm := PrmObjectRead{
Container: bktInfo.CID,
Object: objID,
WithHeader: true,
WithPayload: true,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
res, err := n.neoFS.ReadObject(ctx, prm)
if err != nil {
return nil, n.transformNeofsError(ctx, err)
}
return res.Head, nil
}
// MimeByFileName detect mime type by filename extension.
func MimeByFileName(name string) string {
ext := filepath.Ext(name)
if len(ext) == 0 {
return ""
}
return mime.TypeByExtension(ext)
}
// PutObject stores object into NeoFS, took payload from io.Reader.
func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.ObjectInfo, error) {
own := n.Owner(ctx)
versioningEnabled := n.isVersioningEnabled(ctx, p.BktInfo)
newVersion := &data.NodeVersion{IsUnversioned: !versioningEnabled}
r := p.Reader
if r != nil {
if len(p.Header[api.ContentType]) == 0 {
if contentType := MimeByFileName(p.Object); len(contentType) == 0 {
d := newDetector(r)
if contentType, err := d.Detect(); err == nil {
p.Header[api.ContentType] = contentType
}
r = d.MultiReader()
} else {
p.Header[api.ContentType] = contentType
}
}
}
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
Creator: own,
PayloadSize: uint64(p.Size),
Filename: p.Object,
Payload: r,
}
prm.Attributes = make([][2]string, 0, len(p.Header))
for k, v := range p.Header {
prm.Attributes = append(prm.Attributes, [2]string{k, v})
}
id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, err
}
newVersion.OID = *id
if err = n.treeService.AddVersion(ctx, &p.BktInfo.CID, p.Object, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
}
currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute))
if err != nil {
n.log.Warn("couldn't get creation epoch",
zap.String("bucket", p.BktInfo.Name),
zap.String("object", misc.SanitizeString(p.Object)),
zap.Error(err))
}
if p.Lock != nil && (p.Lock.Retention != nil || p.Lock.LegalHold != nil) {
objVersion := &ObjectVersion{
BktInfo: p.BktInfo,
ObjectName: p.Object,
VersionID: id.EncodeToString(),
}
if err = n.PutLockInfo(ctx, objVersion, p.Lock); err != nil {
return nil, err
}
}
n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID)
objInfo := &data.ObjectInfo{
ID: *id,
CID: p.BktInfo.CID,
Owner: own,
Bucket: p.BktInfo.Name,
Name: p.Object,
Size: p.Size,
Created: time.Now(),
CreationEpoch: currentEpoch,
Headers: p.Header,
ContentType: p.Header[api.ContentType],
HashSum: hex.EncodeToString(hash),
}
if err = n.objCache.PutObject(objInfo); err != nil {
n.log.Warn("couldn't add object to cache", zap.Error(err),
zap.String("object_name", p.Object), zap.String("bucket_name", p.BktInfo.Name),
zap.String("cid", objInfo.CID.EncodeToString()), zap.String("oid", objInfo.ID.EncodeToString()))
}
if err = n.namesCache.Put(objInfo.NiceName(), objInfo.Address()); err != nil {
n.log.Warn("couldn't put obj address to name cache",
zap.String("obj nice name", objInfo.NiceName()),
zap.Error(err))
}
return objInfo, nil
}
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ObjectInfo, error) {
if addr := n.namesCache.Get(bkt.Name + "/" + objectName); addr != nil {
if headInfo := n.objCache.Get(*addr); headInfo != nil {
return objInfoFromMeta(bkt, headInfo), nil
}
}
node, err := n.treeService.GetLatestVersion(ctx, &bkt.CID, objectName)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
return nil, err
}
if node.DeleteMarker != nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
meta, err := n.objectHead(ctx, bkt, node.OID)
if err != nil {
return nil, err
}
if err = n.objCache.Put(*meta); err != nil {
n.log.Warn("couldn't put meta to objects cache",
zap.Stringer("object id", node.OID),
zap.Stringer("bucket id", bkt.CID),
zap.Error(err))
}
objInfo := objInfoFromMeta(bkt, meta)
if err = n.namesCache.Put(objInfo.NiceName(), objInfo.Address()); err != nil {
n.log.Warn("couldn't put obj address to head cache",
zap.String("obj nice name", objInfo.NiceName()),
zap.Error(err))
}
return objInfo, nil
}
func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ObjectInfo, error) {
var err error
var foundVersion *data.NodeVersion
if p.VersionID == unversionedObjectVersionID {
foundVersion, err = n.treeService.GetUnversioned(ctx, &bkt.CID, p.Object)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)
}
return nil, err
}
} else {
versions, err := n.treeService.GetVersions(ctx, &bkt.CID, p.Object)
if err != nil {
return nil, fmt.Errorf("couldn't get versions: %w", err)
}
for _, version := range versions {
if version.OID.EncodeToString() == p.VersionID {
foundVersion = version
break
}
}
if foundVersion == nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)
}
}
if headInfo := n.objCache.Get(newAddress(bkt.CID, foundVersion.OID)); headInfo != nil {
return objInfoFromMeta(bkt, headInfo), nil
}
meta, err := n.objectHead(ctx, bkt, foundVersion.OID)
if err != nil {
if client.IsErrObjectNotFound(err) {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)
}
return nil, err
}
objInfo := objInfoFromMeta(bkt, meta)
if err = n.objCache.Put(*meta); err != nil {
n.log.Warn("couldn't put obj to object cache",
zap.String("bucket name", objInfo.Bucket),
zap.Stringer("bucket cid", objInfo.CID),
zap.String("object name", objInfo.Name),
zap.Stringer("object id", objInfo.ID),
zap.Error(err))
}
return objInfo, nil
}
// objectDelete puts tombstone object into neofs.
func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) error {
prm := PrmObjectDelete{
Container: bktInfo.CID,
Object: idObj,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
n.objCache.Delete(newAddress(bktInfo.CID, idObj))
return n.transformNeofsError(ctx, n.neoFS.DeleteObject(ctx, prm))
}
// objectPutAndHash prepare auth parameters and invoke neofs.CreateObject.
// Returns object ID and payload sha256 hash.
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (*oid.ID, []byte, error) {
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
hash := sha256.New()
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
hash.Write(buf)
})
id, err := n.neoFS.CreateObject(ctx, prm)
return id, hash.Sum(nil), n.transformNeofsError(ctx, err)
}
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var (
err error
result ListObjectsInfoV1
allObjects []*data.ObjectInfo
)
if p.MaxKeys == 0 {
return &result, nil
}
if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil {
return nil, err
}
if len(allObjects) == 0 {
return &result, nil
}
if p.Marker != "" {
allObjects = trimAfterObjectName(p.Marker, allObjects)
}
if len(allObjects) > p.MaxKeys {
result.IsTruncated = true
allObjects = allObjects[:p.MaxKeys]
result.NextMarker = allObjects[len(allObjects)-1].Name
}
result.Prefixes, result.Objects = triageObjects(allObjects)
return &result, nil
}
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
var (
err error
result ListObjectsInfoV2
allObjects []*data.ObjectInfo
)
if p.MaxKeys == 0 {
return &result, nil
}
if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil {
return nil, err
}
if len(allObjects) == 0 {
return &result, nil
}
if p.ContinuationToken != "" {
allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
}
if p.StartAfter != "" {
allObjects = trimAfterObjectName(p.StartAfter, allObjects)
}
if len(allObjects) > p.MaxKeys {
result.IsTruncated = true
allObjects = allObjects[:p.MaxKeys]
result.NextContinuationToken = allObjects[len(allObjects)-1].ID.EncodeToString()
}
result.Prefixes, result.Objects = triageObjects(allObjects)
return &result, nil
}
func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, error) {
objects, err := n.getLatestObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter)
if err != nil {
return nil, err
}
sort.Slice(objects, func(i, j int) bool {
return objects[i].Name < objects[j].Name
})
return objects, nil
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) ([]*data.ObjectInfo, error) {
var err error
cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, true)
ids := n.listsCache.Get(cacheKey)
if ids == nil {
ids, err = n.treeService.GetLatestVersionsByPrefix(ctx, &bkt.CID, prefix)
if err != nil {
return nil, err
}
if err := n.listsCache.Put(cacheKey, ids); err != nil {
n.log.Error("couldn't cache list of objects", zap.Error(err))
}
}
objectsMap := make(map[string]*data.ObjectInfo, len(ids)) // to squash the same directories
for i := 0; i < len(ids); i++ {
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, ids[i])
if obj == nil {
continue
}
if oi := objectInfoFromMeta(bkt, obj, prefix, delimiter); oi != nil {
objectsMap[oi.Name] = oi
}
}
objects := make([]*data.ObjectInfo, 0, len(objectsMap))
for _, obj := range objectsMap {
objects = append(objects, obj)
}
return objects, nil
}
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) {
var err error
cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, false)
nodeVersions := n.listsCache.GetVersions(cacheKey)
if nodeVersions == nil {
nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, &bkt.CID, prefix)
if err != nil {
return nil, err
}
if err = n.listsCache.PutVersions(cacheKey, nodeVersions); err != nil {
n.log.Error("couldn't cache list of objects", zap.Error(err))
}
}
versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions))
for _, nodeVersion := range nodeVersions {
oi := &data.ObjectInfo{}
if nodeVersion.DeleteMarker != nil { // delete marker does not match any object in NeoFS
oi.ID = nodeVersion.OID
oi.Name = nodeVersion.DeleteMarker.FilePath
oi.Owner = nodeVersion.DeleteMarker.Owner
oi.Created = nodeVersion.DeleteMarker.Created
oi.IsDeleteMarker = true
} else {
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion.OID)
if obj == nil {
continue
}
oi = objectInfoFromMeta(bkt, obj, prefix, delimiter)
if oi == nil {
continue
}
}
eoi := &data.ExtendedObjectInfo{
ObjectInfo: oi,
NodeVersion: nodeVersion,
}
objVersions, ok := versions[oi.Name]
if !ok {
objVersions = []*data.ExtendedObjectInfo{eoi}
} else if !oi.IsDir {
objVersions = append(objVersions, eoi)
}
versions[oi.Name] = objVersions
}
return versions, nil
}
func IsSystemHeader(key string) bool {
return strings.HasPrefix(key, "S3-")
}
func trimAfterObjectName(startAfter string, objects []*data.ObjectInfo) []*data.ObjectInfo {
if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter {
return nil
}
for i := range objects {
if objects[i].Name > startAfter {
return objects[i:]
}
}
return nil
}
func trimAfterObjectID(id string, objects []*data.ObjectInfo) []*data.ObjectInfo {
if len(objects) != 0 && objects[len(objects)-1].ID.EncodeToString() == id {
return []*data.ObjectInfo{}
}
for i, obj := range objects {
if obj.ID.EncodeToString() == id {
return objects[i+1:]
}
}
return nil
}
func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) {
for _, ov := range allObjects {
if ov.IsDir {
prefixes = append(prefixes, ov.Name)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*data.ObjectInfo, error) {
var (
err error
allObjects []*data.ObjectInfo
)
allObjects, err = n.listSortedObjects(ctx, allObjectParams{
Bucket: p.BktInfo,
Prefix: p.Prefix,
Delimiter: p.Delimiter,
})
if err != nil {
return nil, err
}
return allObjects, nil
}
func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *data.BucketInfo) bool {
settings, err := n.GetBucketSettings(ctx, bktInfo)
if err != nil {
n.log.Warn("couldn't get versioning settings object", zap.Error(err))
return false
}
return settings.VersioningEnabled
}
func (n *layer) objectFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, obj oid.ID) *object.Object {
var (
err error
meta = n.objCache.Get(newAddress(bktInfo.CID, obj))
)
if meta == nil {
meta, err = n.objectHead(ctx, bktInfo, obj)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
return nil
}
if err = n.objCache.Put(*meta); err != nil {
n.log.Error("couldn't cache an object", zap.Error(err))
}
}
return meta
}
func (n *layer) transformNeofsError(ctx context.Context, err error) error {
if err == nil {
return nil
}
if errors.Is(err, ErrAccessDenied) {
n.log.Debug("error was transformed", zap.String("request_id", api.GetRequestID(ctx)), zap.Error(err))
return apiErrors.GetAPIError(apiErrors.ErrAccessDenied)
}
return err
}
func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader {
if input == nil {
return nil
}
r, w := io.Pipe()
go func() {
var buf = make([]byte, bufSize)
for {
n, err := input.Read(buf)
if n > 0 {
f(buf[:n])
_, _ = w.Write(buf[:n]) // ignore error, input is not ReadCloser
}
if err != nil {
_ = w.CloseWithError(err)
break
}
}
}()
return r
}