Ekaterina Lebedeva
ec0fc52e56
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 3m40s
Tests and linters / Staticcheck (pull_request) Successful in 4m14s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m50s
DCO action / DCO (pull_request) Successful in 5m41s
Tests and linters / Run gofumpt (pull_request) Successful in 5m39s
Build / Build Components (pull_request) Successful in 6m4s
Tests and linters / Tests (pull_request) Successful in 6m30s
Tests and linters / gopls check (pull_request) Successful in 8m5s
Tests and linters / Tests with -race (pull_request) Successful in 8m20s
Tests and linters / Lint (pull_request) Successful in 8m22s
* Added new method for listing containers to container service. It opens stream and sends containers in batches. * Added TransportSplitter wrapper around ExecutionService to split container ID list read from contract in parts that are smaller than grpc max message size. Batch size can be changed in node configuration file (as in example config file). * Changed `container list` implementaion in cli: now ListStream is called by default. Old List is called only if ListStream is not implemented. * Changed `internalclient.ListContainersPrm`.`Account` to `OwnerID` since `client.PrmContainerList`.`Account` was renamed to `OwnerID` in sdk. Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
104 lines
3 KiB
Go
104 lines
3 KiB
Go
package container
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
|
|
)
|
|
|
|
type ServiceExecutor interface {
|
|
Put(context.Context, *session.Token, *container.PutRequestBody) (*container.PutResponseBody, error)
|
|
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
|
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
|
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
|
ListStream(context.Context, *container.ListStreamRequest, ListStream) error
|
|
}
|
|
|
|
type executorSvc struct {
|
|
Server
|
|
|
|
exec ServiceExecutor
|
|
|
|
respSvc *response.Service
|
|
}
|
|
|
|
// NewExecutionService wraps ServiceExecutor and returns Container Service interface.
|
|
func NewExecutionService(exec ServiceExecutor, respSvc *response.Service) Server {
|
|
return &executorSvc{
|
|
exec: exec,
|
|
respSvc: respSvc,
|
|
}
|
|
}
|
|
|
|
func (s *executorSvc) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
|
meta := req.GetMetaHeader()
|
|
for origin := meta.GetOrigin(); origin != nil; origin = meta.GetOrigin() {
|
|
meta = origin
|
|
}
|
|
|
|
respBody, err := s.exec.Put(ctx, meta.GetSessionToken(), req.GetBody())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not execute Put request: %w", err)
|
|
}
|
|
|
|
resp := new(container.PutResponse)
|
|
resp.SetBody(respBody)
|
|
|
|
s.respSvc.SetMeta(resp)
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *executorSvc) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
|
meta := req.GetMetaHeader()
|
|
for origin := meta.GetOrigin(); origin != nil; origin = meta.GetOrigin() {
|
|
meta = origin
|
|
}
|
|
|
|
respBody, err := s.exec.Delete(ctx, meta.GetSessionToken(), req.GetBody())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not execute Delete request: %w", err)
|
|
}
|
|
|
|
resp := new(container.DeleteResponse)
|
|
resp.SetBody(respBody)
|
|
|
|
s.respSvc.SetMeta(resp)
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *executorSvc) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
|
respBody, err := s.exec.Get(ctx, req.GetBody())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not execute Get request: %w", err)
|
|
}
|
|
|
|
resp := new(container.GetResponse)
|
|
resp.SetBody(respBody)
|
|
|
|
s.respSvc.SetMeta(resp)
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
|
respBody, err := s.exec.List(ctx, req.GetBody())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not execute List request: %w", err)
|
|
}
|
|
|
|
resp := new(container.ListResponse)
|
|
resp.SetBody(respBody)
|
|
|
|
s.respSvc.SetMeta(resp)
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
|
err := s.exec.ListStream(stream.Context(), req, stream)
|
|
if err != nil {
|
|
return fmt.Errorf("could not execute ListStream request: %w", err)
|
|
}
|
|
return nil
|
|
}
|