From 0590f84d684bae348797c223e5602dcf181281a5 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 13 Jun 2023 12:33:03 +0300 Subject: [PATCH] [#135] crdt: Add GSet Signed-off-by: Denis Kirillov --- internal/frostfs/crdt/gset.go | 124 +++++++++++++++++++++++++++++ internal/frostfs/crdt/gset_test.go | 38 +++++++++ 2 files changed, 162 insertions(+) create mode 100644 internal/frostfs/crdt/gset.go create mode 100644 internal/frostfs/crdt/gset_test.go diff --git a/internal/frostfs/crdt/gset.go b/internal/frostfs/crdt/gset.go new file mode 100644 index 0000000..977c7d7 --- /dev/null +++ b/internal/frostfs/crdt/gset.go @@ -0,0 +1,124 @@ +package crdt + +import ( + "sort" + "strings" + + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +const ( + versionsAddAttr = "S3-CRDT-Versions-Add" +) + +type ObjectVersions struct { + name string + objects []*ObjectVersion + addList []string + isSorted bool +} + +type ObjectVersion struct { + OjbID oid.ID + Headers map[string]string + CreationEpoch uint64 +} + +func (o *ObjectVersion) VersionID() string { + return o.OjbID.EncodeToString() +} + +func NewObjectVersions(name string) *ObjectVersions { + return &ObjectVersions{name: name} +} + +func NewObjectVersion(obj *object.Object) *ObjectVersion { + objID, _ := obj.ID() + headers := make(map[string]string) + + for _, attr := range obj.Attributes() { + headers[attr.Key()] = attr.Value() + } + + return &ObjectVersion{ + OjbID: objID, + Headers: headers, + CreationEpoch: obj.CreationEpoch(), + } +} + +func (v *ObjectVersions) Name() string { + return v.name +} + +func (v *ObjectVersions) AppendVersion(ov *ObjectVersion) { + addVers := append(splitVersions(ov.Headers[versionsAddAttr]), ov.VersionID()) + v.objects = append(v.objects, ov) + for _, add := range addVers { + if !contains(v.addList, add) { + v.addList = append(v.addList, add) + } + } + v.isSorted = false +} + +func (v *ObjectVersions) GetCRDTHeaders() map[string]string { + if len(v.objects) == 0 { + return nil + } + + headers := make(map[string]string, 2) + + if len(v.addList) != 0 { + headers[versionsAddAttr] = v.getAddHeader() + } + + return headers +} + +func (v *ObjectVersions) GetLast() *ObjectVersion { + if len(v.objects) == 0 { + return nil + } + + v.sort() + return v.objects[len(v.objects)-1] +} + +func splitVersions(header string) []string { + if len(header) == 0 { + return nil + } + + return strings.Split(header, ",") +} + +func (v *ObjectVersions) sort() { + if !v.isSorted { + sort.Slice(v.objects, func(i, j int) bool { + return less(v.objects[i], v.objects[j]) + }) + v.isSorted = true + } +} + +func (v *ObjectVersions) getAddHeader() string { + return strings.Join(v.addList, ",") +} + +func less(ov1, ov2 *ObjectVersion) bool { + if ov1.CreationEpoch == ov2.CreationEpoch { + return ov1.VersionID() < ov2.VersionID() + } + return ov1.CreationEpoch < ov2.CreationEpoch +} + +func contains(list []string, elem string) bool { + for _, item := range list { + if elem == item { + return true + } + } + return false +} diff --git a/internal/frostfs/crdt/gset_test.go b/internal/frostfs/crdt/gset_test.go new file mode 100644 index 0000000..a26589d --- /dev/null +++ b/internal/frostfs/crdt/gset_test.go @@ -0,0 +1,38 @@ +package crdt + +import ( + "testing" + + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +func TestObjectVersionsSort(t *testing.T) { + t.Run("sort by epoch", func(t *testing.T) { + versions := NewObjectVersions("test") + versions.AppendVersion(&ObjectVersion{CreationEpoch: 3}) + versions.AppendVersion(&ObjectVersion{CreationEpoch: 1}) + versions.AppendVersion(&ObjectVersion{CreationEpoch: 2}) + + last := versions.GetLast() + require.Equal(t, uint64(3), last.CreationEpoch) + }) + + t.Run("sort by oids", func(t *testing.T) { + versions := NewObjectVersions("test") + versions.AppendVersion(&ObjectVersion{CreationEpoch: 3, OjbID: getTestOID(2)}) + versions.AppendVersion(&ObjectVersion{CreationEpoch: 3, OjbID: getTestOID(1)}) + versions.AppendVersion(&ObjectVersion{CreationEpoch: 1, OjbID: getTestOID(3)}) + versions.AppendVersion(&ObjectVersion{CreationEpoch: 2, OjbID: getTestOID(4)}) + + last := versions.GetLast() + require.Equal(t, uint64(3), last.CreationEpoch) + require.Equal(t, getTestOID(2).String(), last.VersionID()) + }) +} + +func getTestOID(val byte) oid.ID { + var res oid.ID + res.SetSHA256([32]byte{val}) + return res +}