[#263] Add LWW to system objects

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2021-09-21 16:08:06 +03:00 committed by Alex Vanin
parent d616d9e2d9
commit 3231ecab03
4 changed files with 221 additions and 64 deletions

View file

@ -459,22 +459,19 @@ func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectInfo) err
}
func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error {
var oid *object.ID
if meta := n.systemCache.Get(bktInfo.SystemObjectKey(name)); meta != nil {
oid = meta.ID()
} else {
var err error
oid, err = n.objectFindID(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name})
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) {
return nil
}
ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name})
if err != nil {
return err
}
for _, id := range ids {
if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil {
return err
}
}
n.systemCache.Delete(bktInfo.SystemObjectKey(name))
return n.objectDelete(ctx, bktInfo.CID, oid)
return nil
}
// DeleteBucketTagging from storage.
@ -488,18 +485,11 @@ func (n *layer) DeleteBucketTagging(ctx context.Context, bucketName string) erro
}
func (n *layer) putSystemObject(ctx context.Context, bktInfo *data.BucketInfo, objName string, metadata map[string]string, prefix string) (*object.Object, error) {
var (
err error
oldOID *object.ID
)
if meta := n.systemCache.Get(bktInfo.SystemObjectKey(objName)); meta != nil {
oldOID = meta.ID()
} else {
oldOID, err = n.objectFindID(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: objName})
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
return nil, err
}
versions, err := n.headSystemVersions(ctx, bktInfo, objName)
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
return nil, err
}
idsToDeleteArr := updateCRDT2PSetHeaders(metadata, versions, false) // false means "last write wins"
attributes := make([]*object.Attribute, 0, 3)
@ -545,9 +535,13 @@ func (n *layer) putSystemObject(ctx context.Context, bktInfo *data.BucketInfo, o
if err = n.systemCache.Put(bktInfo.SystemObjectKey(objName), meta); err != nil {
n.log.Error("couldn't cache system object", zap.Error(err))
}
if oldOID != nil {
if err = n.objectDelete(ctx, bktInfo.CID, oldOID); err != nil {
return nil, err
for _, id := range idsToDeleteArr {
if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil {
n.log.Warn("couldn't delete system object",
zap.Stringer("version id", id),
zap.String("name", objName),
zap.Error(err))
}
}
@ -559,20 +553,12 @@ func (n *layer) getSystemObject(ctx context.Context, bkt *data.BucketInfo, objNa
return objInfoFromMeta(bkt, meta), nil
}
oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: objName})
versions, err := n.headSystemVersions(ctx, bkt, objName)
if err != nil {
return nil, err
}
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
return nil, err
}
if err = n.systemCache.Put(bkt.SystemObjectKey(objName), meta); err != nil {
n.log.Error("couldn't cache system object", zap.Error(err))
}
return objInfoFromMeta(bkt, meta), nil
return versions.getLast(), nil
}
// CopyObject from one bucket into another bucket.

View file

@ -2,7 +2,6 @@ package layer
import (
"context"
"errors"
"io"
"net/url"
"sort"
@ -92,20 +91,6 @@ func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID,
return n.pool.SearchObject(ctx, new(client.SearchObjectParams).WithContainerID(p.cid).WithSearchFilters(opts), n.BearerOpt(ctx))
}
// objectFindID returns object id (uuid) based on it's nice name in s3. If
// nice name is uuid compatible, then function returns it.
func (n *layer) objectFindID(ctx context.Context, p *findParams) (*object.ID, error) {
if result, err := n.objectSearch(ctx, p); err != nil {
return nil, err
} else if ln := len(result); ln == 0 {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
} else if ln == 1 {
return result[0], nil
}
return nil, errors.New("several objects with the same name found")
}
func newAddress(cid *cid.ID, oid *object.ID) *object.Address {
address := object.NewAddress()
address.SetContainerID(cid)
@ -147,7 +132,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
if err != nil && !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) {
return nil, err
}
idsToDeleteArr := updateCRDT2PSetHeaders(p, versions, versioningEnabled)
idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled)
r := p.Reader
if len(p.Header[api.ContentType]) == 0 {
@ -243,27 +228,27 @@ func formRawObject(p *PutObjectParams, bktID *cid.ID, own *owner.ID, obj string)
return raw
}
func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versioningEnabled bool) []*object.ID {
func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []*object.ID {
var idsToDeleteArr []*object.ID
if versions == nil {
if versions.isEmpty() {
return idsToDeleteArr
}
if versioningEnabled {
if !versions.isAddListEmpty() {
p.Header[versionsAddAttr] = versions.getAddHeader()
header[versionsAddAttr] = versions.getAddHeader()
}
deleted := versions.getDelHeader()
// p.Header[versionsDelAttr] can be not empty when deleting specific version
if delAttr := p.Header[versionsDelAttr]; len(delAttr) != 0 {
// header[versionsDelAttr] can be not empty when deleting specific version
if delAttr := header[versionsDelAttr]; len(delAttr) != 0 {
if len(deleted) != 0 {
p.Header[versionsDelAttr] = deleted + "," + delAttr
header[versionsDelAttr] = deleted + "," + delAttr
} else {
p.Header[versionsDelAttr] = delAttr
header[versionsDelAttr] = delAttr
}
} else if len(deleted) != 0 {
p.Header[versionsDelAttr] = deleted
header[versionsDelAttr] = deleted
}
} else {
versionsDeletedStr := versions.getDelHeader()
@ -272,10 +257,10 @@ func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versio
}
if lastVersion := versions.getLast(); lastVersion != nil {
p.Header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version()
header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version()
idsToDeleteArr = append(idsToDeleteArr, lastVersion.ID)
} else if len(versionsDeletedStr) != 0 {
p.Header[versionsDelAttr] = versionsDeletedStr
header[versionsDelAttr] = versionsDeletedStr
}
for _, version := range versions.objects {
@ -352,6 +337,50 @@ func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectNa
return versions, nil
}
func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: sysName})
if err != nil {
return nil, err
}
// should be changed when system cache will store payload instead of meta
metas := make(map[string]*object.Object, len(ids))
versions := newObjectVersions(sysName)
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id)
if err != nil {
n.log.Warn("couldn't head object",
zap.Stringer("object id", id),
zap.Stringer("bucket id", bkt.CID),
zap.Error(err))
continue
}
if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil {
if !isSystem(oi) {
continue
}
versions.appendVersion(oi)
metas[oi.Version()] = meta
}
}
lastVersion := versions.getLast()
if lastVersion == nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
if err = n.systemCache.Put(bkt.SystemObjectKey(sysName), metas[lastVersion.Version()]); err != nil {
n.log.Warn("couldn't put system meta to objects cache",
zap.Stringer("object id", lastVersion.ID),
zap.Stringer("bucket id", bkt.CID),
zap.Error(err))
}
return versions, nil
}
func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, versionID string) (*data.ObjectInfo, error) {
oid := object.NewID()
if err := oid.Parse(versionID); err != nil {

View file

@ -156,8 +156,12 @@ LOOP:
return commonAddedVersions, prevVersions, currentVersions
}
func (v *objectVersions) isEmpty() bool {
return v == nil || len(v.objects) == 0
}
func (v *objectVersions) getLast() *data.ObjectInfo {
if v == nil || len(v.objects) == 0 {
if v.isEmpty() {
return nil
}

View file

@ -299,6 +299,17 @@ func (tc *testContext) checkListObjects(ids ...*object.ID) {
}
}
func (tc *testContext) getSystemObject(objectName string) *object.Object {
for _, obj := range tc.testPool.objects {
for _, attr := range obj.Attributes() {
if attr.Key() == objectSystemAttributeName && attr.Value() == objectName {
return obj
}
}
}
return nil
}
type testContext struct {
t *testing.T
ctx context.Context
@ -309,7 +320,7 @@ type testContext struct {
testPool *testPool
}
func prepareContext(t *testing.T) *testContext {
func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
key, err := keys.NewPrivateKey()
require.NoError(t, err)
@ -328,9 +339,14 @@ func prepareContext(t *testing.T) *testContext {
bktID, err := tp.PutContainer(ctx, cnr)
require.NoError(t, err)
config := DefaultCachesConfigs()
if len(cachesConfig) != 0 {
config = cachesConfig[0]
}
return &testContext{
ctx: ctx,
layer: NewLayer(l, tp, DefaultCachesConfigs()),
layer: NewLayer(l, tp, config),
bkt: bktName,
bktID: bktID,
obj: "obj1",
@ -662,3 +678,125 @@ func getTestObjectInfoEpoch(epoch uint64, id byte, addAttr, delAttr, delMarkAttr
obj.CreationEpoch = epoch
return obj
}
func TestUpdateCRDT2PSetHeaders(t *testing.T) {
obj1 := getTestObjectInfo(1, "", "", "")
obj2 := getTestObjectInfo(2, "", "", "")
for _, tc := range []struct {
header map[string]string
versions *objectVersions
versioningEnabled bool
expectedHeader map[string]string
expectedIdsToDelete []*object.ID
}{
{
header: map[string]string{"someKey": "someValue"},
expectedHeader: map[string]string{"someKey": "someValue"},
expectedIdsToDelete: nil,
},
{
header: map[string]string{},
versions: &objectVersions{
objects: []*data.ObjectInfo{obj1},
},
expectedHeader: map[string]string{versionsDelAttr: obj1.Version()},
expectedIdsToDelete: []*object.ID{obj1.ID},
},
{
header: map[string]string{},
versions: &objectVersions{
objects: []*data.ObjectInfo{obj2},
delList: []string{obj1.Version()},
},
expectedHeader: map[string]string{versionsDelAttr: joinVers(obj1, obj2)},
expectedIdsToDelete: []*object.ID{obj2.ID},
},
{
header: map[string]string{},
versions: &objectVersions{
objects: []*data.ObjectInfo{obj1},
},
versioningEnabled: true,
expectedHeader: map[string]string{versionsAddAttr: obj1.Version()},
expectedIdsToDelete: nil,
},
{
header: map[string]string{versionsDelAttr: obj2.Version()},
versions: &objectVersions{
objects: []*data.ObjectInfo{obj2},
delList: []string{obj1.Version()},
},
versioningEnabled: true,
expectedHeader: map[string]string{
versionsAddAttr: obj2.Version(),
versionsDelAttr: joinVers(obj1, obj2),
},
expectedIdsToDelete: nil,
},
} {
idsToDelete := updateCRDT2PSetHeaders(tc.header, tc.versions, tc.versioningEnabled)
require.Equal(t, tc.expectedHeader, tc.header)
require.Equal(t, tc.expectedIdsToDelete, idsToDelete)
}
}
func TestSystemObjectsVersioning(t *testing.T) {
cacheConfig := DefaultCachesConfigs()
cacheConfig.System.Lifetime = 0
tc := prepareContext(t, cacheConfig)
objInfo, err := tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
Bucket: tc.bkt,
Settings: &BucketSettings{VersioningEnabled: false},
})
require.NoError(t, err)
objMeta, ok := tc.testPool.objects[objInfo.Address().String()]
require.True(t, ok)
_, err = tc.layer.PutBucketVersioning(tc.ctx, &PutVersioningParams{
Bucket: tc.bkt,
Settings: &BucketSettings{VersioningEnabled: true},
})
require.NoError(t, err)
// simulate failed deletion
tc.testPool.objects[objInfo.Address().String()] = objMeta
versioning, err := tc.layer.GetBucketVersioning(tc.ctx, tc.bkt)
require.NoError(t, err)
require.True(t, versioning.VersioningEnabled)
}
func TestDeleteSystemObjectsVersioning(t *testing.T) {
cacheConfig := DefaultCachesConfigs()
cacheConfig.System.Lifetime = 0
tc := prepareContext(t, cacheConfig)
tagSet := map[string]string{
"tag1": "val1",
}
err := tc.layer.PutBucketTagging(tc.ctx, tc.bkt, tagSet)
require.NoError(t, err)
objMeta := tc.getSystemObject(formBucketTagObjectName(tc.bkt))
tagSet["tag2"] = "val2"
err = tc.layer.PutBucketTagging(tc.ctx, tc.bkt, tagSet)
require.NoError(t, err)
// simulate failed deletion
tc.testPool.objects[newAddress(objMeta.ContainerID(), objMeta.ID()).String()] = objMeta
tagging, err := tc.layer.GetBucketTagging(tc.ctx, tc.bkt)
require.NoError(t, err)
require.Equal(t, tagSet, tagging)
err = tc.layer.DeleteBucketTagging(tc.ctx, tc.bkt)
require.NoError(t, err)
require.Nil(t, tc.getSystemObject(formBucketTagObjectName(tc.bkt)))
}