From 3231ecab03d1a4361f1be2e7cbea300f86180ced Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 21 Sep 2021 16:08:06 +0300 Subject: [PATCH] [#263] Add LWW to system objects Signed-off-by: Denis Kirillov --- api/layer/layer.go | 56 ++++++-------- api/layer/object.go | 81 +++++++++++++------- api/layer/versioning.go | 6 +- api/layer/versioning_test.go | 142 ++++++++++++++++++++++++++++++++++- 4 files changed, 221 insertions(+), 64 deletions(-) diff --git a/api/layer/layer.go b/api/layer/layer.go index b04ae1b..500eb4a 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -459,22 +459,19 @@ func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectInfo) err } func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error { - var oid *object.ID - if meta := n.systemCache.Get(bktInfo.SystemObjectKey(name)); meta != nil { - oid = meta.ID() - } else { - var err error - oid, err = n.objectFindID(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name}) - if err != nil { - if errors.IsS3Error(err, errors.ErrNoSuchKey) { - return nil - } + ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name}) + if err != nil { + return err + } + + for _, id := range ids { + if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil { return err } } n.systemCache.Delete(bktInfo.SystemObjectKey(name)) - return n.objectDelete(ctx, bktInfo.CID, oid) + return nil } // DeleteBucketTagging from storage. @@ -488,18 +485,11 @@ func (n *layer) DeleteBucketTagging(ctx context.Context, bucketName string) erro } func (n *layer) putSystemObject(ctx context.Context, bktInfo *data.BucketInfo, objName string, metadata map[string]string, prefix string) (*object.Object, error) { - var ( - err error - oldOID *object.ID - ) - if meta := n.systemCache.Get(bktInfo.SystemObjectKey(objName)); meta != nil { - oldOID = meta.ID() - } else { - oldOID, err = n.objectFindID(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: objName}) - if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { - return nil, err - } + versions, err := n.headSystemVersions(ctx, bktInfo, objName) + if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { + return nil, err } + idsToDeleteArr := updateCRDT2PSetHeaders(metadata, versions, false) // false means "last write wins" attributes := make([]*object.Attribute, 0, 3) @@ -545,9 +535,13 @@ func (n *layer) putSystemObject(ctx context.Context, bktInfo *data.BucketInfo, o if err = n.systemCache.Put(bktInfo.SystemObjectKey(objName), meta); err != nil { n.log.Error("couldn't cache system object", zap.Error(err)) } - if oldOID != nil { - if err = n.objectDelete(ctx, bktInfo.CID, oldOID); err != nil { - return nil, err + + for _, id := range idsToDeleteArr { + if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil { + n.log.Warn("couldn't delete system object", + zap.Stringer("version id", id), + zap.String("name", objName), + zap.Error(err)) } } @@ -559,20 +553,12 @@ func (n *layer) getSystemObject(ctx context.Context, bkt *data.BucketInfo, objNa return objInfoFromMeta(bkt, meta), nil } - oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: objName}) + versions, err := n.headSystemVersions(ctx, bkt, objName) if err != nil { return nil, err } - meta, err := n.objectHead(ctx, bkt.CID, oid) - if err != nil { - return nil, err - } - if err = n.systemCache.Put(bkt.SystemObjectKey(objName), meta); err != nil { - n.log.Error("couldn't cache system object", zap.Error(err)) - } - - return objInfoFromMeta(bkt, meta), nil + return versions.getLast(), nil } // CopyObject from one bucket into another bucket. diff --git a/api/layer/object.go b/api/layer/object.go index 925c192..c7fb3ee 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -2,7 +2,6 @@ package layer import ( "context" - "errors" "io" "net/url" "sort" @@ -92,20 +91,6 @@ func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID, return n.pool.SearchObject(ctx, new(client.SearchObjectParams).WithContainerID(p.cid).WithSearchFilters(opts), n.BearerOpt(ctx)) } -// objectFindID returns object id (uuid) based on it's nice name in s3. If -// nice name is uuid compatible, then function returns it. -func (n *layer) objectFindID(ctx context.Context, p *findParams) (*object.ID, error) { - if result, err := n.objectSearch(ctx, p); err != nil { - return nil, err - } else if ln := len(result); ln == 0 { - return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey) - } else if ln == 1 { - return result[0], nil - } - - return nil, errors.New("several objects with the same name found") -} - func newAddress(cid *cid.ID, oid *object.ID) *object.Address { address := object.NewAddress() address.SetContainerID(cid) @@ -147,7 +132,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec if err != nil && !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) { return nil, err } - idsToDeleteArr := updateCRDT2PSetHeaders(p, versions, versioningEnabled) + idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled) r := p.Reader if len(p.Header[api.ContentType]) == 0 { @@ -243,27 +228,27 @@ func formRawObject(p *PutObjectParams, bktID *cid.ID, own *owner.ID, obj string) return raw } -func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versioningEnabled bool) []*object.ID { +func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []*object.ID { var idsToDeleteArr []*object.ID - if versions == nil { + if versions.isEmpty() { return idsToDeleteArr } if versioningEnabled { if !versions.isAddListEmpty() { - p.Header[versionsAddAttr] = versions.getAddHeader() + header[versionsAddAttr] = versions.getAddHeader() } deleted := versions.getDelHeader() - // p.Header[versionsDelAttr] can be not empty when deleting specific version - if delAttr := p.Header[versionsDelAttr]; len(delAttr) != 0 { + // header[versionsDelAttr] can be not empty when deleting specific version + if delAttr := header[versionsDelAttr]; len(delAttr) != 0 { if len(deleted) != 0 { - p.Header[versionsDelAttr] = deleted + "," + delAttr + header[versionsDelAttr] = deleted + "," + delAttr } else { - p.Header[versionsDelAttr] = delAttr + header[versionsDelAttr] = delAttr } } else if len(deleted) != 0 { - p.Header[versionsDelAttr] = deleted + header[versionsDelAttr] = deleted } } else { versionsDeletedStr := versions.getDelHeader() @@ -272,10 +257,10 @@ func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versio } if lastVersion := versions.getLast(); lastVersion != nil { - p.Header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version() + header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version() idsToDeleteArr = append(idsToDeleteArr, lastVersion.ID) } else if len(versionsDeletedStr) != 0 { - p.Header[versionsDelAttr] = versionsDeletedStr + header[versionsDelAttr] = versionsDeletedStr } for _, version := range versions.objects { @@ -352,6 +337,50 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa return versions, nil } +func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) { + ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: sysName}) + if err != nil { + return nil, err + } + + // should be changed when system cache will store payload instead of meta + metas := make(map[string]*object.Object, len(ids)) + + versions := newObjectVersions(sysName) + for _, id := range ids { + meta, err := n.objectHead(ctx, bkt.CID, id) + if err != nil { + n.log.Warn("couldn't head object", + zap.Stringer("object id", id), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + continue + } + + if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil { + if !isSystem(oi) { + continue + } + versions.appendVersion(oi) + metas[oi.Version()] = meta + } + } + + lastVersion := versions.getLast() + if lastVersion == nil { + return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey) + } + + if err = n.systemCache.Put(bkt.SystemObjectKey(sysName), metas[lastVersion.Version()]); err != nil { + n.log.Warn("couldn't put system meta to objects cache", + zap.Stringer("object id", lastVersion.ID), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + } + + return versions, nil +} + func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, versionID string) (*data.ObjectInfo, error) { oid := object.NewID() if err := oid.Parse(versionID); err != nil { diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 400f06f..9f3b910 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -156,8 +156,12 @@ LOOP: return commonAddedVersions, prevVersions, currentVersions } +func (v *objectVersions) isEmpty() bool { + return v == nil || len(v.objects) == 0 +} + func (v *objectVersions) getLast() *data.ObjectInfo { - if v == nil || len(v.objects) == 0 { + if v.isEmpty() { return nil } diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index ffe1430..dba658d 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -299,6 +299,17 @@ func (tc *testContext) checkListObjects(ids ...*object.ID) { } } +func (tc *testContext) getSystemObject(objectName string) *object.Object { + for _, obj := range tc.testPool.objects { + for _, attr := range obj.Attributes() { + if attr.Key() == objectSystemAttributeName && attr.Value() == objectName { + return obj + } + } + } + return nil +} + type testContext struct { t *testing.T ctx context.Context @@ -309,7 +320,7 @@ type testContext struct { testPool *testPool } -func prepareContext(t *testing.T) *testContext { +func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { key, err := keys.NewPrivateKey() require.NoError(t, err) @@ -328,9 +339,14 @@ func prepareContext(t *testing.T) *testContext { bktID, err := tp.PutContainer(ctx, cnr) require.NoError(t, err) + config := DefaultCachesConfigs() + if len(cachesConfig) != 0 { + config = cachesConfig[0] + } + return &testContext{ ctx: ctx, - layer: NewLayer(l, tp, DefaultCachesConfigs()), + layer: NewLayer(l, tp, config), bkt: bktName, bktID: bktID, obj: "obj1", @@ -662,3 +678,125 @@ func getTestObjectInfoEpoch(epoch uint64, id byte, addAttr, delAttr, delMarkAttr obj.CreationEpoch = epoch return obj } + +func TestUpdateCRDT2PSetHeaders(t *testing.T) { + obj1 := getTestObjectInfo(1, "", "", "") + obj2 := getTestObjectInfo(2, "", "", "") + + for _, tc := range []struct { + header map[string]string + versions *objectVersions + versioningEnabled bool + expectedHeader map[string]string + expectedIdsToDelete []*object.ID + }{ + { + header: map[string]string{"someKey": "someValue"}, + expectedHeader: map[string]string{"someKey": "someValue"}, + expectedIdsToDelete: nil, + }, + { + header: map[string]string{}, + versions: &objectVersions{ + objects: []*data.ObjectInfo{obj1}, + }, + expectedHeader: map[string]string{versionsDelAttr: obj1.Version()}, + expectedIdsToDelete: []*object.ID{obj1.ID}, + }, + { + header: map[string]string{}, + versions: &objectVersions{ + objects: []*data.ObjectInfo{obj2}, + delList: []string{obj1.Version()}, + }, + expectedHeader: map[string]string{versionsDelAttr: joinVers(obj1, obj2)}, + expectedIdsToDelete: []*object.ID{obj2.ID}, + }, + { + header: map[string]string{}, + versions: &objectVersions{ + objects: []*data.ObjectInfo{obj1}, + }, + versioningEnabled: true, + expectedHeader: map[string]string{versionsAddAttr: obj1.Version()}, + expectedIdsToDelete: nil, + }, + { + header: map[string]string{versionsDelAttr: obj2.Version()}, + versions: &objectVersions{ + objects: []*data.ObjectInfo{obj2}, + delList: []string{obj1.Version()}, + }, + versioningEnabled: true, + expectedHeader: map[string]string{ + versionsAddAttr: obj2.Version(), + versionsDelAttr: joinVers(obj1, obj2), + }, + expectedIdsToDelete: nil, + }, + } { + idsToDelete := updateCRDT2PSetHeaders(tc.header, tc.versions, tc.versioningEnabled) + require.Equal(t, tc.expectedHeader, tc.header) + require.Equal(t, tc.expectedIdsToDelete, idsToDelete) + } +} + +func TestSystemObjectsVersioning(t *testing.T) { + cacheConfig := DefaultCachesConfigs() + cacheConfig.System.Lifetime = 0 + + tc := prepareContext(t, cacheConfig) + objInfo, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ + Bucket: tc.bkt, + Settings: &BucketSettings{VersioningEnabled: false}, + }) + require.NoError(t, err) + + objMeta, ok := tc.testPool.objects[objInfo.Address().String()] + require.True(t, ok) + + _, err = tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{ + Bucket: tc.bkt, + Settings: &BucketSettings{VersioningEnabled: true}, + }) + require.NoError(t, err) + + // simulate failed deletion + tc.testPool.objects[objInfo.Address().String()] = objMeta + + versioning, err := tc.layer.GetBucketVersioning(tc.ctx, tc.bkt) + require.NoError(t, err) + require.True(t, versioning.VersioningEnabled) +} + +func TestDeleteSystemObjectsVersioning(t *testing.T) { + cacheConfig := DefaultCachesConfigs() + cacheConfig.System.Lifetime = 0 + + tc := prepareContext(t, cacheConfig) + + tagSet := map[string]string{ + "tag1": "val1", + } + + err := tc.layer.PutBucketTagging(tc.ctx, tc.bkt, tagSet) + require.NoError(t, err) + + objMeta := tc.getSystemObject(formBucketTagObjectName(tc.bkt)) + + tagSet["tag2"] = "val2" + err = tc.layer.PutBucketTagging(tc.ctx, tc.bkt, tagSet) + require.NoError(t, err) + + // simulate failed deletion + tc.testPool.objects[newAddress(objMeta.ContainerID(), objMeta.ID()).String()] = objMeta + + tagging, err := tc.layer.GetBucketTagging(tc.ctx, tc.bkt) + require.NoError(t, err) + require.Equal(t, tagSet, tagging) + + err = tc.layer.DeleteBucketTagging(tc.ctx, tc.bkt) + require.NoError(t, err) + + require.Nil(t, tc.getSystemObject(formBucketTagObjectName(tc.bkt))) +}