forked from TrueCloudLab/frostfs-node
Evgenii Stratonikov
15102e6dfd
`slices.SortFunc` doesn't use reflection and is a bit faster. I have done some micro-benchmarks for `[]NodeInfo`: ``` $ benchstat -col "/func" out goos: linux goarch: amd64 pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz │ sort.Slice │ slices.SortFunc │ │ sec/op │ sec/op vs base │ Sort-8 2.130µ ± 2% 1.253µ ± 2% -41.20% (p=0.000 n=10) ``` Haven't included them, though, as I don't see them being used a lot. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
900 lines
23 KiB
Go
900 lines
23 KiB
Go
package internal
|
|
|
|
import (
|
|
"bytes"
|
|
"cmp"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"slices"
|
|
"sort"
|
|
"strings"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
|
)
|
|
|
|
// errMissingHeaderInResponse is returned by HeadObject when the client
// response does not contain an object header.
var errMissingHeaderInResponse = errors.New("missing header in response")
|
|
|
|
// BalanceOfPrm groups parameters of BalanceOf operation.
|
|
type BalanceOfPrm struct {
|
|
commonPrm
|
|
client.PrmBalanceGet
|
|
}
|
|
|
|
// BalanceOfRes groups the resulting values of BalanceOf operation.
|
|
type BalanceOfRes struct {
|
|
cliRes *client.ResBalanceGet
|
|
}
|
|
|
|
// Balance returns the current balance.
|
|
func (x BalanceOfRes) Balance() accounting.Decimal {
|
|
return x.cliRes.Amount()
|
|
}
|
|
|
|
// BalanceOf requests the current balance of a FrostFS user.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func BalanceOf(ctx context.Context, prm BalanceOfPrm) (res BalanceOfRes, err error) {
|
|
res.cliRes, err = prm.cli.BalanceGet(ctx, prm.PrmBalanceGet)
|
|
|
|
return
|
|
}
|
|
|
|
// ListContainersPrm groups parameters of ListContainers operation.
|
|
type ListContainersPrm struct {
|
|
commonPrm
|
|
client.PrmContainerList
|
|
}
|
|
|
|
// ListContainersRes groups the resulting values of ListContainers operation.
|
|
type ListContainersRes struct {
|
|
cliRes *client.ResContainerList
|
|
}
|
|
|
|
// IDList returns list of identifiers of user's containers.
|
|
func (x ListContainersRes) IDList() []cid.ID {
|
|
return x.cliRes.Containers()
|
|
}
|
|
|
|
// ListContainers requests a list of FrostFS user's containers.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContainersRes, err error) {
|
|
res.cliRes, err = prm.cli.ContainerList(ctx, prm.PrmContainerList)
|
|
|
|
return
|
|
}
|
|
|
|
// SortedIDList returns sorted list of identifiers of user's containers.
|
|
func (x ListContainersRes) SortedIDList() []cid.ID {
|
|
list := x.cliRes.Containers()
|
|
sort.Slice(list, func(i, j int) bool {
|
|
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
|
|
return strings.Compare(lhs, rhs) < 0
|
|
})
|
|
return list
|
|
}
|
|
|
|
// PutContainerPrm groups parameters of PutContainer operation.
|
|
type PutContainerPrm struct {
|
|
Client *client.Client
|
|
ClientParams client.PrmContainerPut
|
|
}
|
|
|
|
// PutContainerRes groups the resulting values of PutContainer operation.
|
|
type PutContainerRes struct {
|
|
cnr cid.ID
|
|
}
|
|
|
|
// ID returns identifier of the created container.
|
|
func (x PutContainerRes) ID() cid.ID {
|
|
return x.cnr
|
|
}
|
|
|
|
// PutContainer sends a request to save the container in FrostFS.
|
|
//
|
|
// Operation is asynchronous and not guaranteed even in the absence of errors.
|
|
// The required time is also not predictable.
|
|
//
|
|
// Success can be verified by reading by identifier.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func PutContainer(ctx context.Context, prm PutContainerPrm) (res PutContainerRes, err error) {
|
|
cliRes, err := prm.Client.ContainerPut(ctx, prm.ClientParams)
|
|
if err == nil {
|
|
res.cnr = cliRes.ID()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// GetContainerPrm groups parameters of GetContainer operation.
|
|
type GetContainerPrm struct {
|
|
Client *client.Client
|
|
ClientParams client.PrmContainerGet
|
|
}
|
|
|
|
// SetContainer sets identifier of the container to be read.
|
|
//
|
|
// Deprecated: Use GetContainerPrm.ClientParams.ContainerID instead.
|
|
func (x *GetContainerPrm) SetContainer(id cid.ID) {
|
|
x.ClientParams.ContainerID = &id
|
|
}
|
|
|
|
// GetContainerRes groups the resulting values of GetContainer operation.
|
|
type GetContainerRes struct {
|
|
cliRes *client.ResContainerGet
|
|
}
|
|
|
|
// Container returns structured of the requested container.
|
|
func (x GetContainerRes) Container() containerSDK.Container {
|
|
return x.cliRes.Container()
|
|
}
|
|
|
|
// GetContainer reads a container from FrostFS by ID.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func GetContainer(ctx context.Context, prm GetContainerPrm) (res GetContainerRes, err error) {
|
|
res.cliRes, err = prm.Client.ContainerGet(ctx, prm.ClientParams)
|
|
|
|
return
|
|
}
|
|
|
|
// IsACLExtendable checks if ACL of the container referenced by the given identifier
|
|
// can be extended. Client connection MUST BE correctly established in advance.
|
|
func IsACLExtendable(ctx context.Context, c *client.Client, cnr cid.ID) (bool, error) {
|
|
prm := GetContainerPrm{
|
|
Client: c,
|
|
ClientParams: client.PrmContainerGet{
|
|
ContainerID: &cnr,
|
|
},
|
|
}
|
|
|
|
res, err := GetContainer(ctx, prm)
|
|
if err != nil {
|
|
return false, fmt.Errorf("get container from the FrostFS: %w", err)
|
|
}
|
|
|
|
return res.Container().BasicACL().Extendable(), nil
|
|
}
|
|
|
|
// DeleteContainerPrm groups parameters of DeleteContainerPrm operation.
|
|
type DeleteContainerPrm struct {
|
|
Client *client.Client
|
|
ClientParams client.PrmContainerDelete
|
|
}
|
|
|
|
// DeleteContainerRes groups the resulting values of DeleteContainer operation.
// It currently carries no data.
type DeleteContainerRes struct{}
|
|
|
|
// DeleteContainer sends a request to remove a container from FrostFS by ID.
|
|
//
|
|
// Operation is asynchronous and not guaranteed even in the absence of errors.
|
|
// The required time is also not predictable.
|
|
//
|
|
// Success can be verified by reading by identifier.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func DeleteContainer(ctx context.Context, prm DeleteContainerPrm) (res DeleteContainerRes, err error) {
|
|
_, err = prm.Client.ContainerDelete(ctx, prm.ClientParams)
|
|
|
|
return
|
|
}
|
|
|
|
// NetworkInfoPrm groups parameters of NetworkInfo operation.
|
|
type NetworkInfoPrm struct {
|
|
Client *client.Client
|
|
ClientParams client.PrmNetworkInfo
|
|
}
|
|
|
|
// NetworkInfoRes groups the resulting values of NetworkInfo operation.
|
|
type NetworkInfoRes struct {
|
|
cliRes *client.ResNetworkInfo
|
|
}
|
|
|
|
// NetworkInfo returns structured information about the FrostFS network.
|
|
func (x NetworkInfoRes) NetworkInfo() netmap.NetworkInfo {
|
|
return x.cliRes.Info()
|
|
}
|
|
|
|
// NetworkInfo reads information about the FrostFS network.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func NetworkInfo(ctx context.Context, prm NetworkInfoPrm) (res NetworkInfoRes, err error) {
|
|
res.cliRes, err = prm.Client.NetworkInfo(ctx, prm.ClientParams)
|
|
|
|
return
|
|
}
|
|
|
|
// NodeInfoPrm groups parameters of NodeInfo operation.
|
|
type NodeInfoPrm struct {
|
|
Client *client.Client
|
|
ClientParams client.PrmEndpointInfo
|
|
}
|
|
|
|
// NodeInfoRes groups the resulting values of NodeInfo operation.
|
|
type NodeInfoRes struct {
|
|
cliRes *client.ResEndpointInfo
|
|
}
|
|
|
|
// NodeInfo returns information about the node from netmap.
|
|
func (x NodeInfoRes) NodeInfo() netmap.NodeInfo {
|
|
return x.cliRes.NodeInfo()
|
|
}
|
|
|
|
// LatestVersion returns the latest FrostFS API version in use.
|
|
func (x NodeInfoRes) LatestVersion() version.Version {
|
|
return x.cliRes.LatestVersion()
|
|
}
|
|
|
|
// NodeInfo requests information about the remote server from FrostFS netmap.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func NodeInfo(ctx context.Context, prm NodeInfoPrm) (res NodeInfoRes, err error) {
|
|
res.cliRes, err = prm.Client.EndpointInfo(ctx, prm.ClientParams)
|
|
|
|
return
|
|
}
|
|
|
|
// NetMapSnapshotPrm groups parameters of NetMapSnapshot operation.
|
|
type NetMapSnapshotPrm struct {
|
|
commonPrm
|
|
}
|
|
|
|
// NetMapSnapshotRes groups the resulting values of NetMapSnapshot operation.
|
|
type NetMapSnapshotRes struct {
|
|
cliRes *client.ResNetMapSnapshot
|
|
}
|
|
|
|
// NetMap returns current local snapshot of the FrostFS network map.
|
|
func (x NetMapSnapshotRes) NetMap() netmap.NetMap {
|
|
return x.cliRes.NetMap()
|
|
}
|
|
|
|
// NetMapSnapshot requests current network view of the remote server.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func NetMapSnapshot(ctx context.Context, prm NetMapSnapshotPrm) (res NetMapSnapshotRes, err error) {
|
|
res.cliRes, err = prm.cli.NetMapSnapshot(ctx, client.PrmNetMapSnapshot{})
|
|
return
|
|
}
|
|
|
|
// CreateSessionPrm groups parameters of CreateSession operation.
|
|
type CreateSessionPrm struct {
|
|
commonPrm
|
|
client.PrmSessionCreate
|
|
}
|
|
|
|
// CreateSessionRes groups the resulting values of CreateSession operation.
|
|
type CreateSessionRes struct {
|
|
cliRes *client.ResSessionCreate
|
|
}
|
|
|
|
// ID returns session identifier.
|
|
func (x CreateSessionRes) ID() []byte {
|
|
return x.cliRes.ID()
|
|
}
|
|
|
|
// SessionKey returns public session key in a binary format.
|
|
func (x CreateSessionRes) SessionKey() []byte {
|
|
return x.cliRes.PublicKey()
|
|
}
|
|
|
|
// CreateSession opens a new unlimited session with the remote node.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func CreateSession(ctx context.Context, prm CreateSessionPrm) (res CreateSessionRes, err error) {
|
|
res.cliRes, err = prm.cli.SessionCreate(ctx, prm.PrmSessionCreate)
|
|
|
|
return
|
|
}
|
|
|
|
// PutObjectPrm groups parameters of PutObject operation.
|
|
type PutObjectPrm struct {
|
|
commonObjectPrm
|
|
|
|
copyNum []uint32
|
|
|
|
hdr *objectSDK.Object
|
|
|
|
rdr io.Reader
|
|
|
|
headerCallback func()
|
|
|
|
prepareLocally bool
|
|
}
|
|
|
|
// SetHeader sets object header.
|
|
func (x *PutObjectPrm) SetHeader(hdr *objectSDK.Object) {
|
|
x.hdr = hdr
|
|
}
|
|
|
|
// SetPayloadReader sets reader of the object payload.
|
|
func (x *PutObjectPrm) SetPayloadReader(rdr io.Reader) {
|
|
x.rdr = rdr
|
|
}
|
|
|
|
// SetHeaderCallback sets callback which is called on the object after the header is received
|
|
// but before the payload is written.
|
|
func (x *PutObjectPrm) SetHeaderCallback(f func()) {
|
|
x.headerCallback = f
|
|
}
|
|
|
|
// SetCopiesNumberByVectors sets ordered list of minimal required object copies numbers
|
|
// per placement vector.
|
|
func (x *PutObjectPrm) SetCopiesNumberByVectors(copiesNumbers []uint32) {
|
|
x.copyNum = copiesNumbers
|
|
}
|
|
|
|
// PrepareLocally generate object header on the client side.
|
|
// For big object - split locally too.
|
|
func (x *PutObjectPrm) PrepareLocally() {
|
|
x.prepareLocally = true
|
|
}
|
|
|
|
func (x *PutObjectPrm) convertToSDKPrm(ctx context.Context) (client.PrmObjectPutInit, error) {
|
|
putPrm := client.PrmObjectPutInit{
|
|
XHeaders: x.xHeaders,
|
|
BearerToken: x.bearerToken,
|
|
Local: x.local,
|
|
CopiesNumber: x.copyNum,
|
|
}
|
|
|
|
if x.prepareLocally {
|
|
res, err := x.cli.NetworkInfo(ctx, client.PrmNetworkInfo{})
|
|
if err != nil {
|
|
return client.PrmObjectPutInit{}, err
|
|
}
|
|
putPrm.MaxSize = res.Info().MaxObjectSize()
|
|
putPrm.EpochSource = epochSource(res.Info().CurrentEpoch())
|
|
putPrm.WithoutHomomorphHash = res.Info().HomomorphicHashingDisabled()
|
|
} else {
|
|
putPrm.Session = x.sessionToken
|
|
}
|
|
return putPrm, nil
|
|
}
|
|
|
|
// PutObjectRes groups the resulting values of PutObject operation.
|
|
type PutObjectRes struct {
|
|
id oid.ID
|
|
}
|
|
|
|
// ID returns identifier of the created object.
|
|
func (x PutObjectRes) ID() oid.ID {
|
|
return x.id
|
|
}
|
|
|
|
// epochSource is a fixed epoch number that reports itself as the current
// epoch.
type epochSource uint64

// CurrentEpoch returns the epoch number stored in s.
func (s epochSource) CurrentEpoch() uint64 {
	return uint64(s)
}
|
|
|
|
// PutObject saves the object in FrostFS network.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
|
|
sdkPrm, err := prm.convertToSDKPrm(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to create parameters of object put operation: %w", err)
|
|
}
|
|
wrt, err := prm.cli.ObjectPutInit(ctx, sdkPrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init object writing: %w", err)
|
|
}
|
|
|
|
if wrt.WriteHeader(ctx, *prm.hdr) {
|
|
if prm.headerCallback != nil {
|
|
prm.headerCallback()
|
|
}
|
|
|
|
sz := prm.hdr.PayloadSize()
|
|
|
|
if data := prm.hdr.Payload(); len(data) > 0 {
|
|
if prm.rdr != nil {
|
|
prm.rdr = io.MultiReader(bytes.NewReader(data), prm.rdr)
|
|
} else {
|
|
prm.rdr = bytes.NewReader(data)
|
|
sz = uint64(len(data))
|
|
}
|
|
}
|
|
|
|
if prm.rdr != nil {
|
|
const defaultBufferSizePut = 3 << 20 // Maximum chunk size is 3 MiB in the SDK.
|
|
|
|
if sz == 0 || sz > defaultBufferSizePut {
|
|
sz = defaultBufferSizePut
|
|
}
|
|
|
|
buf := make([]byte, sz)
|
|
|
|
var n int
|
|
|
|
for {
|
|
n, err = prm.rdr.Read(buf)
|
|
if n > 0 {
|
|
if !wrt.WritePayloadChunk(ctx, buf[:n]) {
|
|
break
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
|
|
return nil, fmt.Errorf("read payload: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
cliRes, err := wrt.Close(ctx)
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, fmt.Errorf("client failure: %w", err)
|
|
}
|
|
|
|
return &PutObjectRes{
|
|
id: cliRes.StoredObjectID(),
|
|
}, nil
|
|
}
|
|
|
|
// DeleteObjectPrm groups parameters of DeleteObject operation.
|
|
type DeleteObjectPrm struct {
|
|
commonObjectPrm
|
|
objectAddressPrm
|
|
}
|
|
|
|
// DeleteObjectRes groups the resulting values of DeleteObject operation.
|
|
type DeleteObjectRes struct {
|
|
tomb oid.ID
|
|
}
|
|
|
|
// Tombstone returns the ID of the created object with tombstone.
|
|
func (x DeleteObjectRes) Tombstone() oid.ID {
|
|
return x.tomb
|
|
}
|
|
|
|
// DeleteObject marks an object to be removed from FrostFS through tombstone placement.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func DeleteObject(ctx context.Context, prm DeleteObjectPrm) (*DeleteObjectRes, error) {
|
|
cnr := prm.objAddr.Container()
|
|
obj := prm.objAddr.Object()
|
|
|
|
delPrm := client.PrmObjectDelete{
|
|
XHeaders: prm.xHeaders,
|
|
ContainerID: &cnr,
|
|
ObjectID: &obj,
|
|
Session: prm.sessionToken,
|
|
BearerToken: prm.bearerToken,
|
|
}
|
|
|
|
cliRes, err := prm.cli.ObjectDelete(ctx, delPrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("remove object via client: %w", err)
|
|
}
|
|
|
|
return &DeleteObjectRes{
|
|
tomb: cliRes.Tombstone(),
|
|
}, nil
|
|
}
|
|
|
|
// GetObjectPrm groups parameters of GetObject operation.
|
|
type GetObjectPrm struct {
|
|
commonObjectPrm
|
|
objectAddressPrm
|
|
rawPrm
|
|
payloadWriterPrm
|
|
headerCallback func(*objectSDK.Object)
|
|
}
|
|
|
|
// SetHeaderCallback sets callback which is called on the object after the header is received
|
|
// but before the payload is written.
|
|
func (p *GetObjectPrm) SetHeaderCallback(f func(*objectSDK.Object)) {
|
|
p.headerCallback = f
|
|
}
|
|
|
|
// GetObjectRes groups the resulting values of GetObject operation.
|
|
type GetObjectRes struct {
|
|
hdr *objectSDK.Object
|
|
}
|
|
|
|
// Header returns the header of the request object.
|
|
func (x GetObjectRes) Header() *objectSDK.Object {
|
|
return x.hdr
|
|
}
|
|
|
|
// GetObject reads an object by address.
|
|
//
|
|
// Interrupts on any writer error. If successful, payload is written to the writer.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
// For raw reading, returns *object.SplitInfoError error if object is virtual.
|
|
func GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) {
|
|
cnr := prm.objAddr.Container()
|
|
obj := prm.objAddr.Object()
|
|
|
|
getPrm := client.PrmObjectGet{
|
|
XHeaders: prm.xHeaders,
|
|
BearerToken: prm.bearerToken,
|
|
Session: prm.sessionToken,
|
|
Raw: prm.raw,
|
|
Local: prm.local,
|
|
ContainerID: &cnr,
|
|
ObjectID: &obj,
|
|
}
|
|
|
|
rdr, err := prm.cli.ObjectGetInit(ctx, getPrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init object reading on client: %w", err)
|
|
}
|
|
|
|
var hdr objectSDK.Object
|
|
|
|
if !rdr.ReadHeader(&hdr) {
|
|
_, err = rdr.Close()
|
|
return nil, fmt.Errorf("read object header: %w", err)
|
|
}
|
|
if prm.headerCallback != nil {
|
|
prm.headerCallback(&hdr)
|
|
}
|
|
|
|
_, err = io.Copy(prm.wrt, rdr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("copy payload: %w", err)
|
|
}
|
|
|
|
return &GetObjectRes{
|
|
hdr: &hdr,
|
|
}, nil
|
|
}
|
|
|
|
// HeadObjectPrm groups parameters of HeadObject operation.
|
|
type HeadObjectPrm struct {
|
|
commonObjectPrm
|
|
objectAddressPrm
|
|
rawPrm
|
|
}
|
|
|
|
// HeadObjectRes groups the resulting values of HeadObject operation.
|
|
type HeadObjectRes struct {
|
|
hdr *objectSDK.Object
|
|
}
|
|
|
|
// Header returns the requested object header.
|
|
func (x HeadObjectRes) Header() *objectSDK.Object {
|
|
return x.hdr
|
|
}
|
|
|
|
// HeadObject reads an object header by address.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
// For raw reading, returns *object.SplitInfoError error if object is virtual.
|
|
func HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error) {
|
|
cnr := prm.objAddr.Container()
|
|
obj := prm.objAddr.Object()
|
|
|
|
headPrm := client.PrmObjectHead{
|
|
XHeaders: prm.xHeaders,
|
|
BearerToken: prm.bearerToken,
|
|
Session: prm.sessionToken,
|
|
Raw: prm.raw,
|
|
Local: prm.local,
|
|
ContainerID: &cnr,
|
|
ObjectID: &obj,
|
|
}
|
|
|
|
res, err := prm.cli.ObjectHead(ctx, headPrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read object header via client: %w", err)
|
|
}
|
|
|
|
var hdr objectSDK.Object
|
|
|
|
if !res.ReadHeader(&hdr) {
|
|
return nil, errMissingHeaderInResponse
|
|
}
|
|
|
|
return &HeadObjectRes{
|
|
hdr: &hdr,
|
|
}, nil
|
|
}
|
|
|
|
// SearchObjectsPrm groups parameters of SearchObjects operation.
|
|
type SearchObjectsPrm struct {
|
|
commonObjectPrm
|
|
containerIDPrm
|
|
|
|
filters objectSDK.SearchFilters
|
|
}
|
|
|
|
// SetFilters sets search filters.
|
|
func (x *SearchObjectsPrm) SetFilters(filters objectSDK.SearchFilters) {
|
|
x.filters = filters
|
|
}
|
|
|
|
// SearchObjectsRes groups the resulting values of SearchObjects operation.
|
|
type SearchObjectsRes struct {
|
|
ids []oid.ID
|
|
}
|
|
|
|
// IDList returns identifiers of the matched objects.
|
|
func (x SearchObjectsRes) IDList() []oid.ID {
|
|
return x.ids
|
|
}
|
|
|
|
// SearchObjects selects objects from the container which match the filters.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes, error) {
|
|
cliPrm := client.PrmObjectSearch{
|
|
XHeaders: prm.xHeaders,
|
|
Local: prm.local,
|
|
BearerToken: prm.bearerToken,
|
|
Session: prm.sessionToken,
|
|
ContainerID: &prm.cnrID,
|
|
Filters: prm.filters,
|
|
}
|
|
|
|
rdr, err := prm.cli.ObjectSearchInit(ctx, cliPrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init object search: %w", err)
|
|
}
|
|
|
|
buf := make([]oid.ID, 10)
|
|
var list []oid.ID
|
|
var n int
|
|
var ok bool
|
|
|
|
for {
|
|
n, ok = rdr.Read(buf)
|
|
list = append(list, buf[:n]...)
|
|
if !ok {
|
|
break
|
|
}
|
|
}
|
|
|
|
_, err = rdr.Close()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read object list: %w", err)
|
|
}
|
|
|
|
slices.SortFunc(list, func(a, b oid.ID) int {
|
|
return strings.Compare(a.EncodeToString(), b.EncodeToString())
|
|
})
|
|
|
|
return &SearchObjectsRes{
|
|
ids: list,
|
|
}, nil
|
|
}
|
|
|
|
// HashPayloadRangesPrm groups parameters of HashPayloadRanges operation.
|
|
type HashPayloadRangesPrm struct {
|
|
commonObjectPrm
|
|
objectAddressPrm
|
|
|
|
tz bool
|
|
|
|
rngs []objectSDK.Range
|
|
|
|
salt []byte
|
|
}
|
|
|
|
// TZ sets flag to request Tillich-Zemor hashes.
|
|
func (x *HashPayloadRangesPrm) TZ() {
|
|
x.tz = true
|
|
}
|
|
|
|
// SetRanges sets a list of payload ranges to hash.
|
|
func (x *HashPayloadRangesPrm) SetRanges(rngs []objectSDK.Range) {
|
|
x.rngs = rngs
|
|
}
|
|
|
|
// SetSalt sets data for each range to be XOR'ed with.
|
|
func (x *HashPayloadRangesPrm) SetSalt(salt []byte) {
|
|
x.salt = salt
|
|
}
|
|
|
|
// HashPayloadRangesRes groups the resulting values of HashPayloadRanges operation.
|
|
type HashPayloadRangesRes struct {
|
|
cliRes *client.ResObjectHash
|
|
}
|
|
|
|
// HashList returns a list of hashes of the payload ranges keeping order.
|
|
func (x HashPayloadRangesRes) HashList() [][]byte {
|
|
return x.cliRes.Checksums()
|
|
}
|
|
|
|
// HashPayloadRanges requests hashes (by default SHA256) of the object payload ranges.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
// Returns an error if number of received hashes differs with the number of requested ranges.
|
|
func HashPayloadRanges(ctx context.Context, prm HashPayloadRangesPrm) (*HashPayloadRangesRes, error) {
|
|
cs := checksum.SHA256
|
|
if prm.tz {
|
|
cs = checksum.TZ
|
|
}
|
|
|
|
cnr := prm.objAddr.Container()
|
|
obj := prm.objAddr.Object()
|
|
cliPrm := client.PrmObjectHash{
|
|
ContainerID: &cnr,
|
|
ObjectID: &obj,
|
|
Local: prm.local,
|
|
Salt: prm.salt,
|
|
Ranges: prm.rngs,
|
|
ChecksumType: cs,
|
|
Session: prm.sessionToken,
|
|
BearerToken: prm.bearerToken,
|
|
XHeaders: prm.xHeaders,
|
|
}
|
|
|
|
res, err := prm.cli.ObjectHash(ctx, cliPrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read payload hashes via client: %w", err)
|
|
}
|
|
|
|
return &HashPayloadRangesRes{
|
|
cliRes: res,
|
|
}, nil
|
|
}
|
|
|
|
// PayloadRangePrm groups parameters of PayloadRange operation.
|
|
type PayloadRangePrm struct {
|
|
commonObjectPrm
|
|
objectAddressPrm
|
|
rawPrm
|
|
payloadWriterPrm
|
|
|
|
rng *objectSDK.Range
|
|
}
|
|
|
|
// SetRange sets payload range to read.
|
|
func (x *PayloadRangePrm) SetRange(rng *objectSDK.Range) {
|
|
x.rng = rng
|
|
}
|
|
|
|
// PayloadRangeRes groups the resulting values of PayloadRange operation.
// It currently carries no data.
type PayloadRangeRes struct{}
|
|
|
|
// PayloadRange reads object payload range from FrostFS and writes it to the specified writer.
|
|
//
|
|
// Interrupts on any writer error.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
// For raw reading, returns *object.SplitInfoError error if object is virtual.
|
|
func PayloadRange(ctx context.Context, prm PayloadRangePrm) (*PayloadRangeRes, error) {
|
|
cnr := prm.objAddr.Container()
|
|
obj := prm.objAddr.Object()
|
|
|
|
rangePrm := client.PrmObjectRange{
|
|
XHeaders: prm.xHeaders,
|
|
BearerToken: prm.bearerToken,
|
|
Session: prm.sessionToken,
|
|
Raw: prm.raw,
|
|
Local: prm.local,
|
|
ContainerID: &cnr,
|
|
ObjectID: &obj,
|
|
Offset: prm.rng.GetOffset(),
|
|
Length: prm.rng.GetLength(),
|
|
}
|
|
|
|
rdr, err := prm.cli.ObjectRangeInit(ctx, rangePrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init payload reading: %w", err)
|
|
}
|
|
|
|
_, err = io.Copy(prm.wrt, rdr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("copy payload: %w", err)
|
|
}
|
|
|
|
return new(PayloadRangeRes), nil
|
|
}
|
|
|
|
// SyncContainerPrm groups parameters of SyncContainerSettings operation.
|
|
type SyncContainerPrm struct {
|
|
commonPrm
|
|
c *containerSDK.Container
|
|
}
|
|
|
|
// SetContainer sets a container that is required to be synced.
|
|
func (s *SyncContainerPrm) SetContainer(c *containerSDK.Container) {
|
|
s.c = c
|
|
}
|
|
|
|
// SyncContainerRes groups resulting values of SyncContainerSettings
// operation. It currently carries no data.
type SyncContainerRes struct{}
|
|
|
|
// SyncContainerSettings reads global network config from FrostFS and
|
|
// syncs container settings with it.
|
|
//
|
|
// Interrupts on any writer error.
|
|
//
|
|
// Panics if a container passed as a parameter is nil.
|
|
func SyncContainerSettings(ctx context.Context, prm SyncContainerPrm) (*SyncContainerRes, error) {
|
|
if prm.c == nil {
|
|
panic("sync container settings with the network: nil container")
|
|
}
|
|
|
|
err := client.SyncContainerWithNetwork(ctx, prm.c, prm.cli)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return new(SyncContainerRes), nil
|
|
}
|
|
|
|
// PatchObjectPrm groups parameters of PatchObject operation.
|
|
type PatchObjectPrm struct {
|
|
commonObjectPrm
|
|
objectAddressPrm
|
|
|
|
NewAttributes []objectSDK.Attribute
|
|
|
|
ReplaceAttribute bool
|
|
|
|
PayloadPatches []PayloadPatch
|
|
}
|
|
|
|
type PayloadPatch struct {
|
|
Range objectSDK.Range
|
|
|
|
PayloadPath string
|
|
}
|
|
|
|
type PatchRes struct {
|
|
OID oid.ID
|
|
}
|
|
|
|
func Patch(ctx context.Context, prm PatchObjectPrm) (*PatchRes, error) {
|
|
patchPrm := client.PrmObjectPatch{
|
|
XHeaders: prm.xHeaders,
|
|
BearerToken: prm.bearerToken,
|
|
Session: prm.sessionToken,
|
|
Address: prm.objAddr,
|
|
}
|
|
|
|
slices.SortFunc(prm.PayloadPatches, func(a, b PayloadPatch) int {
|
|
return cmp.Compare(a.Range.GetOffset(), b.Range.GetOffset())
|
|
})
|
|
|
|
patcher, err := prm.cli.ObjectPatchInit(ctx, patchPrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init payload reading: %w", err)
|
|
}
|
|
|
|
if patcher.PatchAttributes(ctx, prm.NewAttributes, prm.ReplaceAttribute) {
|
|
for _, pp := range prm.PayloadPatches {
|
|
payloadFile, err := os.OpenFile(pp.PayloadPath, os.O_RDONLY, os.ModePerm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
applied := patcher.PatchPayload(ctx, &pp.Range, payloadFile)
|
|
_ = payloadFile.Close()
|
|
if !applied {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
res, err := patcher.Close(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &PatchRes{
|
|
OID: res.ObjectID(),
|
|
}, nil
|
|
}
|