From ccf5db95a582c25af0b81036ba9639ee36b5a71b Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Mon, 4 Oct 2021 17:30:38 +0300 Subject: [PATCH] [#217] Refactor system objects Moved into a separate file getSystemObject renamed to headSystemObject, implemented getSystemObject for system objects with payload Refactor putSystemObjects Moved systemCacheKey from data system_object Signed-off-by: Angira Kekteeva --- api/data/info.go | 5 - api/layer/layer.go | 129 ++++++------------------- api/layer/object.go | 48 +--------- api/layer/system_object.go | 190 +++++++++++++++++++++++++++++++++++++ api/layer/versioning.go | 12 ++- 5 files changed, 233 insertions(+), 151 deletions(-) create mode 100644 api/layer/system_object.go diff --git a/api/data/info.go b/api/data/info.go index dc3b5a402..fe772f9b1 100644 --- a/api/data/info.go +++ b/api/data/info.go @@ -41,11 +41,6 @@ type ( // SettingsObjectName is system name for bucket settings file. func (b *BucketInfo) SettingsObjectName() string { return bktVersionSettingsObject } -// SystemObjectKey is key to use in SystemCache. -func (b *BucketInfo) SystemObjectKey(obj string) string { - return b.Name + obj -} - // Version returns object version from ObjectInfo. func (o *ObjectInfo) Version() string { return o.ID.String() } diff --git a/api/layer/layer.go b/api/layer/layer.go index 500eb4a32..147a67624 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "net/url" - "strconv" "strings" "time" @@ -122,6 +121,16 @@ type ( DeleteBucketParams struct { Name string } + + // PutSystemObjectParams stores putSystemObject parameters. + PutSystemObjectParams struct { + BktInfo *data.BucketInfo + ObjName string + Metadata map[string]string + Prefix string + Payload []byte + } + // ListObjectVersionsParams stores list objects versions parameters. ListObjectVersionsParams struct { Bucket string @@ -381,7 +390,7 @@ func (n *layer) GetObjectTagging(ctx context.Context, oi *data.ObjectInfo) (map[ Owner: oi.Owner, } - objInfo, err := n.getSystemObject(ctx, bktInfo, oi.TagsObject()) + objInfo, err := n.headSystemObject(ctx, bktInfo, oi.TagsObject()) if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { return nil, err } @@ -396,7 +405,8 @@ func (n *layer) GetBucketTagging(ctx context.Context, bucketName string) (map[st return nil, err } - objInfo, err := n.getSystemObject(ctx, bktInfo, formBucketTagObjectName(bucketName)) + objInfo, err := n.headSystemObject(ctx, bktInfo, formBucketTagObjectName(bucketName)) + if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { return nil, err } @@ -428,7 +438,15 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *PutTaggingParams) error Owner: p.ObjectInfo.Owner, } - if _, err := n.putSystemObject(ctx, bktInfo, p.ObjectInfo.TagsObject(), p.TagSet, tagPrefix); err != nil { + s := &PutSystemObjectParams{ + BktInfo: bktInfo, + ObjName: p.ObjectInfo.TagsObject(), + Metadata: p.TagSet, + Prefix: tagPrefix, + Payload: nil, + } + + if _, err := n.putSystemObject(ctx, s); err != nil { return err } @@ -442,7 +460,15 @@ func (n *layer) PutBucketTagging(ctx context.Context, bucketName string, tagSet return err } - if _, err = n.putSystemObject(ctx, bktInfo, formBucketTagObjectName(bucketName), tagSet, tagPrefix); err != nil { + s := &PutSystemObjectParams{ + BktInfo: bktInfo, + ObjName: formBucketTagObjectName(bucketName), + Metadata: tagSet, + Prefix: tagPrefix, + Payload: nil, + } + + if _, err = n.putSystemObject(ctx, s); err != nil { return err } @@ -458,22 +484,6 @@ func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectInfo) err return n.deleteSystemObject(ctx, bktInfo, p.TagsObject()) } -func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error { - ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name}) - if err != nil { - return err - } - - for _, id := range ids { - if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil { - return err - } - } - - n.systemCache.Delete(bktInfo.SystemObjectKey(name)) - return nil -} - // DeleteBucketTagging from storage. func (n *layer) DeleteBucketTagging(ctx context.Context, bucketName string) error { bktInfo, err := n.GetBucketInfo(ctx, bucketName) @@ -484,83 +494,6 @@ func (n *layer) DeleteBucketTagging(ctx context.Context, bucketName string) erro return n.deleteSystemObject(ctx, bktInfo, formBucketTagObjectName(bucketName)) } -func (n *layer) putSystemObject(ctx context.Context, bktInfo *data.BucketInfo, objName string, metadata map[string]string, prefix string) (*object.Object, error) { - versions, err := n.headSystemVersions(ctx, bktInfo, objName) - if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { - return nil, err - } - idsToDeleteArr := updateCRDT2PSetHeaders(metadata, versions, false) // false means "last write wins" - - attributes := make([]*object.Attribute, 0, 3) - - filename := object.NewAttribute() - filename.SetKey(objectSystemAttributeName) - filename.SetValue(objName) - - createdAt := object.NewAttribute() - createdAt.SetKey(object.AttributeTimestamp) - createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) - - versioningIgnore := object.NewAttribute() - versioningIgnore.SetKey(attrVersionsIgnore) - versioningIgnore.SetValue(strconv.FormatBool(true)) - - attributes = append(attributes, filename, createdAt, versioningIgnore) - - for k, v := range metadata { - attr := object.NewAttribute() - attr.SetKey(prefix + k) - if prefix == tagPrefix && v == "" { - v = tagEmptyMark - } - attr.SetValue(v) - attributes = append(attributes, attr) - } - - raw := object.NewRaw() - raw.SetOwnerID(bktInfo.Owner) - raw.SetContainerID(bktInfo.CID) - raw.SetAttributes(attributes...) - - ops := new(client.PutObjectParams).WithObject(raw.Object()) - oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx)) - if err != nil { - return nil, err - } - - meta, err := n.objectHead(ctx, bktInfo.CID, oid) - if err != nil { - return nil, err - } - if err = n.systemCache.Put(bktInfo.SystemObjectKey(objName), meta); err != nil { - n.log.Error("couldn't cache system object", zap.Error(err)) - } - - for _, id := range idsToDeleteArr { - if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil { - n.log.Warn("couldn't delete system object", - zap.Stringer("version id", id), - zap.String("name", objName), - zap.Error(err)) - } - } - - return meta, nil -} - -func (n *layer) getSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) { - if meta := n.systemCache.Get(bkt.SystemObjectKey(objName)); meta != nil { - return objInfoFromMeta(bkt, meta), nil - } - - versions, err := n.headSystemVersions(ctx, bkt, objName) - if err != nil { - return nil, err - } - - return versions.getLast(), nil -} - // CopyObject from one bucket into another bucket. func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ObjectInfo, error) { pr, pw := io.Pipe() diff --git a/api/layer/object.go b/api/layer/object.go index c7fb3ee08..2a60c4601 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -326,7 +326,7 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa zap.Error(err)) } - if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil { + if oi := objInfoFromMeta(bkt, meta); oi != nil { if isSystem(oi) { continue } @@ -337,50 +337,6 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa return versions, nil } -func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) { - ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: sysName}) - if err != nil { - return nil, err - } - - // should be changed when system cache will store payload instead of meta - metas := make(map[string]*object.Object, len(ids)) - - versions := newObjectVersions(sysName) - for _, id := range ids { - meta, err := n.objectHead(ctx, bkt.CID, id) - if err != nil { - n.log.Warn("couldn't head object", - zap.Stringer("object id", id), - zap.Stringer("bucket id", bkt.CID), - zap.Error(err)) - continue - } - - if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil { - if !isSystem(oi) { - continue - } - versions.appendVersion(oi) - metas[oi.Version()] = meta - } - } - - lastVersion := versions.getLast() - if lastVersion == nil { - return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey) - } - - if err = n.systemCache.Put(bkt.SystemObjectKey(sysName), metas[lastVersion.Version()]); err != nil { - n.log.Warn("couldn't put system meta to objects cache", - zap.Stringer("object id", lastVersion.ID), - zap.Stringer("bucket id", bkt.CID), - zap.Error(err)) - } - - return versions, nil -} - func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, versionID string) (*data.ObjectInfo, error) { oid := object.NewID() if err := oid.Parse(versionID); err != nil { @@ -399,7 +355,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, versionID return nil, err } - objInfo := objectInfoFromMeta(bkt, meta, "", "") + objInfo := objInfoFromMeta(bkt, meta) if err = n.objCache.Put(*meta); err != nil { n.log.Warn("couldn't put obj to object cache", zap.String("bucket name", objInfo.Bucket), diff --git a/api/layer/system_object.go b/api/layer/system_object.go new file mode 100644 index 000000000..f9803d27a --- /dev/null +++ b/api/layer/system_object.go @@ -0,0 +1,190 @@ +package layer + +import ( + "bytes" + "context" + "strconv" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-s3-gw/api/data" + "github.com/nspcc-dev/neofs-s3-gw/api/errors" + "go.uber.org/zap" +) + +func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (*object.Object, error) { + versions, err := n.headSystemVersions(ctx, p.BktInfo, p.ObjName) + if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { + return nil, err + } + idsToDeleteArr := updateCRDT2PSetHeaders(p.Metadata, versions, false) // false means "last write wins" + + attributes := make([]*object.Attribute, 0, 3) + + filename := object.NewAttribute() + filename.SetKey(objectSystemAttributeName) + filename.SetValue(p.ObjName) + + createdAt := object.NewAttribute() + createdAt.SetKey(object.AttributeTimestamp) + createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) + + versioningIgnore := object.NewAttribute() + versioningIgnore.SetKey(attrVersionsIgnore) + versioningIgnore.SetValue(strconv.FormatBool(true)) + + attributes = append(attributes, filename, createdAt, versioningIgnore) + + for k, v := range p.Metadata { + attr := object.NewAttribute() + attr.SetKey(p.Prefix + k) + if p.Prefix == tagPrefix && v == "" { + v = tagEmptyMark + } + attr.SetValue(v) + attributes = append(attributes, attr) + } + + raw := object.NewRaw() + raw.SetOwnerID(p.BktInfo.Owner) + raw.SetContainerID(p.BktInfo.CID) + raw.SetAttributes(attributes...) + + ops := new(client.PutObjectParams).WithObject(raw.Object()).WithPayloadReader(bytes.NewReader(p.Payload)) + oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx)) + if err != nil { + return nil, err + } + + meta, err := n.objectHead(ctx, p.BktInfo.CID, oid) + if err != nil { + return nil, err + } + + if p.Payload != nil { + meta.ToV2().SetPayload(p.Payload) + } + + if err = n.systemCache.Put(systemObjectKey(p.BktInfo, p.ObjName), meta); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + for _, id := range idsToDeleteArr { + if err = n.objectDelete(ctx, p.BktInfo.CID, id); err != nil { + n.log.Warn("couldn't delete system object", + zap.Stringer("version id", id), + zap.String("name", p.ObjName), + zap.Error(err)) + } + } + + return meta, nil +} + +func (n *layer) headSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) { + if meta := n.systemCache.Get(systemObjectKey(bkt, objName)); meta != nil { + return objInfoFromMeta(bkt, meta), nil + } + + versions, err := n.headSystemVersions(ctx, bkt, objName) + if err != nil { + return nil, err + } + + return versions.getLast(), nil +} + +func (n *layer) getSystemObject(ctx context.Context, bktInfo *data.BucketInfo, objName string) (*object.Object, error) { + if meta := n.systemCache.Get(systemObjectKey(bktInfo, objName)); meta != nil { + return meta, nil + } + + versions, err := n.headSystemVersions(ctx, bktInfo, objName) + if err != nil { + return nil, err + } + + objInfo := versions.getLast() + + buf := new(bytes.Buffer) + p := &getParams{ + Writer: buf, + cid: bktInfo.CID, + oid: objInfo.ID, + } + + obj, err := n.objectGet(ctx, p) + if err != nil { + return nil, err + } + + obj.ToV2().SetPayload(buf.Bytes()) + + return obj, nil +} + +func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error { + ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name}) + if err != nil { + return err + } + + for _, id := range ids { + if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil { + return err + } + } + + n.systemCache.Delete(systemObjectKey(bktInfo, name)) + return nil +} + +func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) { + ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: sysName}) + if err != nil { + return nil, err + } + + // should be changed when system cache will store payload instead of meta + metas := make(map[string]*object.Object, len(ids)) + + versions := newObjectVersions(sysName) + for _, id := range ids { + meta, err := n.objectHead(ctx, bkt.CID, id) + if err != nil { + n.log.Warn("couldn't head object", + zap.Stringer("object id", id), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + continue + } + + if oi := objInfoFromMeta(bkt, meta); oi != nil { + if !isSystem(oi) { + continue + } + versions.appendVersion(oi) + metas[oi.Version()] = meta + } + } + + lastVersion := versions.getLast() + if lastVersion == nil { + return nil, errors.GetAPIError(errors.ErrNoSuchKey) + } + + if err = n.systemCache.Put(systemObjectKey(bkt, sysName), metas[lastVersion.Version()]); err != nil { + n.log.Warn("couldn't put system meta to objects cache", + zap.Stringer("object id", lastVersion.ID), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + } + + return versions, nil +} + +// systemObjectKey is a key to use in SystemCache. +func systemObjectKey(bktInfo *data.BucketInfo, obj string) string { + return bktInfo.Name + obj +} diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 9f3b91006..73fe96a04 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -248,7 +248,15 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) attrSettingsVersioningEnabled: strconv.FormatBool(p.Settings.VersioningEnabled), } - meta, err := n.putSystemObject(ctx, bktInfo, bktInfo.SettingsObjectName(), metadata, "") + s := &PutSystemObjectParams{ + BktInfo: bktInfo, + ObjName: bktInfo.SettingsObjectName(), + Metadata: metadata, + Prefix: "", + Payload: nil, + } + + meta, err := n.putSystemObject(ctx, s) if err != nil { return nil, err } @@ -352,7 +360,7 @@ func contains(list []string, elem string) bool { } func (n *layer) getBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*BucketSettings, error) { - objInfo, err := n.getSystemObject(ctx, bktInfo, bktInfo.SettingsObjectName()) + objInfo, err := n.headSystemObject(ctx, bktInfo, bktInfo.SettingsObjectName()) if err != nil { return nil, err }