package internal import ( "bytes" "cmp" "context" "errors" "fmt" "io" "os" "slices" "strings" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" ) var errMissingHeaderInResponse = errors.New("missing header in response") // BalanceOfPrm groups parameters of BalanceOf operation. type BalanceOfPrm struct { commonPrm client.PrmBalanceGet } // BalanceOfRes groups the resulting values of BalanceOf operation. type BalanceOfRes struct { cliRes *client.ResBalanceGet } // Balance returns the current balance. func (x BalanceOfRes) Balance() accounting.Decimal { return x.cliRes.Amount() } // BalanceOf requests the current balance of a FrostFS user. // // Returns any error which prevented the operation from completing correctly in error return. func BalanceOf(ctx context.Context, prm BalanceOfPrm) (res BalanceOfRes, err error) { res.cliRes, err = prm.cli.BalanceGet(ctx, prm.PrmBalanceGet) return } // ListContainersPrm groups parameters of ListContainers operation. type ListContainersPrm struct { commonPrm client.PrmContainerList } // ListContainersRes groups the resulting values of ListContainers operation. type ListContainersRes struct { cliRes *client.ResContainerList } // IDList returns list of identifiers of user's containers. func (x ListContainersRes) IDList() []cid.ID { return x.cliRes.Containers() } // ListContainers requests a list of FrostFS user's containers. // // Returns any error which prevented the operation from completing correctly in error return. 
func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContainersRes, err error) { res.cliRes, err = prm.cli.ContainerList(ctx, prm.PrmContainerList) return } // SortedIDList returns sorted list of identifiers of user's containers. func (x ListContainersRes) SortedIDList() []cid.ID { list := x.cliRes.Containers() slices.SortFunc(list, func(lhs, rhs cid.ID) int { return strings.Compare(lhs.EncodeToString(), rhs.EncodeToString()) }) return list } func ListContainersStream(ctx context.Context, prm ListContainersPrm, processCnr func(id cid.ID) bool) (err error) { cliPrm := &client.PrmContainerListStream{ XHeaders: prm.XHeaders, OwnerID: prm.OwnerID, Session: prm.Session, } rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm) if err != nil { return fmt.Errorf("init container list: %w", err) } err = rdr.Iterate(processCnr) if err != nil { return fmt.Errorf("read container list: %w", err) } return } // PutContainerPrm groups parameters of PutContainer operation. type PutContainerPrm struct { Client *client.Client ClientParams client.PrmContainerPut } // PutContainerRes groups the resulting values of PutContainer operation. type PutContainerRes struct { cnr cid.ID } // ID returns identifier of the created container. func (x PutContainerRes) ID() cid.ID { return x.cnr } // PutContainer sends a request to save the container in FrostFS. // // Operation is asynchronous and not guaranteed even in the absence of errors. // The required time is also not predictable. // // Success can be verified by reading by identifier. // // Returns any error which prevented the operation from completing correctly in error return. func PutContainer(ctx context.Context, prm PutContainerPrm) (res PutContainerRes, err error) { cliRes, err := prm.Client.ContainerPut(ctx, prm.ClientParams) if err == nil { res.cnr = cliRes.ID() } return } // GetContainerPrm groups parameters of GetContainer operation. 
type GetContainerPrm struct { Client *client.Client ClientParams client.PrmContainerGet } // SetContainer sets identifier of the container to be read. // // Deprecated: Use GetContainerPrm.ClientParams.ContainerID instead. func (x *GetContainerPrm) SetContainer(id cid.ID) { x.ClientParams.ContainerID = &id } // GetContainerRes groups the resulting values of GetContainer operation. type GetContainerRes struct { cliRes *client.ResContainerGet } // Container returns structured of the requested container. func (x GetContainerRes) Container() containerSDK.Container { return x.cliRes.Container() } // GetContainer reads a container from FrostFS by ID. // // Returns any error which prevented the operation from completing correctly in error return. func GetContainer(ctx context.Context, prm GetContainerPrm) (res GetContainerRes, err error) { res.cliRes, err = prm.Client.ContainerGet(ctx, prm.ClientParams) return } // IsACLExtendable checks if ACL of the container referenced by the given identifier // can be extended. Client connection MUST BE correctly established in advance. func IsACLExtendable(ctx context.Context, c *client.Client, cnr cid.ID) (bool, error) { prm := GetContainerPrm{ Client: c, ClientParams: client.PrmContainerGet{ ContainerID: &cnr, }, } res, err := GetContainer(ctx, prm) if err != nil { return false, fmt.Errorf("get container from the FrostFS: %w", err) } return res.Container().BasicACL().Extendable(), nil } // DeleteContainerPrm groups parameters of DeleteContainerPrm operation. type DeleteContainerPrm struct { Client *client.Client ClientParams client.PrmContainerDelete } // DeleteContainerRes groups the resulting values of DeleteContainer operation. type DeleteContainerRes struct{} // DeleteContainer sends a request to remove a container from FrostFS by ID. // // Operation is asynchronous and not guaranteed even in the absence of errors. // The required time is also not predictable. // // Success can be verified by reading by identifier. 
// // Returns any error which prevented the operation from completing correctly in error return. func DeleteContainer(ctx context.Context, prm DeleteContainerPrm) (res DeleteContainerRes, err error) { _, err = prm.Client.ContainerDelete(ctx, prm.ClientParams) return } // NetworkInfoPrm groups parameters of NetworkInfo operation. type NetworkInfoPrm struct { Client *client.Client ClientParams client.PrmNetworkInfo } // NetworkInfoRes groups the resulting values of NetworkInfo operation. type NetworkInfoRes struct { cliRes *client.ResNetworkInfo } // NetworkInfo returns structured information about the FrostFS network. func (x NetworkInfoRes) NetworkInfo() netmap.NetworkInfo { return x.cliRes.Info() } // NetworkInfo reads information about the FrostFS network. // // Returns any error which prevented the operation from completing correctly in error return. func NetworkInfo(ctx context.Context, prm NetworkInfoPrm) (res NetworkInfoRes, err error) { res.cliRes, err = prm.Client.NetworkInfo(ctx, prm.ClientParams) return } // NodeInfoPrm groups parameters of NodeInfo operation. type NodeInfoPrm struct { Client *client.Client ClientParams client.PrmEndpointInfo } // NodeInfoRes groups the resulting values of NodeInfo operation. type NodeInfoRes struct { cliRes *client.ResEndpointInfo } // NodeInfo returns information about the node from netmap. func (x NodeInfoRes) NodeInfo() netmap.NodeInfo { return x.cliRes.NodeInfo() } // LatestVersion returns the latest FrostFS API version in use. func (x NodeInfoRes) LatestVersion() version.Version { return x.cliRes.LatestVersion() } // NodeInfo requests information about the remote server from FrostFS netmap. // // Returns any error which prevented the operation from completing correctly in error return. func NodeInfo(ctx context.Context, prm NodeInfoPrm) (res NodeInfoRes, err error) { res.cliRes, err = prm.Client.EndpointInfo(ctx, prm.ClientParams) return } // NetMapSnapshotPrm groups parameters of NetMapSnapshot operation. 
type NetMapSnapshotPrm struct {
	commonPrm
}

// NetMapSnapshotRes groups the resulting values of NetMapSnapshot operation.
type NetMapSnapshotRes struct {
	// cliRes is the raw SDK response the accessors read from.
	cliRes *client.ResNetMapSnapshot
}

// NetMap returns current local snapshot of the FrostFS network map.
func (x NetMapSnapshotRes) NetMap() netmap.NetMap {
	return x.cliRes.NetMap()
}

// NetMapSnapshot requests current network view of the remote server.
//
// The request is sent with zero-value SDK parameters. The error of the
// underlying client call, if any, is returned unchanged.
func NetMapSnapshot(ctx context.Context, prm NetMapSnapshotPrm) (res NetMapSnapshotRes, err error) {
	var cliPrm client.PrmNetMapSnapshot

	cliRes, err := prm.cli.NetMapSnapshot(ctx, cliPrm)
	return NetMapSnapshotRes{cliRes: cliRes}, err
}

// CreateSessionPrm groups parameters of CreateSession operation.
//
// It combines the common client parameters with the SDK session request.
type CreateSessionPrm struct {
	commonPrm
	client.PrmSessionCreate
}

// CreateSessionRes groups the resulting values of CreateSession operation.
type CreateSessionRes struct {
	// cliRes is the raw SDK response the accessors read from.
	cliRes *client.ResSessionCreate
}

// ID returns session identifier.
func (x CreateSessionRes) ID() []byte {
	return x.cliRes.ID()
}

// SessionKey returns public session key in a binary format.
func (x CreateSessionRes) SessionKey() []byte {
	return x.cliRes.PublicKey()
}

// CreateSession opens a new session with the remote node using the SDK
// parameters embedded in prm.
//
// The error of the underlying client call, if any, is returned unchanged.
func CreateSession(ctx context.Context, prm CreateSessionPrm) (res CreateSessionRes, err error) {
	cliRes, err := prm.cli.SessionCreate(ctx, prm.PrmSessionCreate)
	return CreateSessionRes{cliRes: cliRes}, err
}

// PutObjectPrm groups parameters of PutObject operation.
type PutObjectPrm struct {
	commonObjectPrm

	// copyNum is forwarded to the SDK as the CopiesNumber request field.
	copyNum []uint32

	// hdr is the object whose header (and embedded payload, if any) is written.
	hdr *objectSDK.Object

	// rdr is an optional source of the object payload.
	rdr io.Reader

	// headerCallback, when set, is invoked after the header has been written.
	headerCallback func()

	// prepareLocally switches the request to client-side object preparation.
	prepareLocally bool
}

// SetHeader sets object header.
func (x *PutObjectPrm) SetHeader(hdr *objectSDK.Object) {
	x.hdr = hdr
}

// SetPayloadReader sets reader of the object payload.
func (x *PutObjectPrm) SetPayloadReader(rdr io.Reader) { x.rdr = rdr } // SetHeaderCallback sets callback which is called on the object after the header is received // but before the payload is written. func (x *PutObjectPrm) SetHeaderCallback(f func()) { x.headerCallback = f } // SetCopiesNumberByVectors sets ordered list of minimal required object copies numbers // per placement vector. func (x *PutObjectPrm) SetCopiesNumberByVectors(copiesNumbers []uint32) { x.copyNum = copiesNumbers } // PrepareLocally generate object header on the client side. // For big object - split locally too. func (x *PutObjectPrm) PrepareLocally() { x.prepareLocally = true } func (x *PutObjectPrm) convertToSDKPrm(ctx context.Context) (client.PrmObjectPutInit, error) { putPrm := client.PrmObjectPutInit{ XHeaders: x.xHeaders, BearerToken: x.bearerToken, Local: x.local, CopiesNumber: x.copyNum, } if x.prepareLocally { res, err := x.cli.NetworkInfo(ctx, client.PrmNetworkInfo{}) if err != nil { return client.PrmObjectPutInit{}, err } putPrm.MaxSize = res.Info().MaxObjectSize() putPrm.EpochSource = epochSource(res.Info().CurrentEpoch()) putPrm.WithoutHomomorphHash = res.Info().HomomorphicHashingDisabled() } else { putPrm.Session = x.sessionToken } return putPrm, nil } // PutObjectRes groups the resulting values of PutObject operation. type PutObjectRes struct { id oid.ID } // ID returns identifier of the created object. func (x PutObjectRes) ID() oid.ID { return x.id } type epochSource uint64 func (s epochSource) CurrentEpoch() uint64 { return uint64(s) } // PutObject saves the object in FrostFS network. // // Returns any error which prevented the operation from completing correctly in error return. 
func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { sdkPrm, err := prm.convertToSDKPrm(ctx) if err != nil { return nil, fmt.Errorf("unable to create parameters of object put operation: %w", err) } wrt, err := prm.cli.ObjectPutInit(ctx, sdkPrm) if err != nil { return nil, fmt.Errorf("init object writing: %w", err) } if wrt.WriteHeader(ctx, *prm.hdr) { if prm.headerCallback != nil { prm.headerCallback() } sz := prm.hdr.PayloadSize() if data := prm.hdr.Payload(); len(data) > 0 { if prm.rdr != nil { prm.rdr = io.MultiReader(bytes.NewReader(data), prm.rdr) } else { prm.rdr = bytes.NewReader(data) sz = uint64(len(data)) } } if prm.rdr != nil { const defaultBufferSizePut = 3 << 20 // Maximum chunk size is 3 MiB in the SDK. if sz == 0 || sz > defaultBufferSizePut { sz = defaultBufferSizePut } buf := make([]byte, sz) var n int for { n, err = prm.rdr.Read(buf) if n > 0 { if !wrt.WritePayloadChunk(ctx, buf[:n]) { break } continue } if errors.Is(err, io.EOF) { break } return nil, fmt.Errorf("read payload: %w", err) } } } cliRes, err := wrt.Close(ctx) if err != nil { // here err already carries both status and client errors return nil, fmt.Errorf("client failure: %w", err) } return &PutObjectRes{ id: cliRes.StoredObjectID(), }, nil } // DeleteObjectPrm groups parameters of DeleteObject operation. type DeleteObjectPrm struct { commonObjectPrm objectAddressPrm } // DeleteObjectRes groups the resulting values of DeleteObject operation. type DeleteObjectRes struct { tomb oid.ID } // Tombstone returns the ID of the created object with tombstone. func (x DeleteObjectRes) Tombstone() oid.ID { return x.tomb } // DeleteObject marks an object to be removed from FrostFS through tombstone placement. // // Returns any error which prevented the operation from completing correctly in error return. 
func DeleteObject(ctx context.Context, prm DeleteObjectPrm) (*DeleteObjectRes, error) { cnr := prm.objAddr.Container() obj := prm.objAddr.Object() delPrm := client.PrmObjectDelete{ XHeaders: prm.xHeaders, ContainerID: &cnr, ObjectID: &obj, Session: prm.sessionToken, BearerToken: prm.bearerToken, } cliRes, err := prm.cli.ObjectDelete(ctx, delPrm) if err != nil { return nil, fmt.Errorf("remove object via client: %w", err) } return &DeleteObjectRes{ tomb: cliRes.Tombstone(), }, nil } // GetObjectPrm groups parameters of GetObject operation. type GetObjectPrm struct { commonObjectPrm objectAddressPrm rawPrm payloadWriterPrm headerCallback func(*objectSDK.Object) } // SetHeaderCallback sets callback which is called on the object after the header is received // but before the payload is written. func (p *GetObjectPrm) SetHeaderCallback(f func(*objectSDK.Object)) { p.headerCallback = f } // GetObjectRes groups the resulting values of GetObject operation. type GetObjectRes struct { hdr *objectSDK.Object } // Header returns the header of the request object. func (x GetObjectRes) Header() *objectSDK.Object { return x.hdr } // GetObject reads an object by address. // // Interrupts on any writer error. If successful, payload is written to the writer. // // Returns any error which prevented the operation from completing correctly in error return. // For raw reading, returns *object.SplitInfoError error if object is virtual. 
func GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) { cnr := prm.objAddr.Container() obj := prm.objAddr.Object() getPrm := client.PrmObjectGet{ XHeaders: prm.xHeaders, BearerToken: prm.bearerToken, Session: prm.sessionToken, Raw: prm.raw, Local: prm.local, ContainerID: &cnr, ObjectID: &obj, } rdr, err := prm.cli.ObjectGetInit(ctx, getPrm) if err != nil { return nil, fmt.Errorf("init object reading on client: %w", err) } var hdr objectSDK.Object if !rdr.ReadHeader(&hdr) { _, err = rdr.Close() return nil, fmt.Errorf("read object header: %w", err) } if prm.headerCallback != nil { prm.headerCallback(&hdr) } _, err = io.Copy(prm.wrt, rdr) if err != nil { return nil, fmt.Errorf("copy payload: %w", err) } return &GetObjectRes{ hdr: &hdr, }, nil } // HeadObjectPrm groups parameters of HeadObject operation. type HeadObjectPrm struct { commonObjectPrm objectAddressPrm rawPrm } // HeadObjectRes groups the resulting values of HeadObject operation. type HeadObjectRes struct { hdr *objectSDK.Object } // Header returns the requested object header. func (x HeadObjectRes) Header() *objectSDK.Object { return x.hdr } // HeadObject reads an object header by address. // // Returns any error which prevented the operation from completing correctly in error return. // For raw reading, returns *object.SplitInfoError error if object is virtual. 
func HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error) { cnr := prm.objAddr.Container() obj := prm.objAddr.Object() headPrm := client.PrmObjectHead{ XHeaders: prm.xHeaders, BearerToken: prm.bearerToken, Session: prm.sessionToken, Raw: prm.raw, Local: prm.local, ContainerID: &cnr, ObjectID: &obj, } res, err := prm.cli.ObjectHead(ctx, headPrm) if err != nil { return nil, fmt.Errorf("read object header via client: %w", err) } var hdr objectSDK.Object if !res.ReadHeader(&hdr) { return nil, errMissingHeaderInResponse } return &HeadObjectRes{ hdr: &hdr, }, nil } // SearchObjectsPrm groups parameters of SearchObjects operation. type SearchObjectsPrm struct { commonObjectPrm containerIDPrm filters objectSDK.SearchFilters } // SetFilters sets search filters. func (x *SearchObjectsPrm) SetFilters(filters objectSDK.SearchFilters) { x.filters = filters } // SearchObjectsRes groups the resulting values of SearchObjects operation. type SearchObjectsRes struct { ids []oid.ID } // IDList returns identifiers of the matched objects. func (x SearchObjectsRes) IDList() []oid.ID { return x.ids } // SearchObjects selects objects from the container which match the filters. // // Returns any error which prevented the operation from completing correctly in error return. func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes, error) { cliPrm := client.PrmObjectSearch{ XHeaders: prm.xHeaders, Local: prm.local, BearerToken: prm.bearerToken, Session: prm.sessionToken, ContainerID: &prm.cnrID, Filters: prm.filters, } rdr, err := prm.cli.ObjectSearchInit(ctx, cliPrm) if err != nil { return nil, fmt.Errorf("init object search: %w", err) } buf := make([]oid.ID, 10) var list []oid.ID var n int var ok bool for { n, ok = rdr.Read(buf) list = append(list, buf[:n]...) 
if !ok { break } } _, err = rdr.Close() if err != nil { return nil, fmt.Errorf("read object list: %w", err) } slices.SortFunc(list, func(a, b oid.ID) int { return strings.Compare(a.EncodeToString(), b.EncodeToString()) }) return &SearchObjectsRes{ ids: list, }, nil } // HashPayloadRangesPrm groups parameters of HashPayloadRanges operation. type HashPayloadRangesPrm struct { commonObjectPrm objectAddressPrm tz bool rngs []objectSDK.Range salt []byte } // TZ sets flag to request Tillich-Zemor hashes. func (x *HashPayloadRangesPrm) TZ() { x.tz = true } // SetRanges sets a list of payload ranges to hash. func (x *HashPayloadRangesPrm) SetRanges(rngs []objectSDK.Range) { x.rngs = rngs } // SetSalt sets data for each range to be XOR'ed with. func (x *HashPayloadRangesPrm) SetSalt(salt []byte) { x.salt = salt } // HashPayloadRangesRes groups the resulting values of HashPayloadRanges operation. type HashPayloadRangesRes struct { cliRes *client.ResObjectHash } // HashList returns a list of hashes of the payload ranges keeping order. func (x HashPayloadRangesRes) HashList() [][]byte { return x.cliRes.Checksums() } // HashPayloadRanges requests hashes (by default SHA256) of the object payload ranges. // // Returns any error which prevented the operation from completing correctly in error return. // Returns an error if number of received hashes differs with the number of requested ranges. 
func HashPayloadRanges(ctx context.Context, prm HashPayloadRangesPrm) (*HashPayloadRangesRes, error) { cs := checksum.SHA256 if prm.tz { cs = checksum.TZ } cnr := prm.objAddr.Container() obj := prm.objAddr.Object() cliPrm := client.PrmObjectHash{ ContainerID: &cnr, ObjectID: &obj, Local: prm.local, Salt: prm.salt, Ranges: prm.rngs, ChecksumType: cs, Session: prm.sessionToken, BearerToken: prm.bearerToken, XHeaders: prm.xHeaders, } res, err := prm.cli.ObjectHash(ctx, cliPrm) if err != nil { return nil, fmt.Errorf("read payload hashes via client: %w", err) } return &HashPayloadRangesRes{ cliRes: res, }, nil } // PayloadRangePrm groups parameters of PayloadRange operation. type PayloadRangePrm struct { commonObjectPrm objectAddressPrm rawPrm payloadWriterPrm rng *objectSDK.Range } // SetRange sets payload range to read. func (x *PayloadRangePrm) SetRange(rng *objectSDK.Range) { x.rng = rng } // PayloadRangeRes groups the resulting values of PayloadRange operation. type PayloadRangeRes struct{} // PayloadRange reads object payload range from FrostFS and writes it to the specified writer. // // Interrupts on any writer error. // // Returns any error which prevented the operation from completing correctly in error return. // For raw reading, returns *object.SplitInfoError error if object is virtual. 
func PayloadRange(ctx context.Context, prm PayloadRangePrm) (*PayloadRangeRes, error) { cnr := prm.objAddr.Container() obj := prm.objAddr.Object() rangePrm := client.PrmObjectRange{ XHeaders: prm.xHeaders, BearerToken: prm.bearerToken, Session: prm.sessionToken, Raw: prm.raw, Local: prm.local, ContainerID: &cnr, ObjectID: &obj, Offset: prm.rng.GetOffset(), Length: prm.rng.GetLength(), } rdr, err := prm.cli.ObjectRangeInit(ctx, rangePrm) if err != nil { return nil, fmt.Errorf("init payload reading: %w", err) } _, err = io.Copy(prm.wrt, rdr) if err != nil { return nil, fmt.Errorf("copy payload: %w", err) } return new(PayloadRangeRes), nil } // SyncContainerPrm groups parameters of SyncContainerSettings operation. type SyncContainerPrm struct { commonPrm c *containerSDK.Container } // SetContainer sets a container that is required to be synced. func (s *SyncContainerPrm) SetContainer(c *containerSDK.Container) { s.c = c } // SyncContainerRes groups resulting values of SyncContainerSettings // operation. type SyncContainerRes struct{} // SyncContainerSettings reads global network config from FrostFS and // syncs container settings with it. // // Interrupts on any writer error. // // Panics if a container passed as a parameter is nil. func SyncContainerSettings(ctx context.Context, prm SyncContainerPrm) (*SyncContainerRes, error) { if prm.c == nil { panic("sync container settings with the network: nil container") } err := client.SyncContainerWithNetwork(ctx, prm.c, prm.cli) if err != nil { return nil, err } return new(SyncContainerRes), nil } // PatchObjectPrm groups parameters of PatchObject operation. 
type PatchObjectPrm struct { commonObjectPrm objectAddressPrm NewAttributes []objectSDK.Attribute ReplaceAttribute bool PayloadPatches []PayloadPatch } type PayloadPatch struct { Range objectSDK.Range PayloadPath string } type PatchRes struct { OID oid.ID } func Patch(ctx context.Context, prm PatchObjectPrm) (*PatchRes, error) { patchPrm := client.PrmObjectPatch{ XHeaders: prm.xHeaders, BearerToken: prm.bearerToken, Session: prm.sessionToken, Address: prm.objAddr, } slices.SortFunc(prm.PayloadPatches, func(a, b PayloadPatch) int { return cmp.Compare(a.Range.GetOffset(), b.Range.GetOffset()) }) patcher, err := prm.cli.ObjectPatchInit(ctx, patchPrm) if err != nil { return nil, fmt.Errorf("init payload reading: %w", err) } if patcher.PatchAttributes(ctx, prm.NewAttributes, prm.ReplaceAttribute) { for _, pp := range prm.PayloadPatches { payloadFile, err := os.OpenFile(pp.PayloadPath, os.O_RDONLY, os.ModePerm) if err != nil { return nil, err } applied := patcher.PatchPayload(ctx, &pp.Range, payloadFile) _ = payloadFile.Close() if !applied { break } } } res, err := patcher.Close(ctx) if err != nil { return nil, err } return &PatchRes{ OID: res.ObjectID(), }, nil }