forked from TrueCloudLab/frostfs-s3-gw
Nikita Zinkevich
2c002b657e
Fix imports in order to apply new sdk changes caused by merging with frostfs-api-go and reimplementing tree service client Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
569 lines
14 KiB
Go
569 lines
14 KiB
Go
package layer
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
|
v2container "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
|
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
)
|
|
|
|
// FeatureSettingsMock is a test double for the layer feature settings.
// Only the client-cut and MD5 flags are mutable; the remaining getters
// return fixed values.
type FeatureSettingsMock struct {
	clientCut, md5Enabled bool
}

// TombstoneLifetime always returns 1.
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 { return 1 }

// TombstoneMembersSize always returns 2.
func (k *FeatureSettingsMock) TombstoneMembersSize() int { return 2 }

// BufferMaxSizeForPut always returns 0.
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 { return 0 }

// ClientCut returns the flag previously stored with SetClientCut.
func (k *FeatureSettingsMock) ClientCut() bool { return k.clientCut }

// SetClientCut stores the value reported by ClientCut.
func (k *FeatureSettingsMock) SetClientCut(clientCut bool) { k.clientCut = clientCut }

// MD5Enabled returns the flag previously stored with SetMD5Enabled.
func (k *FeatureSettingsMock) MD5Enabled() bool { return k.md5Enabled }

// SetMD5Enabled stores the value reported by MD5Enabled.
func (k *FeatureSettingsMock) SetMD5Enabled(md5Enabled bool) { k.md5Enabled = md5Enabled }
|
|
|
|
func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
|
if ns == "" {
|
|
return v2container.SysAttributeZoneDefault
|
|
}
|
|
|
|
return ns + ".ns"
|
|
}
|
|
|
|
// Compile-time assertion that *TestFrostFS satisfies frostfs.FrostFS.
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
|
|
|
type TestFrostFS struct {
|
|
objects map[string]*object.Object
|
|
objectErrors map[string]error
|
|
objectPutErrors map[string]error
|
|
containers map[string]*container.Container
|
|
chains map[string][]chain.Chain
|
|
currentEpoch uint64
|
|
key *keys.PrivateKey
|
|
}
|
|
|
|
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
|
|
return &TestFrostFS{
|
|
objects: make(map[string]*object.Object),
|
|
objectErrors: make(map[string]error),
|
|
objectPutErrors: make(map[string]error),
|
|
containers: make(map[string]*container.Container),
|
|
chains: make(map[string][]chain.Chain),
|
|
key: key,
|
|
}
|
|
}
|
|
|
|
func (t *TestFrostFS) CurrentEpoch() uint64 {
|
|
return t.currentEpoch
|
|
}
|
|
|
|
func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) {
|
|
if err == nil {
|
|
delete(t.objectErrors, addr.EncodeToString())
|
|
} else {
|
|
t.objectErrors[addr.EncodeToString()] = err
|
|
}
|
|
}
|
|
|
|
func (t *TestFrostFS) SetObjectPutError(fileName string, err error) {
|
|
if err == nil {
|
|
delete(t.objectPutErrors, fileName)
|
|
} else {
|
|
t.objectPutErrors[fileName] = err
|
|
}
|
|
}
|
|
|
|
func (t *TestFrostFS) Objects() []*object.Object {
|
|
res := make([]*object.Object, 0, len(t.objects))
|
|
|
|
for _, obj := range t.objects {
|
|
res = append(res, obj)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
|
|
for _, obj := range t.objects {
|
|
if id, _ := obj.ID(); id.Equals(objID) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
|
|
t.objects[key] = obj
|
|
}
|
|
|
|
func (t *TestFrostFS) ContainerID(name string) (cid.ID, error) {
|
|
for id, cnr := range t.containers {
|
|
if container.Name(*cnr) == name {
|
|
var cnrID cid.ID
|
|
return cnrID, cnrID.DecodeString(id)
|
|
}
|
|
}
|
|
return cid.ID{}, fmt.Errorf("not found")
|
|
}
|
|
|
|
func (t *TestFrostFS) SetContainer(cnrID cid.ID, cnr *container.Container) {
|
|
t.containers[cnrID.EncodeToString()] = cnr
|
|
}
|
|
|
|
func (t *TestFrostFS) CreateContainer(_ context.Context, prm frostfs.PrmContainerCreate) (*frostfs.ContainerCreateResult, error) {
|
|
var cnr container.Container
|
|
cnr.Init()
|
|
cnr.SetOwner(prm.Creator)
|
|
cnr.SetPlacementPolicy(prm.Policy)
|
|
|
|
creationTime := prm.CreationTime
|
|
if creationTime.IsZero() {
|
|
creationTime = time.Now()
|
|
}
|
|
container.SetCreationTime(&cnr, creationTime)
|
|
|
|
if prm.Name != "" {
|
|
var d container.Domain
|
|
d.SetName(prm.Name)
|
|
d.SetZone(prm.Zone)
|
|
|
|
container.WriteDomain(&cnr, d)
|
|
container.SetName(&cnr, prm.Name)
|
|
}
|
|
|
|
for i := range prm.AdditionalAttributes {
|
|
cnr.SetAttribute(prm.AdditionalAttributes[i][0], prm.AdditionalAttributes[i][1])
|
|
}
|
|
|
|
b := make([]byte, 32)
|
|
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var id cid.ID
|
|
id.SetSHA256(sha256.Sum256(b))
|
|
t.containers[id.EncodeToString()] = &cnr
|
|
t.chains[id.EncodeToString()] = []chain.Chain{}
|
|
|
|
return &frostfs.ContainerCreateResult{ContainerID: id}, nil
|
|
}
|
|
|
|
func (t *TestFrostFS) DeleteContainer(_ context.Context, cnrID cid.ID, _ *session.Container) error {
|
|
delete(t.containers, cnrID.EncodeToString())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TestFrostFS) Container(_ context.Context, prm frostfs.PrmContainer) (*container.Container, error) {
|
|
for k, v := range t.containers {
|
|
if k == prm.ContainerID.EncodeToString() {
|
|
return v, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("container not found %s", prm.ContainerID)
|
|
}
|
|
|
|
func (t *TestFrostFS) UserContainers(context.Context, frostfs.PrmUserContainers) ([]cid.ID, error) {
|
|
var res []cid.ID
|
|
for k := range t.containers {
|
|
var idCnr cid.ID
|
|
if err := idCnr.DecodeString(k); err != nil {
|
|
return nil, err
|
|
}
|
|
res = append(res, idCnr)
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (t *TestFrostFS) retrieveObject(ctx context.Context, cnrID cid.ID, objID oid.ID) (*object.Object, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(cnrID)
|
|
addr.SetObject(objID)
|
|
|
|
sAddr := addr.EncodeToString()
|
|
|
|
if err := t.objectErrors[sAddr]; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if obj, ok := t.objects[sAddr]; ok {
|
|
owner := getBearerOwner(ctx)
|
|
if !t.checkAccess(cnrID, owner) {
|
|
return nil, frostfs.ErrAccessDenied
|
|
}
|
|
|
|
return obj, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
|
|
}
|
|
|
|
func (t *TestFrostFS) HeadObject(ctx context.Context, prm frostfs.PrmObjectHead) (*object.Object, error) {
|
|
return t.retrieveObject(ctx, prm.Container, prm.Object)
|
|
}
|
|
|
|
func (t *TestFrostFS) GetObject(ctx context.Context, prm frostfs.PrmObjectGet) (*frostfs.Object, error) {
|
|
obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &frostfs.Object{
|
|
Header: *obj,
|
|
Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
|
|
}, nil
|
|
}
|
|
|
|
func (t *TestFrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRange) (io.ReadCloser, error) {
|
|
obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
off := prm.PayloadRange[0]
|
|
payload := obj.Payload()[off : off+prm.PayloadRange[1]]
|
|
|
|
return io.NopCloser(bytes.NewReader(payload)), nil
|
|
}
|
|
|
|
func (t *TestFrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
|
|
if prm.Type == object.TypeTombstone {
|
|
return t.createTombstone(ctx, prm)
|
|
}
|
|
|
|
b := make([]byte, 32)
|
|
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
|
return nil, err
|
|
}
|
|
var id oid.ID
|
|
id.SetSHA256(sha256.Sum256(b))
|
|
|
|
attrs := make([]object.Attribute, 0)
|
|
|
|
if err := t.objectPutErrors[prm.Filepath]; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if prm.Filepath != "" {
|
|
a := object.NewAttribute()
|
|
a.SetKey(object.AttributeFilePath)
|
|
a.SetValue(prm.Filepath)
|
|
attrs = append(attrs, *a)
|
|
}
|
|
|
|
if prm.ClientCut {
|
|
a := object.NewAttribute()
|
|
a.SetKey("s3-client-cut")
|
|
a.SetValue("true")
|
|
attrs = append(attrs, *a)
|
|
}
|
|
|
|
for i := range prm.Attributes {
|
|
a := object.NewAttribute()
|
|
a.SetKey(prm.Attributes[i][0])
|
|
a.SetValue(prm.Attributes[i][1])
|
|
attrs = append(attrs, *a)
|
|
}
|
|
|
|
var owner user.ID
|
|
user.IDFromKey(&owner, t.key.PrivateKey.PublicKey)
|
|
|
|
obj := object.New()
|
|
obj.SetContainerID(prm.Container)
|
|
obj.SetID(id)
|
|
obj.SetPayloadSize(prm.PayloadSize)
|
|
obj.SetAttributes(attrs...)
|
|
obj.SetCreationEpoch(t.currentEpoch)
|
|
obj.SetOwnerID(owner)
|
|
t.currentEpoch++
|
|
|
|
if len(prm.Locks) > 0 {
|
|
lock := new(object.Lock)
|
|
lock.WriteMembers(prm.Locks)
|
|
objectv2.WriteLock(obj.ToV2(), (objectv2.Lock)(*lock))
|
|
}
|
|
|
|
if prm.Payload != nil {
|
|
all, err := io.ReadAll(prm.Payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
obj.SetPayload(all)
|
|
obj.SetPayloadSize(uint64(len(all)))
|
|
var hash checksum.Checksum
|
|
checksum.Calculate(&hash, checksum.SHA256, all)
|
|
obj.SetPayloadChecksum(hash)
|
|
}
|
|
|
|
cnrID, _ := obj.ContainerID()
|
|
objID, _ := obj.ID()
|
|
|
|
addr := newAddress(cnrID, objID)
|
|
t.objects[addr.EncodeToString()] = obj
|
|
return &frostfs.CreateObjectResult{
|
|
ObjectID: objID,
|
|
CreationEpoch: t.currentEpoch - 1,
|
|
}, nil
|
|
}
|
|
|
|
func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
|
|
payload, err := io.ReadAll(prm.Payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var tomb object.Tombstone
|
|
err = tomb.Unmarshal(payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, objID := range tomb.Members() {
|
|
prmDelete := frostfs.PrmObjectDelete{
|
|
PrmAuth: prm.PrmAuth,
|
|
Container: prm.Container,
|
|
Object: objID,
|
|
}
|
|
|
|
if err = t.DeleteObject(ctx, prmDelete); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &frostfs.CreateObjectResult{
|
|
CreationEpoch: t.currentEpoch,
|
|
}, nil
|
|
}
|
|
|
|
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
if err := t.objectErrors[addr.EncodeToString()]; err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, ok := t.objects[addr.EncodeToString()]; ok {
|
|
owner := getBearerOwner(ctx)
|
|
if !t.checkAccess(prm.Container, owner) {
|
|
return frostfs.ErrAccessDenied
|
|
}
|
|
|
|
delete(t.objects, addr.EncodeToString())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TestFrostFS) TimeToEpoch(_ context.Context, now, futureTime time.Time) (uint64, uint64, error) {
|
|
return t.currentEpoch, t.currentEpoch + uint64(futureTime.Sub(now).Seconds()), nil
|
|
}
|
|
|
|
func (t *TestFrostFS) AllObjects(cnrID cid.ID) []oid.ID {
|
|
result := make([]oid.ID, 0, len(t.objects))
|
|
|
|
for _, val := range t.objects {
|
|
objCnrID, _ := val.ContainerID()
|
|
objObjID, _ := val.ID()
|
|
if cnrID.Equals(objCnrID) {
|
|
result = append(result, objObjID)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (t *TestFrostFS) SearchObjects(_ context.Context, prm frostfs.PrmObjectSearch) ([]oid.ID, error) {
|
|
filters := object.NewSearchFilters()
|
|
filters.AddRootFilter()
|
|
|
|
if prm.ExactAttribute[0] != "" {
|
|
filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual)
|
|
}
|
|
|
|
cidStr := prm.Container.EncodeToString()
|
|
|
|
var res []oid.ID
|
|
|
|
if len(filters) == 1 {
|
|
for k, v := range t.objects {
|
|
if strings.Contains(k, cidStr) {
|
|
id, _ := v.ID()
|
|
res = append(res, id)
|
|
}
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
filter := filters[1]
|
|
if len(filters) != 2 || filter.Operation() != object.MatchStringEqual {
|
|
return nil, fmt.Errorf("usupported filters")
|
|
}
|
|
|
|
for k, v := range t.objects {
|
|
if strings.Contains(k, cidStr) && isMatched(v.Attributes(), filter) {
|
|
id, _ := v.ID()
|
|
res = append(res, id)
|
|
}
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (t *TestFrostFS) NetworkInfo(context.Context) (netmap.NetworkInfo, error) {
|
|
ni := netmap.NetworkInfo{}
|
|
ni.SetCurrentEpoch(t.currentEpoch)
|
|
ni.SetEpochDuration(60)
|
|
ni.SetMsPerBlock(1000)
|
|
|
|
return ni, nil
|
|
}
|
|
|
|
func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (oid.ID, error) {
|
|
obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
|
|
if err != nil {
|
|
return oid.ID{}, err
|
|
}
|
|
|
|
newObj := *obj
|
|
|
|
patchBytes, err := io.ReadAll(prm.Payload)
|
|
if err != nil {
|
|
return oid.ID{}, err
|
|
}
|
|
|
|
var newPayload []byte
|
|
if prm.Offset > 0 {
|
|
newPayload = append(newPayload, obj.Payload()[:prm.Offset]...)
|
|
}
|
|
newPayload = append(newPayload, patchBytes...)
|
|
if prm.Offset+prm.Length < obj.PayloadSize() {
|
|
newPayload = append(newPayload, obj.Payload()[prm.Offset+prm.Length:]...)
|
|
}
|
|
newObj.SetPayload(newPayload)
|
|
newObj.SetPayloadSize(uint64(len(newPayload)))
|
|
|
|
var hash checksum.Checksum
|
|
checksum.Calculate(&hash, checksum.SHA256, newPayload)
|
|
newObj.SetPayloadChecksum(hash)
|
|
|
|
newID := oidtest.ID()
|
|
newObj.SetID(newID)
|
|
|
|
t.objects[newAddress(prm.Container, newID).EncodeToString()] = &newObj
|
|
|
|
return newID, nil
|
|
}
|
|
|
|
func (t *TestFrostFS) Relations() relations.Relations {
|
|
return &RelationsMock{}
|
|
}
|
|
|
|
func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
|
|
list, ok := t.chains[prm.ContainerID.EncodeToString()]
|
|
if !ok {
|
|
return errors.New("container not found")
|
|
}
|
|
|
|
t.chains[prm.ContainerID.EncodeToString()] = append(list, prm.Chain)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TestFrostFS) checkAccess(cnrID cid.ID, owner user.ID) bool {
|
|
cnr, ok := t.containers[cnrID.EncodeToString()]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
if !cnr.BasicACL().Extendable() {
|
|
return cnr.Owner().Equals(owner)
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func getBearerOwner(ctx context.Context) user.ID {
|
|
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
|
|
return bearer.ResolveIssuer(*bd.Gate.BearerToken)
|
|
}
|
|
|
|
return user.ID{}
|
|
}
|
|
|
|
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
|
for _, attr := range attributes {
|
|
if attr.Key() == filter.Header() && attr.Value() == filter.Value() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
type RelationsMock struct{}
|
|
|
|
func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) {
|
|
return nil, relations.ErrNoSplitInfo
|
|
}
|
|
|
|
func (r *RelationsMock) ListChildrenByLinker(context.Context, cid.ID, oid.ID, relations.Tokens) ([]oid.ID, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (r *RelationsMock) GetLeftSibling(context.Context, cid.ID, oid.ID, relations.Tokens) (oid.ID, error) {
|
|
return oid.ID{}, nil
|
|
}
|
|
|
|
func (r *RelationsMock) FindSiblingBySplitID(context.Context, cid.ID, *object.SplitID, relations.Tokens) ([]oid.ID, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (r *RelationsMock) FindSiblingByParentID(_ context.Context, _ cid.ID, _ oid.ID, _ relations.Tokens) ([]oid.ID, error) {
|
|
return nil, nil
|
|
}
|