[#165] pool: add docs

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-03-15 11:54:15 +03:00 committed by Alex Vanin
parent 7811d8eefc
commit 52548fe176
2 changed files with 131 additions and 0 deletions

51
pool/doc.go Normal file
View file

@ -0,0 +1,51 @@
/*
Package pool provides a wrapper for several NeoFS API clients.
The main component is Pool type. It is a virtual connection to the network
and provides methods for executing operations on the server. It also supports
a weighted random selection of the underlying client to make requests.
Create pool instance with 3 nodes connection.
This InitParameters will make pool use 192.168.130.71 node while it is healthy. Otherwise, it will make the pool use
192.168.130.72 for 90% of requests and 192.168.130.73 for remaining 10%.
:
prm := pool.InitParameters{
Key: key,
NodeParams: []NodeParam{
{ Priority: 1, Address: "192.168.130.71", Weight: 1 },
{ Priority: 2, Address: "192.168.130.72", Weight: 9 },
{ Priority: 2, Address: "192.168.130.73", Weight: 1 },
// ...
}
p, err := pool.NewPool(prm)
// ...
Connect to the NeoFS server:
err := p.Dial(ctx)
// ...
Execute NeoFS operation on the server:
var prm pool.PrmContainerPut
prm.SetContainer(cnr)
// ...
res, err := p.PutContainer(context.Background(), prm)
// ...
Execute NeoFS operation on the server and check error:
var prm pool.PrmObjectHead
prm.SetAddress(addr)
// ...
res, err := p.HeadObject(context.Background(), prm)
if client.IsErrObjectNotFound(err) {
// ...
}
// ...
Close the connection:
p.Close()
*/
package pool

View file

@ -77,6 +77,7 @@ type nodesParam struct {
weights []float64 weights []float64
} }
// NodeParam groups parameters of remote node.
type NodeParam struct { type NodeParam struct {
Priority int Priority int
Address string Address string
@ -321,6 +322,27 @@ func (x *PrmBalanceGet) SetOwnerID(ownerID *owner.ID) {
x.ownerID = ownerID x.ownerID = ownerID
} }
// Pool represents virtual connection to the NeoFS network to communicate
// with multiple NeoFS servers without thinking about switching between servers
// due to load balancing proportions or their unavailability.
// It is designed to provide a convenient abstraction from the multiple client.Client types.
//
// Pool can be created and initialized using NewPool function.
// Before executing the NeoFS operations using the Pool, connection to the
// servers MUST BE correctly established (see Dial method).
// Using the Pool before connecting have been established can lead to a panic.
// After the work, the Pool SHOULD BE closed (see Close method): it frees internal
// and system resources which were allocated for the period of work of the Pool.
// Calling Dial/Close methods during the communication process step strongly discouraged
// as it leads to undefined behavior.
//
// Each method which produces a NeoFS API call may return an error.
// Status of underlying server response is casted to built-in error instance.
// Certain statuses can be checked using `client` and standard `errors` packages.
// Note that package provides some helper functions to work with status returns
// (e.g. client.IsErrContainerNotFound, client.IsErrObjectNotFound).
//
// See pool package overview to get some examples.
type Pool struct { type Pool struct {
innerPools []*innerPool innerPools []*innerPool
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
@ -869,6 +891,7 @@ func (p *Pool) callWithRetry(ctx *callContextWithRetry, f func() error) error {
return err return err
} }
// PutObject writes an object through a remote server using NeoFS API protocol.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) { func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) {
prm.useDefaultSession() prm.useDefaultSession()
prm.useVerb(sessionv2.ObjectVerbPut) prm.useVerb(sessionv2.ObjectVerbPut)
@ -977,6 +1000,10 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error)
return &id, nil return &id, nil
} }
// DeleteObject marks an object for deletion from the container using NeoFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
// Explicit deletion is done asynchronously, and is generally not guaranteed.
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
prm.useDefaultSession() prm.useDefaultSession()
prm.useVerb(sessionv2.ObjectVerbDelete) prm.useVerb(sessionv2.ObjectVerbDelete)
@ -1016,21 +1043,25 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
type objectReadCloser client.ObjectReader type objectReadCloser client.ObjectReader
// Read implements io.Reader of the object payload.
func (x *objectReadCloser) Read(p []byte) (int, error) { func (x *objectReadCloser) Read(p []byte) (int, error) {
return (*client.ObjectReader)(x).Read(p) return (*client.ObjectReader)(x).Read(p)
} }
// Close implements io.Closer of the object payload.
func (x *objectReadCloser) Close() error { func (x *objectReadCloser) Close() error {
_, err := (*client.ObjectReader)(x).Close() _, err := (*client.ObjectReader)(x).Close()
return err return err
} }
// ResGetObject is designed to provide object header nad read one object payload from NeoFS system.
type ResGetObject struct { type ResGetObject struct {
Header object.Object Header object.Object
Payload io.ReadCloser Payload io.ReadCloser
} }
// GetObject reads object header and initiates reading an object payload through a remote server using NeoFS API protocol.
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) { func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) {
prm.useDefaultSession() prm.useDefaultSession()
prm.useVerb(sessionv2.ObjectVerbGet) prm.useVerb(sessionv2.ObjectVerbGet)
@ -1082,6 +1113,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject,
return &res, nil return &res, nil
} }
// HeadObject reads object header through a remote server using NeoFS API protocol.
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) { func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) {
prm.useDefaultSession() prm.useDefaultSession()
prm.useVerb(sessionv2.ObjectVerbHead) prm.useVerb(sessionv2.ObjectVerbHead)
@ -1130,19 +1162,29 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Objec
return &obj, nil return &obj, nil
} }
// ResObjectRange is designed to read payload range of one object
// from NeoFS system.
//
// Must be initialized using Pool.ObjectRange, any other
// usage is unsafe.
type ResObjectRange struct { type ResObjectRange struct {
payload *client.ObjectRangeReader payload *client.ObjectRangeReader
} }
// Read implements io.Reader of the object payload.
func (x *ResObjectRange) Read(p []byte) (int, error) { func (x *ResObjectRange) Read(p []byte) (int, error) {
return x.payload.Read(p) return x.payload.Read(p)
} }
// Close ends reading the payload range and returns the result of the operation
// along with the final results. Must be called after using the ResObjectRange.
func (x *ResObjectRange) Close() error { func (x *ResObjectRange) Close() error {
_, err := x.payload.Close() _, err := x.payload.Close()
return err return err
} }
// ObjectRange initiates reading an object's payload range through a remote
// server using NeoFS API protocol.
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) { func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) {
prm.useDefaultSession() prm.useDefaultSession()
prm.useVerb(sessionv2.ObjectVerbRange) prm.useVerb(sessionv2.ObjectVerbRange)
@ -1192,10 +1234,14 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectR
return &res, nil return &res, nil
} }
// ResObjectSearch is designed to read list of object identifiers from NeoFS system.
//
// Must be initialized using Pool.SearchObjects, any other usage is unsafe.
type ResObjectSearch struct { type ResObjectSearch struct {
r *client.ObjectListReader r *client.ObjectListReader
} }
// Read reads another list of the object identifiers.
func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) { func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) {
n, ok := x.r.Read(buf) n, ok := x.r.Read(buf)
if !ok { if !ok {
@ -1210,14 +1256,25 @@ func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) {
return n, nil return n, nil
} }
// Iterate iterates over the list of found object identifiers.
// f can return true to stop iteration earlier.
//
// Returns an error if object can't be read.
func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error { func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error {
return x.r.Iterate(f) return x.r.Iterate(f)
} }
// Close ends reading list of the matched objects and returns the result of the operation
// along with the final results. Must be called after using the ResObjectSearch.
func (x *ResObjectSearch) Close() { func (x *ResObjectSearch) Close() {
_, _ = x.r.Close() _, _ = x.r.Close()
} }
// SearchObjects initiates object selection through a remote server using NeoFS API protocol.
//
// The call only opens the transmission channel, explicit fetching of matched objects
// is done using the ResObjectSearch. Exactly one return value is non-nil.
// Resulting reader must be finally closed.
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) { func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) {
prm.useDefaultSession() prm.useDefaultSession()
prm.useVerb(sessionv2.ObjectVerbSearch) prm.useVerb(sessionv2.ObjectVerbSearch)
@ -1259,6 +1316,12 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObje
return &res, nil return &res, nil
} }
// PutContainer sends request to save container in NeoFS.
//
// Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable.
//
// Success can be verified by reading by identifier (see GetContainer).
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) { func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
cp, err := p.conn(ctx, prm.prmCommon) cp, err := p.conn(ctx, prm.prmCommon)
if err != nil { if err != nil {
@ -1279,6 +1342,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID,
return res.ID(), nil return res.ID(), nil
} }
// GetContainer reads NeoFS container by ID.
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*container.Container, error) { func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*container.Container, error) {
cp, err := p.conn(ctx, prm.prmCommon) cp, err := p.conn(ctx, prm.prmCommon)
if err != nil { if err != nil {
@ -1299,6 +1363,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*containe
return res.Container(), nil return res.Container(), nil
} }
// ListContainers requests identifiers of the account-owned containers.
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
cp, err := p.conn(ctx, prm.prmCommon) cp, err := p.conn(ctx, prm.prmCommon)
if err != nil { if err != nil {
@ -1319,6 +1384,12 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
return res.Containers(), nil return res.Containers(), nil
} }
// DeleteContainer sends request to remove the NeoFS container.
//
// Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable.
//
// Success can be verified by reading by identifier (see GetContainer).
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error { func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
cp, err := p.conn(ctx, prm.prmCommon) cp, err := p.conn(ctx, prm.prmCommon)
if err != nil { if err != nil {
@ -1342,6 +1413,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
return err return err
} }
// GetEACL reads eACL table of the NeoFS container.
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) { func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) {
cp, err := p.conn(ctx, prm.prmCommon) cp, err := p.conn(ctx, prm.prmCommon)
if err != nil { if err != nil {
@ -1362,6 +1434,12 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table,
return res.Table(), nil return res.Table(), nil
} }
// SetEACL sends request to update eACL table of the NeoFS container.
//
// Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable.
//
// Success can be verified by reading by identifier (see GetEACL).
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
cp, err := p.conn(ctx, prm.prmCommon) cp, err := p.conn(ctx, prm.prmCommon)
if err != nil { if err != nil {
@ -1381,6 +1459,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
return err return err
} }
// Balance requests current balance of the NeoFS account.
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) { func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) {
cp, err := p.conn(ctx, prm.prmCommon) cp, err := p.conn(ctx, prm.prmCommon)
if err != nil { if err != nil {
@ -1401,6 +1480,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci
return res.Amount(), nil return res.Amount(), nil
} }
// WaitForContainerPresence waits until the container is found on the NeoFS network.
func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error { func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error {
conn, _, err := p.Connection() conn, _, err := p.Connection()
if err != nil { if err != nil {