package frostfs import ( "context" "errors" "fmt" "io" "strings" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // FrostFS represents virtual connection to the FrostFS network. // It is used to provide an interface to dependent packages // which work with FrostFS. type FrostFS struct { pool *pool.Pool } // NewFrostFS creates new FrostFS using provided pool.Pool. func NewFrostFS(p *pool.Pool) *FrostFS { return &FrostFS{ pool: p, } } // Container implements frostfs.FrostFS interface method. func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContainer) (*container.Container, error) { prm := pool.PrmContainerGet{ ContainerID: containerPrm.ContainerID, } res, err := x.pool.GetContainer(ctx, prm) if err != nil { return nil, handleObjectError("read container via connection pool", err) } return &res, nil } // CreateObject implements frostfs.FrostFS interface method. 
func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate) (oid.ID, error) { var prmPut pool.PrmObjectPut prmPut.SetHeader(*prm.Object) prmPut.SetPayload(prm.Payload) prmPut.SetClientCut(prm.ClientCut) prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash) prmPut.SetBufferMaxSize(prm.BufferMaxSize) if prm.BearerToken != nil { prmPut.UseBearer(*prm.BearerToken) } idObj, err := x.pool.PutObject(ctx, prmPut) if err != nil { return oid.ID{}, handleObjectError("save object via connection pool", err) } return idObj.ObjectID, nil } // wraps io.ReadCloser and transforms Read errors related to access violation // to frostfs.ErrAccessDenied. type payloadReader struct { io.ReadCloser } func (x payloadReader) Read(p []byte) (int, error) { n, err := x.ReadCloser.Read(p) if err != nil && errors.Is(err, io.EOF) { return n, err } return n, handleObjectError("read payload", err) } // HeadObject implements frostfs.FrostFS interface method. func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*object.Object, error) { var prmHead pool.PrmObjectHead prmHead.SetAddress(prm.Address) if prm.BearerToken != nil { prmHead.UseBearer(*prm.BearerToken) } res, err := x.pool.HeadObject(ctx, prmHead) if err != nil { return nil, handleObjectError("read object header via connection pool", err) } return &res, nil } // GetObject implements frostfs.FrostFS interface method. func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*handler.Object, error) { var prmGet pool.PrmObjectGet prmGet.SetAddress(prm.Address) if prm.BearerToken != nil { prmGet.UseBearer(*prm.BearerToken) } res, err := x.pool.GetObject(ctx, prmGet) if err != nil { return nil, handleObjectError("init full object reading via connection pool", err) } return &handler.Object{ Header: res.Header, Payload: res.Payload, }, nil } // RangeObject implements frostfs.FrostFS interface method. 
func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (io.ReadCloser, error) { var prmRange pool.PrmObjectRange prmRange.SetAddress(prm.Address) prmRange.SetOffset(prm.PayloadRange[0]) prmRange.SetLength(prm.PayloadRange[1]) if prm.BearerToken != nil { prmRange.UseBearer(*prm.BearerToken) } res, err := x.pool.ObjectRange(ctx, prmRange) if err != nil { return nil, handleObjectError("init payload range reading via connection pool", err) } return payloadReader{&res}, nil } // SearchObjects implements frostfs.FrostFS interface method. func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch) (handler.ResObjectSearch, error) { var prmSearch pool.PrmObjectSearch prmSearch.SetContainerID(prm.Container) prmSearch.SetFilters(prm.Filters) if prm.BearerToken != nil { prmSearch.UseBearer(*prm.BearerToken) } res, err := x.pool.SearchObjects(ctx, prmSearch) if err != nil { return nil, handleObjectError("init object search via connection pool", err) } return &res, nil } // GetEpochDurations implements frostfs.FrostFS interface method. func (x *FrostFS) GetEpochDurations(ctx context.Context) (*utils.EpochDurations, error) { networkInfo, err := x.pool.NetworkInfo(ctx) if err != nil { return nil, err } res := &utils.EpochDurations{ CurrentEpoch: networkInfo.CurrentEpoch(), MsPerBlock: networkInfo.MsPerBlock(), BlockPerEpoch: networkInfo.EpochDuration(), } if res.BlockPerEpoch == 0 { return nil, fmt.Errorf("EpochDuration is empty") } return res, nil } func (x *FrostFS) NetmapSnapshot(ctx context.Context) (netmap.NetMap, error) { netmapSnapshot, err := x.pool.NetMapSnapshot(ctx) if err != nil { return netmapSnapshot, handleObjectError("get netmap via connection pool", err) } return netmapSnapshot, nil } // ResolverFrostFS represents virtual connection to the FrostFS network. // It implements resolver.FrostFS. type ResolverFrostFS struct { pool *pool.Pool } // NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool. 
func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS {
	return &ResolverFrostFS{pool: p}
}

// SystemDNS implements resolver.FrostFS interface method.
//
// It reads the network info from the pool and returns the raw "SystemDNS"
// network parameter as a string. An error is returned when the network info
// cannot be read or when the parameter is missing or empty.
func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
	networkInfo, err := x.pool.NetworkInfo(ctx)
	if err != nil {
		return "", handleObjectError("read network info via client", err)
	}

	domain := networkInfo.RawNetworkParameter("SystemDNS")
	// Check the length rather than nil so that an empty, non-nil value is
	// rejected as well, which is what the error message promises.
	if len(domain) == 0 {
		return "", errors.New("system DNS parameter not found or empty")
	}

	return string(domain), nil
}

// handleObjectError adds msg as context to err and maps well-known failures
// to handler-level sentinel errors:
//   - a nil err yields nil;
//   - an access-denied error wraps handler.ErrAccessDenied and appends the reason;
//   - a timeout wraps handler.ErrGatewayTimeout and appends the original text;
//   - any other error is wrapped as is.
func handleObjectError(msg string, err error) error {
	if err == nil {
		return nil
	}

	if reason, ok := IsErrObjectAccessDenied(err); ok {
		return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
	}

	if IsTimeoutError(err) {
		return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
	}

	return fmt.Errorf("%s: %w", msg, err)
}

// UnwrapErr follows the errors.Unwrap chain of err and returns the innermost
// error. An error that wraps nothing, including nil, is returned unchanged.
func UnwrapErr(err error) error {
	unwrappedErr := errors.Unwrap(err)
	for unwrappedErr != nil {
		err = unwrappedErr
		unwrappedErr = errors.Unwrap(err)
	}

	return err
}

// IsErrObjectAccessDenied reports whether err is, or wraps, an
// *apistatus.ObjectAccessDenied. When it does, the denial reason is returned
// alongside true; otherwise it returns an empty string and false.
func IsErrObjectAccessDenied(err error) (string, bool) {
	// errors.As walks the whole error tree, so the status is found wherever
	// it sits in the chain, not only when it is the innermost error.
	var accessDenied *apistatus.ObjectAccessDenied
	if errors.As(err, &accessDenied) {
		return accessDenied.Reason(), true
	}

	return "", false
}

// IsTimeoutError reports whether err represents a timeout: its text contains
// "timeout", it matches context.DeadlineExceeded, or its innermost error
// carries the gRPC code DeadlineExceeded. A nil err is not a timeout.
func IsTimeoutError(err error) bool {
	// Guard against nil: calling err.Error() on a nil error would panic.
	if err == nil {
		return false
	}

	if strings.Contains(err.Error(), "timeout") || errors.Is(err, context.DeadlineExceeded) {
		return true
	}

	return status.Code(UnwrapErr(err)) == codes.DeadlineExceeded
}