diff --git a/pool/doc.go b/pool/doc.go new file mode 100644 index 0000000..792c89a --- /dev/null +++ b/pool/doc.go @@ -0,0 +1,51 @@ +/* +Package pool provides a wrapper for several NeoFS API clients. + +The main component is Pool type. It is a virtual connection to the network +and provides methods for executing operations on the server. It also supports +a weighted random selection of the underlying client to make requests. + +Create pool instance with 3 nodes connection. +This InitParameters will make pool use 192.168.130.71 node while it is healthy. Otherwise, it will make the pool use +192.168.130.72 for 90% of requests and 192.168.130.73 for remaining 10%. +: + prm := pool.InitParameters{ + Key: key, + NodeParams: []NodeParam{ + { Priority: 1, Address: "192.168.130.71", Weight: 1 }, + { Priority: 2, Address: "192.168.130.72", Weight: 9 }, + { Priority: 2, Address: "192.168.130.73", Weight: 1 }, + // ... + } + + p, err := pool.NewPool(prm) + // ... + +Connect to the NeoFS server: + err := p.Dial(ctx) + // ... + +Execute NeoFS operation on the server: + var prm pool.PrmContainerPut + prm.SetContainer(cnr) + // ... + + res, err := p.PutContainer(context.Background(), prm) + // ... + +Execute NeoFS operation on the server and check error: + var prm pool.PrmObjectHead + prm.SetAddress(addr) + // ... + + res, err := p.HeadObject(context.Background(), prm) + if client.IsErrObjectNotFound(err) { + // ... + } + // ... + +Close the connection: + p.Close() + +*/ +package pool diff --git a/pool/pool.go b/pool/pool.go index e94e1a7..456a8d7 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -77,6 +77,7 @@ type nodesParam struct { weights []float64 } +// NodeParam groups parameters of remote node. type NodeParam struct { Priority int Address string @@ -321,6 +322,27 @@ func (x *PrmBalanceGet) SetOwnerID(ownerID *owner.ID) { x.ownerID = ownerID } +// Pool represents virtual connection to the NeoFS network to communicate +// with multiple NeoFS servers without thinking about switching between servers +// due to load balancing proportions or their unavailability. +// It is designed to provide a convenient abstraction from the multiple client.Client types. +// +// Pool can be created and initialized using NewPool function. +// Before executing the NeoFS operations using the Pool, connection to the +// servers MUST BE correctly established (see Dial method). +// Using the Pool before connecting have been established can lead to a panic. +// After the work, the Pool SHOULD BE closed (see Close method): it frees internal +// and system resources which were allocated for the period of work of the Pool. +// Calling Dial/Close methods during the communication process step strongly discouraged +// as it leads to undefined behavior. +// +// Each method which produces a NeoFS API call may return an error. +// Status of underlying server response is casted to built-in error instance. +// Certain statuses can be checked using `client` and standard `errors` packages. +// Note that package provides some helper functions to work with status returns +// (e.g. client.IsErrContainerNotFound, client.IsErrObjectNotFound). +// +// See pool package overview to get some examples. type Pool struct { innerPools []*innerPool key *ecdsa.PrivateKey @@ -869,6 +891,7 @@ func (p *Pool) callWithRetry(ctx *callContextWithRetry, f func() error) error { return err } +// PutObject writes an object through a remote server using NeoFS API protocol. func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) { prm.useDefaultSession() prm.useVerb(sessionv2.ObjectVerbPut) @@ -977,6 +1000,10 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) return &id, nil } +// DeleteObject marks an object for deletion from the container using NeoFS API protocol. +// As a marker, a special unit called a tombstone is placed in the container. +// It confirms the user's intent to delete the object, and is itself a container object. +// Explicit deletion is done asynchronously, and is generally not guaranteed. func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { prm.useDefaultSession() prm.useVerb(sessionv2.ObjectVerbDelete) @@ -1016,21 +1043,25 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { type objectReadCloser client.ObjectReader +// Read implements io.Reader of the object payload. func (x *objectReadCloser) Read(p []byte) (int, error) { return (*client.ObjectReader)(x).Read(p) } +// Close implements io.Closer of the object payload. func (x *objectReadCloser) Close() error { _, err := (*client.ObjectReader)(x).Close() return err } +// ResGetObject is designed to provide object header nad read one object payload from NeoFS system. type ResGetObject struct { Header object.Object Payload io.ReadCloser } +// GetObject reads object header and initiates reading an object payload through a remote server using NeoFS API protocol. func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) { prm.useDefaultSession() prm.useVerb(sessionv2.ObjectVerbGet) @@ -1082,6 +1113,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, return &res, nil } +// HeadObject reads object header through a remote server using NeoFS API protocol. func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) { prm.useDefaultSession() prm.useVerb(sessionv2.ObjectVerbHead) @@ -1130,19 +1162,29 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Objec return &obj, nil } +// ResObjectRange is designed to read payload range of one object +// from NeoFS system. +// +// Must be initialized using Pool.ObjectRange, any other +// usage is unsafe. type ResObjectRange struct { payload *client.ObjectRangeReader } +// Read implements io.Reader of the object payload. func (x *ResObjectRange) Read(p []byte) (int, error) { return x.payload.Read(p) } +// Close ends reading the payload range and returns the result of the operation +// along with the final results. Must be called after using the ResObjectRange. func (x *ResObjectRange) Close() error { _, err := x.payload.Close() return err } +// ObjectRange initiates reading an object's payload range through a remote +// server using NeoFS API protocol. func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) { prm.useDefaultSession() prm.useVerb(sessionv2.ObjectVerbRange) @@ -1192,10 +1234,14 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectR return &res, nil } +// ResObjectSearch is designed to read list of object identifiers from NeoFS system. +// +// Must be initialized using Pool.SearchObjects, any other usage is unsafe. type ResObjectSearch struct { r *client.ObjectListReader } +// Read reads another list of the object identifiers. func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) { n, ok := x.r.Read(buf) if !ok { @@ -1210,14 +1256,25 @@ func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) { return n, nil } +// Iterate iterates over the list of found object identifiers. +// f can return true to stop iteration earlier. +// +// Returns an error if object can't be read. func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error { return x.r.Iterate(f) } +// Close ends reading list of the matched objects and returns the result of the operation +// along with the final results. Must be called after using the ResObjectSearch. func (x *ResObjectSearch) Close() { _, _ = x.r.Close() } +// SearchObjects initiates object selection through a remote server using NeoFS API protocol. +// +// The call only opens the transmission channel, explicit fetching of matched objects +// is done using the ResObjectSearch. Exactly one return value is non-nil. +// Resulting reader must be finally closed. func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) { prm.useDefaultSession() prm.useVerb(sessionv2.ObjectVerbSearch) @@ -1259,6 +1316,12 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObje return &res, nil } +// PutContainer sends request to save container in NeoFS. +// +// Operation is asynchronous and no guaranteed even in the absence of errors. +// The required time is also not predictable. +// +// Success can be verified by reading by identifier (see GetContainer). func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) { cp, err := p.conn(ctx, prm.prmCommon) if err != nil { @@ -1279,6 +1342,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, return res.ID(), nil } +// GetContainer reads NeoFS container by ID. func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*container.Container, error) { cp, err := p.conn(ctx, prm.prmCommon) if err != nil { @@ -1299,6 +1363,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*containe return res.Container(), nil } +// ListContainers requests identifiers of the account-owned containers. func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { cp, err := p.conn(ctx, prm.prmCommon) if err != nil { @@ -1319,6 +1384,12 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid. return res.Containers(), nil } +// DeleteContainer sends request to remove the NeoFS container. +// +// Operation is asynchronous and no guaranteed even in the absence of errors. +// The required time is also not predictable. +// +// Success can be verified by reading by identifier (see GetContainer). func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error { cp, err := p.conn(ctx, prm.prmCommon) if err != nil { @@ -1342,6 +1413,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro return err } +// GetEACL reads eACL table of the NeoFS container. func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) { cp, err := p.conn(ctx, prm.prmCommon) if err != nil { @@ -1362,6 +1434,12 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, return res.Table(), nil } +// SetEACL sends request to update eACL table of the NeoFS container. +// +// Operation is asynchronous and no guaranteed even in the absence of errors. +// The required time is also not predictable. +// +// Success can be verified by reading by identifier (see GetEACL). func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { cp, err := p.conn(ctx, prm.prmCommon) if err != nil { @@ -1381,6 +1459,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { return err } +// Balance requests current balance of the NeoFS account. func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) { cp, err := p.conn(ctx, prm.prmCommon) if err != nil { @@ -1401,6 +1480,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci return res.Amount(), nil } +// WaitForContainerPresence waits until the container is found on the NeoFS network. func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error { conn, _, err := p.Connection() if err != nil {