package frostfs import ( "context" "errors" "fmt" "io" "math" "strconv" "time" objectv2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs" frosterr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/errors" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" ) // FrostFS represents virtual connection to the FrostFS network. // It is used to provide an interface to dependent packages // which work with FrostFS. type FrostFS struct { pool *pool.Pool await pool.WaitParams owner user.ID } const ( defaultPollInterval = time.Second // overrides default value from pool defaultPollTimeout = 120 * time.Second // same as default value from pool ) // NewFrostFS creates new FrostFS using provided pool.Pool. func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS { await := pool.WaitParams{PollInterval: defaultPollInterval, Timeout: defaultPollTimeout} var owner user.ID user.IDFromKey(&owner, key.PrivateKey.PublicKey) return &FrostFS{ pool: p, await: await, owner: owner, } } // TimeToEpoch implements layer.FrostFS interface method. func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) { dur := futureTime.Sub(now) if dur < 0 { return 0, 0, fmt.Errorf("time '%s' must be in the future (after %s)", futureTime.Format(time.RFC3339), now.Format(time.RFC3339)) } networkInfo, err := x.pool.NetworkInfo(ctx) if err != nil { return 0, 0, handleObjectError("get network info via client", err) } durEpoch := networkInfo.EpochDuration() if durEpoch == 0 { return 0, 0, errors.New("epoch duration is missing or zero") } curr := networkInfo.CurrentEpoch() msPerEpoch := durEpoch * uint64(networkInfo.MsPerBlock()) epochLifetime := uint64(dur.Milliseconds()) / msPerEpoch if uint64(dur.Milliseconds())%msPerEpoch != 0 { epochLifetime++ } var epoch uint64 if epochLifetime >= math.MaxUint64-curr { epoch = math.MaxUint64 } else { epoch = curr + epochLifetime } return curr, epoch, nil } // Container implements layer.FrostFS interface method. func (x *FrostFS) Container(ctx context.Context, layerPrm frostfs.PrmContainer) (*container.Container, error) { prm := pool.PrmContainerGet{ ContainerID: layerPrm.ContainerID, Session: layerPrm.SessionToken, } res, err := x.pool.GetContainer(ctx, prm) if err != nil { return nil, handleObjectError("read container via connection pool", err) } return &res, nil } // CreateContainer implements layer.FrostFS interface method. func (x *FrostFS) CreateContainer(ctx context.Context, prm frostfs.PrmContainerCreate) (*frostfs.ContainerCreateResult, error) { var cnr container.Container cnr.Init() cnr.SetPlacementPolicy(prm.Policy) cnr.SetOwner(prm.Creator) cnr.SetBasicACL(prm.BasicACL) creationTime := prm.CreationTime if creationTime.IsZero() { creationTime = time.Now() } container.SetCreationTime(&cnr, creationTime) if prm.Name != "" { var d container.Domain d.SetName(prm.Name) d.SetZone(prm.Zone) container.WriteDomain(&cnr, d) container.SetName(&cnr, prm.Name) } for i := range prm.AdditionalAttributes { cnr.SetAttribute(prm.AdditionalAttributes[i][0], prm.AdditionalAttributes[i][1]) } err := pool.SyncContainerWithNetwork(ctx, &cnr, x.pool) if err != nil { return nil, handleObjectError("sync container with the network state", err) } prmPut := pool.PrmContainerPut{ ClientParams: client.PrmContainerPut{ Container: &cnr, Session: prm.SessionToken, }, WaitParams: &x.await, } // send request to save the container idCnr, err := x.pool.PutContainer(ctx, prmPut) return &frostfs.ContainerCreateResult{ ContainerID: idCnr, HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(cnr), }, handleObjectError("save container via connection pool", err) } // UserContainers implements layer.FrostFS interface method. func (x *FrostFS) UserContainers(ctx context.Context, layerPrm frostfs.PrmUserContainers) ([]cid.ID, error) { prm := pool.PrmContainerList{ OwnerID: layerPrm.UserID, Session: layerPrm.SessionToken, } r, err := x.pool.ListContainers(ctx, prm) return r, handleObjectError("list user containers via connection pool", err) } // DeleteContainer implements layer.FrostFS interface method. func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error { prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await} err := x.pool.DeleteContainer(ctx, prm) return handleObjectError("delete container via connection pool", err) } // CreateObject implements layer.FrostFS interface method. func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) { attrNum := len(prm.Attributes) + 1 // + creation time if prm.Filepath != "" { attrNum++ } attrs := make([]object.Attribute, 0, attrNum) var a *object.Attribute a = object.NewAttribute() a.SetKey(object.AttributeTimestamp) creationTime := prm.CreationTime if creationTime.IsZero() { creationTime = time.Now() } a.SetValue(strconv.FormatInt(creationTime.Unix(), 10)) attrs = append(attrs, *a) for i := range prm.Attributes { a = object.NewAttribute() a.SetKey(prm.Attributes[i][0]) a.SetValue(prm.Attributes[i][1]) attrs = append(attrs, *a) } if prm.Filepath != "" { a = object.NewAttribute() a.SetKey(object.AttributeFilePath) a.SetValue(prm.Filepath) attrs = append(attrs, *a) } obj := object.New() obj.SetContainerID(prm.Container) obj.SetOwnerID(x.owner) obj.SetAttributes(attrs...) obj.SetPayloadSize(prm.PayloadSize) if prm.BearerToken == nil && prm.PrivateKey != nil { var owner user.ID user.IDFromKey(&owner, prm.PrivateKey.PublicKey) obj.SetOwnerID(owner) } if len(prm.Locks) > 0 { lock := new(object.Lock) lock.WriteMembers(prm.Locks) objectv2.WriteLock(obj.ToV2(), (objectv2.Lock)(*lock)) } var prmPut pool.PrmObjectPut prmPut.SetHeader(*obj) prmPut.SetPayload(prm.Payload) prmPut.SetCopiesNumberVector(prm.CopiesNumber) prmPut.SetClientCut(prm.ClientCut) prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash) prmPut.SetBufferMaxSize(prm.BufferMaxSize) if prm.BearerToken != nil { prmPut.UseBearer(*prm.BearerToken) } else { prmPut.UseKey(prm.PrivateKey) } res, err := x.pool.PutObject(ctx, prmPut) if err = handleObjectError("save object via connection pool", err); err != nil { return nil, err } return &frostfs.CreateObjectResult{ ObjectID: res.ObjectID, CreationEpoch: res.Epoch, }, nil } // wraps io.ReadCloser and transforms Read errors related to access violation // to frostfs.ErrAccessDenied. type payloadReader struct { io.ReadCloser } func (x payloadReader) Read(p []byte) (int, error) { n, err := x.ReadCloser.Read(p) if err != nil && errors.Is(err, io.EOF) { return n, err } return n, handleObjectError("read payload", err) } // HeadObject implements layer.FrostFS interface method. func (x *FrostFS) HeadObject(ctx context.Context, prm frostfs.PrmObjectHead) (*object.Object, error) { var addr oid.Address addr.SetContainer(prm.Container) addr.SetObject(prm.Object) var prmHead pool.PrmObjectHead prmHead.SetAddress(addr) if prm.BearerToken != nil { prmHead.UseBearer(*prm.BearerToken) } else { prmHead.UseKey(prm.PrivateKey) } res, err := x.pool.HeadObject(ctx, prmHead) if err != nil { return nil, handleObjectError("read object header via connection pool", err) } return &res, nil } // GetObject implements layer.FrostFS interface method. func (x *FrostFS) GetObject(ctx context.Context, prm frostfs.PrmObjectGet) (*frostfs.Object, error) { var addr oid.Address addr.SetContainer(prm.Container) addr.SetObject(prm.Object) var prmGet pool.PrmObjectGet prmGet.SetAddress(addr) if prm.BearerToken != nil { prmGet.UseBearer(*prm.BearerToken) } else { prmGet.UseKey(prm.PrivateKey) } res, err := x.pool.GetObject(ctx, prmGet) if err != nil { return nil, handleObjectError("init full object reading via connection pool", err) } return &frostfs.Object{ Header: res.Header, Payload: res.Payload, }, nil } // RangeObject implements layer.FrostFS interface method. func (x *FrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRange) (io.ReadCloser, error) { var addr oid.Address addr.SetContainer(prm.Container) addr.SetObject(prm.Object) var prmRange pool.PrmObjectRange prmRange.SetAddress(addr) prmRange.SetOffset(prm.PayloadRange[0]) prmRange.SetLength(prm.PayloadRange[1]) if prm.BearerToken != nil { prmRange.UseBearer(*prm.BearerToken) } else { prmRange.UseKey(prm.PrivateKey) } res, err := x.pool.ObjectRange(ctx, prmRange) if err != nil { return nil, handleObjectError("init payload range reading via connection pool", err) } return payloadReader{&res}, nil } // DeleteObject implements layer.FrostFS interface method. func (x *FrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error { var addr oid.Address addr.SetContainer(prm.Container) addr.SetObject(prm.Object) var prmDelete pool.PrmObjectDelete prmDelete.SetAddress(addr) if prm.BearerToken != nil { prmDelete.UseBearer(*prm.BearerToken) } else { prmDelete.UseKey(prm.PrivateKey) } err := x.pool.DeleteObject(ctx, prmDelete) return handleObjectError("mark object removal via connection pool", err) } // SearchObjects implements layer.FrostFS interface method. func (x *FrostFS) SearchObjects(ctx context.Context, prm frostfs.PrmObjectSearch) ([]oid.ID, error) { filters := object.NewSearchFilters() filters.AddRootFilter() if prm.ExactAttribute[0] != "" { filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual) } if prm.FilePrefix != "" { filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix) } var prmSearch pool.PrmObjectSearch prmSearch.SetContainerID(prm.Container) prmSearch.SetFilters(filters) if prm.BearerToken != nil { prmSearch.UseBearer(*prm.BearerToken) } else { prmSearch.UseKey(prm.PrivateKey) } res, err := x.pool.SearchObjects(ctx, prmSearch) if err != nil { return nil, handleObjectError("init object search via connection pool", err) } defer res.Close() var buf []oid.ID err = res.Iterate(func(id oid.ID) bool { buf = append(buf, id) return false }) return buf, handleObjectError("read object list", err) } // NetworkInfo implements layer.FrostFS interface method. func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { ni, err := x.pool.NetworkInfo(ctx) if err != nil { return ni, handleObjectError("get network info via connection pool", err) } return ni, nil } func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (oid.ID, error) { var addr oid.Address addr.SetContainer(prm.Container) addr.SetObject(prm.Object) var prmPatch pool.PrmObjectPatch prmPatch.SetAddress(addr) var rng object.Range rng.SetOffset(prm.Offset) rng.SetLength(prm.Length) if prm.Length+prm.Offset > prm.ObjectSize { rng.SetLength(prm.ObjectSize - prm.Offset) } prmPatch.SetRange(&rng) prmPatch.SetPayloadReader(prm.Payload) if prm.BearerToken != nil { prmPatch.UseBearer(*prm.BearerToken) } else { prmPatch.UseKey(prm.PrivateKey) } res, err := x.pool.PatchObject(ctx, prmPatch) if err != nil { return oid.ID{}, handleObjectError("patch object via connection pool", err) } return res.ObjectID, nil } // ResolverFrostFS represents virtual connection to the FrostFS network. // It implements resolver.FrostFS. type ResolverFrostFS struct { pool *pool.Pool } // NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool. func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS { return &ResolverFrostFS{pool: p} } // SystemDNS implements resolver.FrostFS interface method. func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) { networkInfo, err := x.pool.NetworkInfo(ctx) if err != nil { return "", handleObjectError("read network info via client", err) } domain := networkInfo.RawNetworkParameter("SystemDNS") if domain == nil { return "", errors.New("system DNS parameter not found or empty") } return string(domain), nil } func handleObjectError(msg string, err error) error { if err == nil { return nil } if reason, ok := frosterr.IsErrObjectAccessDenied(err); ok { return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrAccessDenied, reason) } if frosterr.IsTimeoutError(err) { return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrGatewayTimeout, err.Error()) } return fmt.Errorf("%s: %w", msg, err) } // PoolStatistic is a mediator which implements authmate.FrostFS through pool.Pool. type PoolStatistic struct { pool *pool.Pool } // NewPoolStatistic creates new PoolStatistic using provided pool.Pool. func NewPoolStatistic(p *pool.Pool) *PoolStatistic { return &PoolStatistic{pool: p} } // Statistic implements interface method. func (x *PoolStatistic) Statistic() pool.Statistic { return x.pool.Statistic() }