From e56eef495da456b41dae3ae9697287733c40907a Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 21 Mar 2022 14:29:29 +0300 Subject: [PATCH] [#85] pool: refactor client interface Signed-off-by: Denis Kirillov --- pool/pool.go | 937 +++++++++++++++++++++++++++++---------------------- 1 file changed, 543 insertions(+), 394 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 982378c..21195d4 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -31,24 +31,421 @@ import ( "go.uber.org/zap" ) -// client is a wrapper for sdkClient.Client to generate mock. +// client represents virtual connection to the single NeoFS network endpoint from which Pool is formed. type client interface { - BalanceGet(context.Context, sdkClient.PrmBalanceGet) (*sdkClient.ResBalanceGet, error) - ContainerPut(context.Context, sdkClient.PrmContainerPut) (*sdkClient.ResContainerPut, error) - ContainerGet(context.Context, sdkClient.PrmContainerGet) (*sdkClient.ResContainerGet, error) - ContainerList(context.Context, sdkClient.PrmContainerList) (*sdkClient.ResContainerList, error) - ContainerDelete(context.Context, sdkClient.PrmContainerDelete) (*sdkClient.ResContainerDelete, error) - ContainerEACL(context.Context, sdkClient.PrmContainerEACL) (*sdkClient.ResContainerEACL, error) - ContainerSetEACL(context.Context, sdkClient.PrmContainerSetEACL) (*sdkClient.ResContainerSetEACL, error) - EndpointInfo(context.Context, sdkClient.PrmEndpointInfo) (*sdkClient.ResEndpointInfo, error) - NetworkInfo(context.Context, sdkClient.PrmNetworkInfo) (*sdkClient.ResNetworkInfo, error) - ObjectPutInit(context.Context, sdkClient.PrmObjectPutInit) (*sdkClient.ObjectWriter, error) - ObjectDelete(context.Context, sdkClient.PrmObjectDelete) (*sdkClient.ResObjectDelete, error) - ObjectGetInit(context.Context, sdkClient.PrmObjectGet) (*sdkClient.ObjectReader, error) - ObjectHead(context.Context, sdkClient.PrmObjectHead) (*sdkClient.ResObjectHead, error) - ObjectRangeInit(context.Context, sdkClient.PrmObjectRange) (*sdkClient.ObjectRangeReader, error) - ObjectSearchInit(context.Context, sdkClient.PrmObjectSearch) (*sdkClient.ObjectListReader, error) - SessionCreate(context.Context, sdkClient.PrmSessionCreate) (*sdkClient.ResSessionCreate, error) + BalanceGet(context.Context, PrmBalanceGet) (*accounting.Decimal, error) + ContainerPut(context.Context, PrmContainerPut) (*cid.ID, error) + ContainerGet(context.Context, PrmContainerGet) (*container.Container, error) + ContainerList(context.Context, PrmContainerList) ([]cid.ID, error) + ContainerDelete(context.Context, PrmContainerDelete) error + ContainerEACL(context.Context, PrmContainerEACL) (*eacl.Table, error) + ContainerSetEACL(context.Context, PrmContainerSetEACL) error + EndpointInfo(context.Context, PrmEndpointInfo) (*netmap.NodeInfo, error) + NetworkInfo(context.Context, PrmNetworkInfo) (*netmap.NetworkInfo, error) + ObjectPut(context.Context, PrmObjectPut) (*oid.ID, error) + ObjectDelete(context.Context, PrmObjectDelete) error + ObjectGet(context.Context, PrmObjectGet) (*ResGetObject, error) + ObjectHead(context.Context, PrmObjectHead) (*object.Object, error) + ObjectRange(context.Context, PrmObjectRange) (*ResObjectRange, error) + ObjectSearch(context.Context, PrmObjectSearch) (*ResObjectSearch, error) + SessionCreate(context.Context, PrmCreateSession) (*ResCreateSession, error) +} + +// clientWrapper is used by default, alternative implementations are intended for testing purposes only. +type clientWrapper struct { + client *sdkClient.Client + key ecdsa.PrivateKey +} + +type wrapperPrm struct { + address string + key ecdsa.PrivateKey + timeout time.Duration + responseInfoCallback func(sdkClient.ResponseMetaInfo) error +} + +func (x *wrapperPrm) setAddress(address string) { + x.address = address +} + +func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) { + x.key = key +} + +func (x *wrapperPrm) setTimeout(timeout time.Duration) { + x.timeout = timeout +} + +func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { + x.responseInfoCallback = f +} + +func newWrapper(prm wrapperPrm) (*clientWrapper, error) { + var prmInit sdkClient.PrmInit + prmInit.ResolveNeoFSFailures() + prmInit.SetDefaultPrivateKey(prm.key) + prmInit.SetResponseInfoCallback(prm.responseInfoCallback) + + var c sdkClient.Client + c.Init(prmInit) + + var prmDial sdkClient.PrmDial + prmDial.SetServerURI(prm.address) + prmDial.SetTimeout(prm.timeout) + + return &clientWrapper{client: &c, key: prm.key}, c.Dial(prmDial) +} + +func (c *clientWrapper) BalanceGet(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) { + var cliPrm sdkClient.PrmBalanceGet + cliPrm.SetAccount(prm.ownerID) + + res, err := c.client.BalanceGet(ctx, cliPrm) + if err != nil { + return nil, err + } + return res.Amount(), nil +} + +func (c *clientWrapper) ContainerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) { + var cliPrm sdkClient.PrmContainerPut + cliPrm.SetContainer(prm.cnr) + + res, err := c.client.ContainerPut(ctx, cliPrm) + if err != nil { + return nil, err + } + + if !prm.waitParamsSet { + prm.waitParams.setDefaults() + } + + return res.ID(), waitForContainerPresence(ctx, c, res.ID(), &prm.waitParams) +} + +func (c *clientWrapper) ContainerGet(ctx context.Context, prm PrmContainerGet) (*container.Container, error) { + var cliPrm sdkClient.PrmContainerGet + cliPrm.SetContainer(prm.cnrID) + + res, err := c.client.ContainerGet(ctx, cliPrm) + if err != nil { + return nil, err + } + return res.Container(), nil +} + +func (c *clientWrapper) ContainerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { + var cliPrm sdkClient.PrmContainerList + cliPrm.SetAccount(prm.ownerID) + + res, err := c.client.ContainerList(ctx, cliPrm) + if err != nil { + return nil, err + } + return res.Containers(), nil +} + +func (c *clientWrapper) ContainerDelete(ctx context.Context, prm PrmContainerDelete) error { + var cliPrm sdkClient.PrmContainerDelete + cliPrm.SetContainer(prm.cnrID) + cliPrm.SetSessionToken(prm.stoken) + + if _, err := c.client.ContainerDelete(ctx, cliPrm); err != nil { + return err + } + + if !prm.waitParamsSet { + prm.waitParams.setDefaults() + } + + return waitForContainerRemoved(ctx, c, &prm.cnrID, &prm.waitParams) +} + +func (c *clientWrapper) ContainerEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) { + var cliPrm sdkClient.PrmContainerEACL + cliPrm.SetContainer(prm.cnrID) + + res, err := c.client.ContainerEACL(ctx, cliPrm) + if err != nil { + return nil, err + } + return res.Table(), nil +} + +func (c *clientWrapper) ContainerSetEACL(ctx context.Context, prm PrmContainerSetEACL) error { + var cliPrm sdkClient.PrmContainerSetEACL + cliPrm.SetTable(prm.table) + + if _, err := c.client.ContainerSetEACL(ctx, cliPrm); err != nil { + return err + } + + if !prm.waitParamsSet { + prm.waitParams.setDefaults() + } + + return waitForEACLPresence(ctx, c, prm.table.CID(), &prm.table, &prm.waitParams) +} + +func (c *clientWrapper) EndpointInfo(ctx context.Context, _ PrmEndpointInfo) (*netmap.NodeInfo, error) { + res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) + if err != nil { + return nil, err + } + return res.NodeInfo(), nil +} + +func (c *clientWrapper) NetworkInfo(ctx context.Context, _ PrmNetworkInfo) (*netmap.NetworkInfo, error) { + res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) + if err != nil { + return nil, err + } + return res.Info(), nil +} + +func (c *clientWrapper) ObjectPut(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) { + var cliPrm sdkClient.PrmObjectPutInit + wObj, err := c.client.ObjectPutInit(ctx, cliPrm) + if err != nil { + return nil, fmt.Errorf("init writing on API client: %w", err) + } + + if prm.stoken != nil { + wObj.WithinSession(*prm.stoken) + } + wObj.UseKey(*prm.key) + + if prm.btoken != nil { + wObj.WithBearerToken(*prm.btoken) + } + + if wObj.WriteHeader(prm.hdr) { + sz := prm.hdr.PayloadSize() + + if data := prm.hdr.Payload(); len(data) > 0 { + if prm.payload != nil { + prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) + } else { + prm.payload = bytes.NewReader(data) + sz = uint64(len(data)) + } + } + + if prm.payload != nil { + const defaultBufferSizePut = 3 << 20 // configure? + + if sz == 0 || sz > defaultBufferSizePut { + sz = defaultBufferSizePut + } + + buf := make([]byte, sz) + + var n int + + for { + n, err = prm.payload.Read(buf) + if n > 0 { + if !wObj.WritePayloadChunk(buf[:n]) { + break + } + + continue + } + + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("read payload: %w", err) + } + } + } + + res, err := wObj.Close() + if err != nil { // here err already carries both status and client errors + return nil, fmt.Errorf("client failure: %w", err) + } + + var id oid.ID + + if !res.ReadStoredObjectID(&id) { + return nil, errors.New("missing ID of the stored object") + } + + return &id, nil +} + +func (c *clientWrapper) ObjectDelete(ctx context.Context, prm PrmObjectDelete) error { + var cliPrm sdkClient.PrmObjectDelete + + if cnr := prm.addr.ContainerID(); cnr != nil { + cliPrm.FromContainer(*cnr) + } + + if obj := prm.addr.ObjectID(); obj != nil { + cliPrm.ByID(*obj) + } + + if prm.stoken != nil { + cliPrm.WithinSession(*prm.stoken) + } + + if prm.btoken != nil { + cliPrm.WithBearerToken(*prm.btoken) + } + + cliPrm.UseKey(*prm.key) + _, err := c.client.ObjectDelete(ctx, cliPrm) + return err +} + +func (c *clientWrapper) ObjectGet(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) { + var cliPrm sdkClient.PrmObjectGet + + if cnr := prm.addr.ContainerID(); cnr != nil { + cliPrm.FromContainer(*cnr) + } + + if obj := prm.addr.ObjectID(); obj != nil { + cliPrm.ByID(*obj) + } + + if cnr := prm.addr.ContainerID(); cnr != nil { + cliPrm.FromContainer(*cnr) + } + + if obj := prm.addr.ObjectID(); obj != nil { + cliPrm.ByID(*obj) + } + + if prm.stoken != nil { + cliPrm.WithinSession(*prm.stoken) + } + + if prm.btoken != nil { + cliPrm.WithBearerToken(*prm.btoken) + } + + var res ResGetObject + + rObj, err := c.client.ObjectGetInit(ctx, cliPrm) + if err != nil { + return nil, fmt.Errorf("init object reading on client: %w", err) + } + + rObj.UseKey(*prm.key) + + if !rObj.ReadHeader(&res.Header) { + _, err = rObj.Close() + return nil, fmt.Errorf("read header: %w", err) + } + + res.Payload = (*objectReadCloser)(rObj) + + return &res, nil +} + +func (c *clientWrapper) ObjectHead(ctx context.Context, prm PrmObjectHead) (*object.Object, error) { + var cliPrm sdkClient.PrmObjectHead + + if cnr := prm.addr.ContainerID(); cnr != nil { + cliPrm.FromContainer(*cnr) + } + + if obj := prm.addr.ObjectID(); obj != nil { + cliPrm.ByID(*obj) + } + + if prm.stoken != nil { + cliPrm.WithinSession(*prm.stoken) + } + + if prm.btoken != nil { + cliPrm.WithBearerToken(*prm.btoken) + } + + cliPrm.UseKey(*prm.key) + + var obj object.Object + + res, err := c.client.ObjectHead(ctx, cliPrm) + if err != nil { + return nil, fmt.Errorf("read object header via client: %w", err) + } + if !res.ReadHeader(&obj) { + return nil, errors.New("missing object header in response") + } + + return &obj, nil +} + +func (c *clientWrapper) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) { + var cliPrm sdkClient.PrmObjectRange + + cliPrm.SetOffset(prm.off) + cliPrm.SetLength(prm.ln) + + if cnr := prm.addr.ContainerID(); cnr != nil { + cliPrm.FromContainer(*cnr) + } + + if obj := prm.addr.ObjectID(); obj != nil { + cliPrm.ByID(*obj) + } + + if prm.stoken != nil { + cliPrm.WithinSession(*prm.stoken) + } + + if prm.btoken != nil { + cliPrm.WithBearerToken(*prm.btoken) + } + + res, err := c.client.ObjectRangeInit(ctx, cliPrm) + if err != nil { + return nil, fmt.Errorf("init payload range reading on client: %w", err) + } + res.UseKey(*prm.key) + + return &ResObjectRange{payload: res}, nil +} + +func (c *clientWrapper) ObjectSearch(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) { + var cliPrm sdkClient.PrmObjectSearch + + cliPrm.InContainer(prm.cnrID) + cliPrm.SetFilters(prm.filters) + + if prm.stoken != nil { + cliPrm.WithinSession(*prm.stoken) + } + + if prm.btoken != nil { + cliPrm.WithBearerToken(*prm.btoken) + } + + res, err := c.client.ObjectSearchInit(ctx, cliPrm) + if err != nil { + return nil, fmt.Errorf("init object searching on client: %w", err) + } + res.UseKey(*prm.key) + + return &ResObjectSearch{r: res}, nil +} + +func (c *clientWrapper) SessionCreate(ctx context.Context, prm PrmCreateSession) (*ResCreateSession, error) { + var cliPrm sdkClient.PrmSessionCreate + cliPrm.SetExp(prm.exp) + + res, err := c.client.SessionCreate(ctx, cliPrm) + if err != nil { + return nil, fmt.Errorf("session creation on client: %w", err) + } + + return &ResCreateSession{ + id: res.ID(), + sessionKey: res.PublicKey(), + }, nil } // InitParameters contains values used to initialize connection Pool. @@ -183,28 +580,30 @@ type clientPack struct { address string } -type prmCommon struct { +type prmContext struct { defaultSession bool verb sessionv2.ObjectSessionVerb addr *address.Address - - key *ecdsa.PrivateKey - btoken *token.BearerToken - stoken *session.Token } -func (x *prmCommon) useDefaultSession() { +func (x *prmContext) useDefaultSession() { x.defaultSession = true } -func (x *prmCommon) useAddress(addr *address.Address) { +func (x *prmContext) useAddress(addr *address.Address) { x.addr = addr } -func (x *prmCommon) useVerb(verb sessionv2.ObjectSessionVerb) { +func (x *prmContext) useVerb(verb sessionv2.ObjectSessionVerb) { x.verb = verb } +type prmCommon struct { + key *ecdsa.PrivateKey + btoken *token.BearerToken + stoken *session.Token +} + // UseKey specifies private key to sign the requests. // If key is not provided, then Pool default key is used. func (x *prmCommon) UseKey(key *ecdsa.PrivateKey) { @@ -217,8 +616,8 @@ func (x *prmCommon) UseBearer(token *token.BearerToken) { } // UseSession specifies session within which operation should be performed. -func (x *prmCommon) UseSession(token *session.Token) { - x.stoken = token +func (x *prmCommon) UseSession(token session.Token) { + x.stoken = &token } // PrmObjectPut groups parameters of PutObject operation. @@ -429,6 +828,28 @@ func (x *PrmBalanceGet) SetOwnerID(ownerID owner.ID) { x.ownerID = ownerID } +type PrmCreateSession struct { + exp uint64 +} + +// SetExp sets number of the last NeoFS epoch in the lifetime of the session after which it will be expired. +func (x *PrmCreateSession) SetExp(exp uint64) { + x.exp = exp +} + +// PrmEndpointInfo groups parameters of EndpointInfo operation. +type PrmEndpointInfo struct{} + +// PrmNetworkInfo groups parameters of NetworkInfo operation. +type PrmNetworkInfo struct{} + +// ResCreateSession groups resulting values of SessionCreate operation. +type ResCreateSession struct { + id []byte + + sessionKey []byte +} + // Pool represents virtual connection to the NeoFS network to communicate // with multiple NeoFS servers without thinking about switching between servers // due to load balancing proportions or their unavailability. @@ -580,23 +1001,15 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { if params.clientBuilder == nil { params.clientBuilder = func(addr string) (client, error) { - var c sdkClient.Client - - var prmInit sdkClient.PrmInit - prmInit.ResolveNeoFSFailures() - prmInit.SetDefaultPrivateKey(*params.key) - prmInit.SetResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { + var prm wrapperPrm + prm.setAddress(addr) + prm.setKey(*params.key) + prm.setTimeout(params.nodeDialTimeout) + prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { cache.updateEpoch(info.Epoch()) return nil }) - - c.Init(prmInit) - - var prmDial sdkClient.PrmDial - prmDial.SetServerURI(addr) - prmDial.SetTimeout(params.nodeDialTimeout) - - return &c, c.Dial(prmDial) + return newWrapper(prm) } } } @@ -673,7 +1086,7 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights healthyChanged := false wg := sync.WaitGroup{} - var prmEndpoint sdkClient.PrmEndpointInfo + var prmEndpoint PrmEndpointInfo for j, cPack := range pool.clientPacks { wg.Add(1) @@ -787,12 +1200,12 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool { } func createSessionTokenForDuration(ctx context.Context, c client, ownerID *owner.ID, dur uint64) (*session.Token, error) { - ni, err := c.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) + ni, err := c.NetworkInfo(ctx, PrmNetworkInfo{}) if err != nil { return nil, err } - epoch := ni.Info().CurrentEpoch() + epoch := ni.CurrentEpoch() var exp uint64 if math.MaxUint64-epoch < dur { @@ -800,7 +1213,7 @@ func createSessionTokenForDuration(ctx context.Context, c client, ownerID *owner } else { exp = epoch + dur } - var prm sdkClient.PrmSessionCreate + var prm PrmCreateSession prm.SetExp(exp) res, err := c.SessionCreate(ctx, prm) @@ -829,7 +1242,7 @@ type callContext struct { sessionContext *session.ObjectContext } -func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon) error { +func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error { cp, err := p.connection() if err != nil { return err @@ -849,11 +1262,11 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon) error { } // note that we don't override session provided by the caller - ctx.sessionDefault = cfg.stoken == nil && cfg.defaultSession + ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession if ctx.sessionDefault { ctx.sessionContext = session.NewObjectContext() - ctx.sessionContext.ToV2().SetVerb(cfg.verb) - ctx.sessionContext.ApplyTo(cfg.addr) + ctx.sessionContext.ToV2().SetVerb(prmCtx.verb) + ctx.sessionContext.ApplyTo(prmCtx.addr) } return err @@ -910,91 +1323,36 @@ func (p *Pool) call(ctx *callContext, f func() error) error { // PutObject writes an object through a remote server using NeoFS API protocol. func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) { - prm.useDefaultSession() - prm.useVerb(sessionv2.ObjectVerbPut) - prm.useAddress(newAddressFromCnrID(prm.hdr.ContainerID())) + var prmCtx prmContext + prmCtx.useDefaultSession() + prmCtx.useVerb(sessionv2.ObjectVerbPut) + prmCtx.useAddress(newAddressFromCnrID(prm.hdr.ContainerID())) var ctxCall callContext ctxCall.Context = ctx - if err := p.initCallContext(&ctxCall, prm.prmCommon); err != nil { + if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { return nil, fmt.Errorf("init call context") } - var cliPrm sdkClient.PrmObjectPutInit + if ctxCall.sessionDefault { + ctxCall.sessionTarget = prm.UseSession + err = p.openDefaultSession(&ctxCall) + if err != nil { + return nil, fmt.Errorf("open default session: %w", err) + } + } + prm.key = ctxCall.key - wObj, err := ctxCall.client.ObjectPutInit(ctx, cliPrm) + id, err := ctxCall.client.ObjectPut(ctx, prm) if err != nil { + // removes session token from cache in case of token error + p.checkSessionTokenErr(err, ctxCall.endpoint) return nil, fmt.Errorf("init writing on API client: %w", err) } - ctxCall.sessionTarget = wObj.WithinSession - - var resID oid.ID - - err = p.call(&ctxCall, func() error { - wObj.UseKey(*ctxCall.key) - - if prm.btoken != nil { - wObj.WithBearerToken(*prm.btoken) - } - - if wObj.WriteHeader(prm.hdr) { - sz := prm.hdr.PayloadSize() - - if data := prm.hdr.Payload(); len(data) > 0 { - if prm.payload != nil { - prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) - } else { - prm.payload = bytes.NewReader(data) - sz = uint64(len(data)) - } - } - - if prm.payload != nil { - const defaultBufferSizePut = 3 << 20 // configure? - - if sz == 0 || sz > defaultBufferSizePut { - sz = defaultBufferSizePut - } - - buf := make([]byte, sz) - - var n int - - for { - n, err = prm.payload.Read(buf) - if n > 0 { - if !wObj.WritePayloadChunk(buf[:n]) { - break - } - - continue - } - - if errors.Is(err, io.EOF) { - break - } - - return fmt.Errorf("read payload: %w", err) - } - } - } - - res, err := wObj.Close() - if err != nil { - return err - } - - if !res.ReadStoredObjectID(&resID) { - return errors.New("missing ID of the stored object") - } - - return nil - }) - - return &resID, err + return id, nil } // DeleteObject marks an object for deletion from the container using NeoFS API protocol. @@ -1002,39 +1360,23 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) // It confirms the user's intent to delete the object, and is itself a container object. // Explicit deletion is done asynchronously, and is generally not guaranteed. func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { - prm.useDefaultSession() - prm.useVerb(sessionv2.ObjectVerbDelete) - prm.useAddress(&prm.addr) - - var cliPrm sdkClient.PrmObjectDelete + var prmCtx prmContext + prmCtx.useDefaultSession() + prmCtx.useVerb(sessionv2.ObjectVerbDelete) + prmCtx.useAddress(&prm.addr) var cc callContext cc.Context = ctx - cc.sessionTarget = cliPrm.WithinSession + cc.sessionTarget = prm.UseSession - err := p.initCallContext(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon, prmCtx) if err != nil { return err } - if cnr := prm.addr.ContainerID(); cnr != nil { - cliPrm.FromContainer(*cnr) - } - - if obj := prm.addr.ObjectID(); obj != nil { - cliPrm.ByID(*obj) - } - - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } - - cliPrm.UseKey(*cc.key) - return p.call(&cc, func() error { - _, err = cc.client.ObjectDelete(ctx, cliPrm) - if err != nil { + if err = cc.client.ObjectDelete(ctx, prm); err != nil { return fmt.Errorf("remove object via client: %w", err) } @@ -1064,111 +1406,49 @@ type ResGetObject struct { // GetObject reads object header and initiates reading an object payload through a remote server using NeoFS API protocol. func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) { - prm.useDefaultSession() - prm.useVerb(sessionv2.ObjectVerbGet) - prm.useAddress(&prm.addr) - - var cliPrm sdkClient.PrmObjectGet + var prmCtx prmContext + prmCtx.useDefaultSession() + prmCtx.useVerb(sessionv2.ObjectVerbGet) + prmCtx.useAddress(&prm.addr) var cc callContext - cc.Context = ctx - cc.sessionTarget = cliPrm.WithinSession + cc.sessionTarget = prm.UseSession - err := p.initCallContext(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon, prmCtx) if err != nil { return nil, err } - if cnr := prm.addr.ContainerID(); cnr != nil { - cliPrm.FromContainer(*cnr) - } - - if obj := prm.addr.ObjectID(); obj != nil { - cliPrm.ByID(*obj) - } - - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } - - var res ResGetObject - - err = p.call(&cc, func() error { - rObj, err := cc.client.ObjectGetInit(ctx, cliPrm) - if err != nil { - return fmt.Errorf("init object reading on client: %w", err) - } - - rObj.UseKey(*cc.key) - - if !rObj.ReadHeader(&res.Header) { - _, err = rObj.Close() - return fmt.Errorf("read header: %w", err) - } - - res.Payload = (*objectReadCloser)(rObj) - - return nil + var res *ResGetObject + return res, p.call(&cc, func() error { + res, err = cc.client.ObjectGet(ctx, prm) + return err }) - if err != nil { - return nil, err - } - - return &res, nil } // HeadObject reads object header through a remote server using NeoFS API protocol. func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) { - prm.useDefaultSession() - prm.useVerb(sessionv2.ObjectVerbHead) - prm.useAddress(&prm.addr) - - var cliPrm sdkClient.PrmObjectHead + var prmCtx prmContext + prmCtx.useDefaultSession() + prmCtx.useVerb(sessionv2.ObjectVerbHead) + prmCtx.useAddress(&prm.addr) var cc callContext cc.Context = ctx - cc.sessionTarget = cliPrm.WithinSession + cc.sessionTarget = prm.UseSession - err := p.initCallContext(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon, prmCtx) if err != nil { return nil, err } - if cnr := prm.addr.ContainerID(); cnr != nil { - cliPrm.FromContainer(*cnr) - } - - if obj := prm.addr.ObjectID(); obj != nil { - cliPrm.ByID(*obj) - } - - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } - - cliPrm.UseKey(*cc.key) - - var obj object.Object - - err = p.call(&cc, func() error { - res, err := cc.client.ObjectHead(ctx, cliPrm) - if err != nil { - return fmt.Errorf("read object header via client: %w", err) - } - - if !res.ReadHeader(&obj) { - return errors.New("missing object header in response") - } - - return nil + var obj *object.Object + return obj, p.call(&cc, func() error { + obj, err = cc.client.ObjectHead(ctx, prm) + return err }) - if err != nil { - return nil, err - } - - return &obj, nil } // ResObjectRange is designed to read payload range of one object @@ -1195,56 +1475,26 @@ func (x *ResObjectRange) Close() error { // ObjectRange initiates reading an object's payload range through a remote // server using NeoFS API protocol. func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) { - prm.useDefaultSession() - prm.useVerb(sessionv2.ObjectVerbRange) - prm.useAddress(&prm.addr) - - var cliPrm sdkClient.PrmObjectRange - - cliPrm.SetOffset(prm.off) - cliPrm.SetLength(prm.ln) + var prmCtx prmContext + prmCtx.useDefaultSession() + prmCtx.useVerb(sessionv2.ObjectVerbRange) + prmCtx.useAddress(&prm.addr) var cc callContext - cc.Context = ctx - cc.sessionTarget = cliPrm.WithinSession + cc.sessionTarget = prm.UseSession - err := p.initCallContext(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon, prmCtx) if err != nil { return nil, err } - if cnr := prm.addr.ContainerID(); cnr != nil { - cliPrm.FromContainer(*cnr) - } + var res *ResObjectRange - if obj := prm.addr.ObjectID(); obj != nil { - cliPrm.ByID(*obj) - } - - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } - - var res ResObjectRange - - err = p.call(&cc, func() error { - var err error - - res.payload, err = cc.client.ObjectRangeInit(ctx, cliPrm) - if err != nil { - return fmt.Errorf("init payload range reading on client: %w", err) - } - - res.payload.UseKey(*cc.key) - - return nil + return res, p.call(&cc, func() error { + res, err = cc.client.ObjectRange(ctx, prm) + return err }) - if err != nil { - return nil, err - } - - return &res, nil } // ResObjectSearch is designed to read list of object identifiers from NeoFS system. @@ -1289,48 +1539,27 @@ func (x *ResObjectSearch) Close() { // is done using the ResObjectSearch. Exactly one return value is non-nil. // Resulting reader must be finally closed. func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) { - prm.useDefaultSession() - prm.useVerb(sessionv2.ObjectVerbSearch) - prm.useAddress(newAddressFromCnrID(&prm.cnrID)) - - var cliPrm sdkClient.PrmObjectSearch - - cliPrm.InContainer(prm.cnrID) - cliPrm.SetFilters(prm.filters) - - if prm.btoken != nil { - cliPrm.WithBearerToken(*prm.btoken) - } + var prmCtx prmContext + prmCtx.useDefaultSession() + prmCtx.useVerb(sessionv2.ObjectVerbSearch) + prmCtx.useAddress(newAddressFromCnrID(&prm.cnrID)) var cc callContext cc.Context = ctx - cc.sessionTarget = cliPrm.WithinSession + cc.sessionTarget = prm.UseSession - err := p.initCallContext(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon, prmCtx) if err != nil { return nil, err } - var res ResObjectSearch + var res *ResObjectSearch - err = p.call(&cc, func() error { - var err error - - res.r, err = cc.client.ObjectSearchInit(ctx, cliPrm) - if err != nil { - return fmt.Errorf("init object searching on client: %w", err) - } - - res.r.UseKey(*cc.key) - - return nil + return res, p.call(&cc, func() error { + res, err = cc.client.ObjectSearch(ctx, prm) + return err }) - if err != nil { - return nil, err - } - - return &res, nil } // PutContainer sends request to save container in NeoFS and waits for the operation to complete. @@ -1346,19 +1575,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, return nil, err } - var cliPrm sdkClient.PrmContainerPut - cliPrm.SetContainer(prm.cnr) - - res, err := cp.client.ContainerPut(ctx, cliPrm) - if err != nil { // here err already carries both status and client errors - return nil, err - } - - if !prm.waitParamsSet { - prm.waitParams.setDefaults() - } - - return res.ID(), waitForContainerPresence(ctx, p, res.ID(), &prm.waitParams) + return cp.client.ContainerPut(ctx, prm) } // GetContainer reads NeoFS container by ID. @@ -1368,15 +1585,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*containe return nil, err } - var cliPrm sdkClient.PrmContainerGet - cliPrm.SetContainer(prm.cnrID) - - res, err := cp.client.ContainerGet(ctx, cliPrm) - if err != nil { // here err already carries both status and client errors - return nil, err - } - - return res.Container(), nil + return cp.client.ContainerGet(ctx, prm) } // ListContainers requests identifiers of the account-owned containers. @@ -1386,15 +1595,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid. return nil, err } - var cliPrm sdkClient.PrmContainerList - cliPrm.SetAccount(prm.ownerID) - - res, err := cp.client.ContainerList(ctx, cliPrm) - if err != nil { // here err already carries both status and client errors - return nil, err - } - - return res.Containers(), nil + return cp.client.ContainerList(ctx, prm) } // DeleteContainer sends request to remove the NeoFS container and waits for the operation to complete. @@ -1410,23 +1611,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro return err } - var cliPrm sdkClient.PrmContainerDelete - cliPrm.SetContainer(prm.cnrID) - cliPrm.SetSessionToken(prm.stoken) - - _, err = cp.client.ContainerDelete(ctx, cliPrm) - - // here err already carries both status and client errors - - if err != nil { - return err - } - - if !prm.waitParamsSet { - prm.waitParams.setDefaults() - } - - return waitForContainerRemoved(ctx, p, &prm.cnrID, &prm.waitParams) + return cp.client.ContainerDelete(ctx, prm) } // GetEACL reads eACL table of the NeoFS container. @@ -1436,15 +1621,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, return nil, err } - var cliPrm sdkClient.PrmContainerEACL - cliPrm.SetContainer(prm.cnrID) - - res, err := cp.client.ContainerEACL(ctx, cliPrm) - if err != nil { // here err already carries both status and client errors - return nil, err - } - - return res.Table(), nil + return cp.client.ContainerEACL(ctx, prm) } // SetEACL sends request to update eACL table of the NeoFS container and waits for the operation to complete. @@ -1460,22 +1637,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { return err } - var cliPrm sdkClient.PrmContainerSetEACL - cliPrm.SetTable(prm.table) - - _, err = cp.client.ContainerSetEACL(ctx, cliPrm) - - // here err already carries both status and client errors - - if err != nil { - return err - } - - if !prm.waitParamsSet { - prm.waitParams.setDefaults() - } - - return waitForEACLPresence(ctx, p, prm.table.CID(), &prm.table, &prm.waitParams) + return cp.client.ContainerSetEACL(ctx, prm) } // Balance requests current balance of the NeoFS account. @@ -1485,39 +1647,31 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci return nil, err } - var cliPrm sdkClient.PrmBalanceGet - cliPrm.SetAccount(prm.ownerID) - - res, err := cp.client.BalanceGet(ctx, cliPrm) - if err != nil { // here err already carries both status and client errors - return nil, err - } - - return res.Amount(), nil + return cp.client.BalanceGet(ctx, prm) } // waitForContainerPresence waits until the container is found on the NeoFS network. -func waitForContainerPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, waitParams *WaitParams) error { +func waitForContainerPresence(ctx context.Context, cli client, cnrID *cid.ID, waitParams *WaitParams) error { var prm PrmContainerGet if cnrID != nil { prm.SetContainerID(*cnrID) } return waitFor(ctx, waitParams, func(ctx context.Context) bool { - _, err := pool.GetContainer(ctx, prm) + _, err := cli.ContainerGet(ctx, prm) return err == nil }) } // waitForEACLPresence waits until the container eacl is applied on the NeoFS network. -func waitForEACLPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, table *eacl.Table, waitParams *WaitParams) error { +func waitForEACLPresence(ctx context.Context, cli client, cnrID *cid.ID, table *eacl.Table, waitParams *WaitParams) error { var prm PrmContainerEACL if cnrID != nil { prm.SetContainerID(*cnrID) } return waitFor(ctx, waitParams, func(ctx context.Context) bool { - eaclTable, err := pool.GetEACL(ctx, prm) + eaclTable, err := cli.ContainerEACL(ctx, prm) if err == nil { return eacl.EqualTables(*table, *eaclTable) } @@ -1526,14 +1680,14 @@ func waitForEACLPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, table * } // waitForContainerRemoved waits until the container is removed from the NeoFS network. -func waitForContainerRemoved(ctx context.Context, pool *Pool, cnrID *cid.ID, waitParams *WaitParams) error { +func waitForContainerRemoved(ctx context.Context, cli client, cnrID *cid.ID, waitParams *WaitParams) error { var prm PrmContainerGet if cnrID != nil { prm.SetContainerID(*cnrID) } return waitFor(ctx, waitParams, func(ctx context.Context) bool { - _, err := pool.GetContainer(ctx, prm) + _, err := cli.ContainerGet(ctx, prm) return sdkClient.IsErrContainerNotFound(err) }) } @@ -1568,12 +1722,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) { return nil, err } - res, err := cp.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) - if err != nil { // here err already carries both status and client errors - return nil, err - } - - return res.Info(), nil + return cp.client.NetworkInfo(ctx, PrmNetworkInfo{}) } // Close closes the Pool and releases all the associated resources. @@ -1583,11 +1732,11 @@ func (p *Pool) Close() { } // creates new session token with specified owner from SessionCreate call result. -func sessionTokenForOwner(id *owner.ID, cliRes *sdkClient.ResSessionCreate, exp uint64) *session.Token { +func sessionTokenForOwner(id *owner.ID, cliRes *ResCreateSession, exp uint64) *session.Token { st := session.NewToken() st.SetOwnerID(id) - st.SetID(cliRes.ID()) - st.SetSessionKey(cliRes.PublicKey()) + st.SetID(cliRes.id) + st.SetSessionKey(cliRes.sessionKey) st.SetExp(exp) return st