[#217] Refactor system objects
Moved into a separate file getSystemObject renamed to headSystemObject, implemented getSystemObject for system objects with payload Refactor putSystemObjects Moved systemCacheKey from data system_object Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
0460225869
commit
ccf5db95a5
5 changed files with 233 additions and 151 deletions
|
@ -41,11 +41,6 @@ type (
|
|||
// SettingsObjectName is system name for bucket settings file.
|
||||
func (b *BucketInfo) SettingsObjectName() string { return bktVersionSettingsObject }
|
||||
|
||||
// SystemObjectKey is key to use in SystemCache.
|
||||
func (b *BucketInfo) SystemObjectKey(obj string) string {
|
||||
return b.Name + obj
|
||||
}
|
||||
|
||||
// Version returns object version from ObjectInfo.
|
||||
func (o *ObjectInfo) Version() string { return o.ID.String() }
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -122,6 +121,16 @@ type (
|
|||
DeleteBucketParams struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
// PutSystemObjectParams stores putSystemObject parameters.
|
||||
PutSystemObjectParams struct {
|
||||
BktInfo *data.BucketInfo
|
||||
ObjName string
|
||||
Metadata map[string]string
|
||||
Prefix string
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
// ListObjectVersionsParams stores list objects versions parameters.
|
||||
ListObjectVersionsParams struct {
|
||||
Bucket string
|
||||
|
@ -381,7 +390,7 @@ func (n *layer) GetObjectTagging(ctx context.Context, oi *data.ObjectInfo) (map[
|
|||
Owner: oi.Owner,
|
||||
}
|
||||
|
||||
objInfo, err := n.getSystemObject(ctx, bktInfo, oi.TagsObject())
|
||||
objInfo, err := n.headSystemObject(ctx, bktInfo, oi.TagsObject())
|
||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -396,7 +405,8 @@ func (n *layer) GetBucketTagging(ctx context.Context, bucketName string) (map[st
|
|||
return nil, err
|
||||
}
|
||||
|
||||
objInfo, err := n.getSystemObject(ctx, bktInfo, formBucketTagObjectName(bucketName))
|
||||
objInfo, err := n.headSystemObject(ctx, bktInfo, formBucketTagObjectName(bucketName))
|
||||
|
||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -428,7 +438,15 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *PutTaggingParams) error
|
|||
Owner: p.ObjectInfo.Owner,
|
||||
}
|
||||
|
||||
if _, err := n.putSystemObject(ctx, bktInfo, p.ObjectInfo.TagsObject(), p.TagSet, tagPrefix); err != nil {
|
||||
s := &PutSystemObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
ObjName: p.ObjectInfo.TagsObject(),
|
||||
Metadata: p.TagSet,
|
||||
Prefix: tagPrefix,
|
||||
Payload: nil,
|
||||
}
|
||||
|
||||
if _, err := n.putSystemObject(ctx, s); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -442,7 +460,15 @@ func (n *layer) PutBucketTagging(ctx context.Context, bucketName string, tagSet
|
|||
return err
|
||||
}
|
||||
|
||||
if _, err = n.putSystemObject(ctx, bktInfo, formBucketTagObjectName(bucketName), tagSet, tagPrefix); err != nil {
|
||||
s := &PutSystemObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
ObjName: formBucketTagObjectName(bucketName),
|
||||
Metadata: tagSet,
|
||||
Prefix: tagPrefix,
|
||||
Payload: nil,
|
||||
}
|
||||
|
||||
if _, err = n.putSystemObject(ctx, s); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -458,22 +484,6 @@ func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectInfo) err
|
|||
return n.deleteSystemObject(ctx, bktInfo, p.TagsObject())
|
||||
}
|
||||
|
||||
func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error {
|
||||
ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
n.systemCache.Delete(bktInfo.SystemObjectKey(name))
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteBucketTagging from storage.
|
||||
func (n *layer) DeleteBucketTagging(ctx context.Context, bucketName string) error {
|
||||
bktInfo, err := n.GetBucketInfo(ctx, bucketName)
|
||||
|
@ -484,83 +494,6 @@ func (n *layer) DeleteBucketTagging(ctx context.Context, bucketName string) erro
|
|||
return n.deleteSystemObject(ctx, bktInfo, formBucketTagObjectName(bucketName))
|
||||
}
|
||||
|
||||
func (n *layer) putSystemObject(ctx context.Context, bktInfo *data.BucketInfo, objName string, metadata map[string]string, prefix string) (*object.Object, error) {
|
||||
versions, err := n.headSystemVersions(ctx, bktInfo, objName)
|
||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||
return nil, err
|
||||
}
|
||||
idsToDeleteArr := updateCRDT2PSetHeaders(metadata, versions, false) // false means "last write wins"
|
||||
|
||||
attributes := make([]*object.Attribute, 0, 3)
|
||||
|
||||
filename := object.NewAttribute()
|
||||
filename.SetKey(objectSystemAttributeName)
|
||||
filename.SetValue(objName)
|
||||
|
||||
createdAt := object.NewAttribute()
|
||||
createdAt.SetKey(object.AttributeTimestamp)
|
||||
createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10))
|
||||
|
||||
versioningIgnore := object.NewAttribute()
|
||||
versioningIgnore.SetKey(attrVersionsIgnore)
|
||||
versioningIgnore.SetValue(strconv.FormatBool(true))
|
||||
|
||||
attributes = append(attributes, filename, createdAt, versioningIgnore)
|
||||
|
||||
for k, v := range metadata {
|
||||
attr := object.NewAttribute()
|
||||
attr.SetKey(prefix + k)
|
||||
if prefix == tagPrefix && v == "" {
|
||||
v = tagEmptyMark
|
||||
}
|
||||
attr.SetValue(v)
|
||||
attributes = append(attributes, attr)
|
||||
}
|
||||
|
||||
raw := object.NewRaw()
|
||||
raw.SetOwnerID(bktInfo.Owner)
|
||||
raw.SetContainerID(bktInfo.CID)
|
||||
raw.SetAttributes(attributes...)
|
||||
|
||||
ops := new(client.PutObjectParams).WithObject(raw.Object())
|
||||
oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meta, err := n.objectHead(ctx, bktInfo.CID, oid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = n.systemCache.Put(bktInfo.SystemObjectKey(objName), meta); err != nil {
|
||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||
}
|
||||
|
||||
for _, id := range idsToDeleteArr {
|
||||
if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil {
|
||||
n.log.Warn("couldn't delete system object",
|
||||
zap.Stringer("version id", id),
|
||||
zap.String("name", objName),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
func (n *layer) getSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) {
|
||||
if meta := n.systemCache.Get(bkt.SystemObjectKey(objName)); meta != nil {
|
||||
return objInfoFromMeta(bkt, meta), nil
|
||||
}
|
||||
|
||||
versions, err := n.headSystemVersions(ctx, bkt, objName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return versions.getLast(), nil
|
||||
}
|
||||
|
||||
// CopyObject from one bucket into another bucket.
|
||||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ObjectInfo, error) {
|
||||
pr, pw := io.Pipe()
|
||||
|
|
|
@ -326,7 +326,7 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa
|
|||
zap.Error(err))
|
||||
}
|
||||
|
||||
if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil {
|
||||
if oi := objInfoFromMeta(bkt, meta); oi != nil {
|
||||
if isSystem(oi) {
|
||||
continue
|
||||
}
|
||||
|
@ -337,50 +337,6 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa
|
|||
return versions, nil
|
||||
}
|
||||
|
||||
func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) {
|
||||
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: sysName})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// should be changed when system cache will store payload instead of meta
|
||||
metas := make(map[string]*object.Object, len(ids))
|
||||
|
||||
versions := newObjectVersions(sysName)
|
||||
for _, id := range ids {
|
||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head object",
|
||||
zap.Stringer("object id", id),
|
||||
zap.Stringer("bucket id", bkt.CID),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil {
|
||||
if !isSystem(oi) {
|
||||
continue
|
||||
}
|
||||
versions.appendVersion(oi)
|
||||
metas[oi.Version()] = meta
|
||||
}
|
||||
}
|
||||
|
||||
lastVersion := versions.getLast()
|
||||
if lastVersion == nil {
|
||||
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
if err = n.systemCache.Put(bkt.SystemObjectKey(sysName), metas[lastVersion.Version()]); err != nil {
|
||||
n.log.Warn("couldn't put system meta to objects cache",
|
||||
zap.Stringer("object id", lastVersion.ID),
|
||||
zap.Stringer("bucket id", bkt.CID),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, versionID string) (*data.ObjectInfo, error) {
|
||||
oid := object.NewID()
|
||||
if err := oid.Parse(versionID); err != nil {
|
||||
|
@ -399,7 +355,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, versionID
|
|||
return nil, err
|
||||
}
|
||||
|
||||
objInfo := objectInfoFromMeta(bkt, meta, "", "")
|
||||
objInfo := objInfoFromMeta(bkt, meta)
|
||||
if err = n.objCache.Put(*meta); err != nil {
|
||||
n.log.Warn("couldn't put obj to object cache",
|
||||
zap.String("bucket name", objInfo.Bucket),
|
||||
|
|
190
api/layer/system_object.go
Normal file
190
api/layer/system_object.go
Normal file
|
@ -0,0 +1,190 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (*object.Object, error) {
|
||||
versions, err := n.headSystemVersions(ctx, p.BktInfo, p.ObjName)
|
||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||
return nil, err
|
||||
}
|
||||
idsToDeleteArr := updateCRDT2PSetHeaders(p.Metadata, versions, false) // false means "last write wins"
|
||||
|
||||
attributes := make([]*object.Attribute, 0, 3)
|
||||
|
||||
filename := object.NewAttribute()
|
||||
filename.SetKey(objectSystemAttributeName)
|
||||
filename.SetValue(p.ObjName)
|
||||
|
||||
createdAt := object.NewAttribute()
|
||||
createdAt.SetKey(object.AttributeTimestamp)
|
||||
createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10))
|
||||
|
||||
versioningIgnore := object.NewAttribute()
|
||||
versioningIgnore.SetKey(attrVersionsIgnore)
|
||||
versioningIgnore.SetValue(strconv.FormatBool(true))
|
||||
|
||||
attributes = append(attributes, filename, createdAt, versioningIgnore)
|
||||
|
||||
for k, v := range p.Metadata {
|
||||
attr := object.NewAttribute()
|
||||
attr.SetKey(p.Prefix + k)
|
||||
if p.Prefix == tagPrefix && v == "" {
|
||||
v = tagEmptyMark
|
||||
}
|
||||
attr.SetValue(v)
|
||||
attributes = append(attributes, attr)
|
||||
}
|
||||
|
||||
raw := object.NewRaw()
|
||||
raw.SetOwnerID(p.BktInfo.Owner)
|
||||
raw.SetContainerID(p.BktInfo.CID)
|
||||
raw.SetAttributes(attributes...)
|
||||
|
||||
ops := new(client.PutObjectParams).WithObject(raw.Object()).WithPayloadReader(bytes.NewReader(p.Payload))
|
||||
oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meta, err := n.objectHead(ctx, p.BktInfo.CID, oid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if p.Payload != nil {
|
||||
meta.ToV2().SetPayload(p.Payload)
|
||||
}
|
||||
|
||||
if err = n.systemCache.Put(systemObjectKey(p.BktInfo, p.ObjName), meta); err != nil {
|
||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||
}
|
||||
|
||||
for _, id := range idsToDeleteArr {
|
||||
if err = n.objectDelete(ctx, p.BktInfo.CID, id); err != nil {
|
||||
n.log.Warn("couldn't delete system object",
|
||||
zap.Stringer("version id", id),
|
||||
zap.String("name", p.ObjName),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
func (n *layer) headSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) {
|
||||
if meta := n.systemCache.Get(systemObjectKey(bkt, objName)); meta != nil {
|
||||
return objInfoFromMeta(bkt, meta), nil
|
||||
}
|
||||
|
||||
versions, err := n.headSystemVersions(ctx, bkt, objName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return versions.getLast(), nil
|
||||
}
|
||||
|
||||
func (n *layer) getSystemObject(ctx context.Context, bktInfo *data.BucketInfo, objName string) (*object.Object, error) {
|
||||
if meta := n.systemCache.Get(systemObjectKey(bktInfo, objName)); meta != nil {
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
versions, err := n.headSystemVersions(ctx, bktInfo, objName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objInfo := versions.getLast()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
p := &getParams{
|
||||
Writer: buf,
|
||||
cid: bktInfo.CID,
|
||||
oid: objInfo.ID,
|
||||
}
|
||||
|
||||
obj, err := n.objectGet(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj.ToV2().SetPayload(buf.Bytes())
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error {
|
||||
ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
n.systemCache.Delete(systemObjectKey(bktInfo, name))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) {
|
||||
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: sysName})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// should be changed when system cache will store payload instead of meta
|
||||
metas := make(map[string]*object.Object, len(ids))
|
||||
|
||||
versions := newObjectVersions(sysName)
|
||||
for _, id := range ids {
|
||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head object",
|
||||
zap.Stringer("object id", id),
|
||||
zap.Stringer("bucket id", bkt.CID),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
if oi := objInfoFromMeta(bkt, meta); oi != nil {
|
||||
if !isSystem(oi) {
|
||||
continue
|
||||
}
|
||||
versions.appendVersion(oi)
|
||||
metas[oi.Version()] = meta
|
||||
}
|
||||
}
|
||||
|
||||
lastVersion := versions.getLast()
|
||||
if lastVersion == nil {
|
||||
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
if err = n.systemCache.Put(systemObjectKey(bkt, sysName), metas[lastVersion.Version()]); err != nil {
|
||||
n.log.Warn("couldn't put system meta to objects cache",
|
||||
zap.Stringer("object id", lastVersion.ID),
|
||||
zap.Stringer("bucket id", bkt.CID),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
// systemObjectKey is a key to use in SystemCache.
|
||||
func systemObjectKey(bktInfo *data.BucketInfo, obj string) string {
|
||||
return bktInfo.Name + obj
|
||||
}
|
|
@ -248,7 +248,15 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams)
|
|||
attrSettingsVersioningEnabled: strconv.FormatBool(p.Settings.VersioningEnabled),
|
||||
}
|
||||
|
||||
meta, err := n.putSystemObject(ctx, bktInfo, bktInfo.SettingsObjectName(), metadata, "")
|
||||
s := &PutSystemObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
ObjName: bktInfo.SettingsObjectName(),
|
||||
Metadata: metadata,
|
||||
Prefix: "",
|
||||
Payload: nil,
|
||||
}
|
||||
|
||||
meta, err := n.putSystemObject(ctx, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -352,7 +360,7 @@ func contains(list []string, elem string) bool {
|
|||
}
|
||||
|
||||
func (n *layer) getBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*BucketSettings, error) {
|
||||
objInfo, err := n.getSystemObject(ctx, bktInfo, bktInfo.SettingsObjectName())
|
||||
objInfo, err := n.headSystemObject(ctx, bktInfo, bktInfo.SettingsObjectName())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue