[#165] Fix lint errors
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
da642a498a
commit
4e15452853
8 changed files with 13 additions and 140 deletions
|
@ -81,7 +81,7 @@ func (v *BaseNodeVersion) GetETag(md5Enabled bool) string {
|
||||||
return v.ETag
|
return v.ETag
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsFilledExtra returns true is node was created by version of gate v0.29.x and later
|
// IsFilledExtra returns true is node was created by version of gate v0.29.x and later.
|
||||||
func (v BaseNodeVersion) IsFilledExtra() bool {
|
func (v BaseNodeVersion) IsFilledExtra() bool {
|
||||||
return v.Created != nil && v.Owner != nil
|
return v.Created != nil && v.Owner != nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -434,7 +434,7 @@ func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.B
|
||||||
return bktInfo, objInfo
|
return bktInfo, objInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func createVersionedBucketAndObject(t *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
func createVersionedBucketAndObject(_ *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
||||||
bktInfo := createVersionedBucket(tc, bktName)
|
bktInfo := createVersionedBucket(tc, bktName)
|
||||||
objInfo := createTestObject(tc, bktInfo, objName, encryption.Params{})
|
objInfo := createTestObject(tc, bktInfo, objName, encryption.Params{})
|
||||||
|
|
||||||
|
|
|
@ -6,12 +6,10 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -563,61 +561,6 @@ func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, nam
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareObjects(hc *handlerContext, bktInfo *data.BucketInfo, prefix string, size int) []string {
|
|
||||||
treeID := "version"
|
|
||||||
parentID := uint64(0)
|
|
||||||
if prefix != "" {
|
|
||||||
for _, filename := range strings.Split(prefix, "/") {
|
|
||||||
nodeID, err := hc.treeMock.AddNode(hc.Context(), bktInfo, treeID, parentID, map[string]string{
|
|
||||||
"FileName": filename,
|
|
||||||
})
|
|
||||||
require.NoError(hc.t, err)
|
|
||||||
parentID = nodeID
|
|
||||||
}
|
|
||||||
prefix += "/"
|
|
||||||
}
|
|
||||||
|
|
||||||
objects := make([]string, size)
|
|
||||||
|
|
||||||
for i := range objects {
|
|
||||||
filename := "object" + strconv.Itoa(i)
|
|
||||||
filepath := prefix + filename
|
|
||||||
|
|
||||||
prm := layer.PrmObjectCreate{
|
|
||||||
Container: bktInfo.CID,
|
|
||||||
Filepath: filepath,
|
|
||||||
Payload: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
id, err := hc.tp.CreateObject(hc.Context(), prm)
|
|
||||||
require.NoError(hc.t, err)
|
|
||||||
|
|
||||||
newVersion := &data.NodeVersion{
|
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
|
||||||
OID: id,
|
|
||||||
ETag: "12345678",
|
|
||||||
FilePath: filepath,
|
|
||||||
},
|
|
||||||
IsUnversioned: true,
|
|
||||||
IsCombined: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = hc.treeMock.AddNodeBase(hc.Context(), bktInfo, treeID, parentID, map[string]string{
|
|
||||||
"OID": newVersion.OID.EncodeToString(),
|
|
||||||
"FileName": filename,
|
|
||||||
"IsUnversioned": "true",
|
|
||||||
}, false)
|
|
||||||
require.NoError(hc.t, err)
|
|
||||||
objects[i] = filepath
|
|
||||||
}
|
|
||||||
|
|
||||||
hc.treeMock.Sort()
|
|
||||||
|
|
||||||
sort.Strings(objects)
|
|
||||||
|
|
||||||
return objects
|
|
||||||
}
|
|
||||||
|
|
||||||
func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
|
func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
|
||||||
return listObjectsV2Ext(hc, bktName, prefix, delimiter, startAfter, continuationToken, "", maxKeys)
|
return listObjectsV2Ext(hc, bktName, prefix, delimiter, startAfter, continuationToken, "", maxKeys)
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,7 +199,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
|
||||||
}
|
}
|
||||||
|
|
||||||
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
|
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
|
||||||
objOutCh, err := n.initWorkerPoolStream(ctx, 2, p.commonVersionsListingParams, generator)
|
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
|
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -234,7 +234,7 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi
|
||||||
}
|
}
|
||||||
|
|
||||||
generator, errorCh := nodesGeneratorVersions(ctx, p, session)
|
generator, errorCh := nodesGeneratorVersions(ctx, p, session)
|
||||||
objOutCh, err := n.initWorkerPoolVersions(ctx, 2, p, generator)
|
objOutCh, err := n.initWorkerPool(ctx, 2, p, generator)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
@ -312,7 +312,6 @@ func (n *layer) getListVersionsSession(ctx context.Context, p commonVersionsList
|
||||||
session := n.cache.GetListSession(owner, cacheKey)
|
session := n.cache.GetListSession(owner, cacheKey)
|
||||||
if session == nil {
|
if session == nil {
|
||||||
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
|
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if session.Acquired.Swap(true) {
|
if session.Acquired.Swap(true) {
|
||||||
|
@ -495,7 +494,7 @@ func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams,
|
||||||
return nodeCh, errCh
|
return nodeCh, errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
|
func (n *layer) initWorkerPool(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
|
||||||
reqLog := n.reqLogger(ctx)
|
reqLog := n.reqLogger(ctx)
|
||||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -503,68 +502,6 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVers
|
||||||
}
|
}
|
||||||
objCh := make(chan *data.ExtendedNodeVersion, size)
|
objCh := make(chan *data.ExtendedNodeVersion, size)
|
||||||
|
|
||||||
go func() {
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
LOOP:
|
|
||||||
for node := range input {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
break LOOP
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if node.DirName != "" || node.NodeVersion.IsFilledExtra() {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
case objCh <- node:
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// We have to make a copy of pointer to data.NodeVersion
|
|
||||||
// to get correct value in submitted task function.
|
|
||||||
func(node *data.ExtendedNodeVersion) {
|
|
||||||
wg.Add(1)
|
|
||||||
err = pool.Submit(func() {
|
|
||||||
defer wg.Done()
|
|
||||||
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion)
|
|
||||||
if oi == nil {
|
|
||||||
// try to get object again
|
|
||||||
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion); oi == nil {
|
|
||||||
// do not process object which are definitely missing in object service
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
node.NodeVersion.FillExtra(oi)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
case objCh <- node:
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
wg.Done()
|
|
||||||
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
|
||||||
}
|
|
||||||
}(node)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
close(objCh)
|
|
||||||
pool.Release()
|
|
||||||
}()
|
|
||||||
|
|
||||||
return objCh, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
|
|
||||||
reqLog := n.reqLogger(ctx)
|
|
||||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
|
|
||||||
}
|
|
||||||
objCh := make(chan *data.ExtendedNodeVersion)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
|
|
@ -151,11 +151,9 @@ func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
|
||||||
s.ln, err = s.r.Read(s.buffer)
|
s.ln, err = s.r.Read(s.buffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
err = handleError(err)
|
return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", handleError(err))
|
||||||
return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", err)
|
|
||||||
} else {
|
|
||||||
s.eof = true
|
|
||||||
}
|
}
|
||||||
|
s.eof = true
|
||||||
}
|
}
|
||||||
if s.ln > 0 {
|
if s.ln > 0 {
|
||||||
s.index = 0
|
s.index = 0
|
||||||
|
|
|
@ -138,4 +138,5 @@ const (
|
||||||
ControlAPIGetPolicy = "get policy request"
|
ControlAPIGetPolicy = "get policy request"
|
||||||
ControlAPIListPolicies = "list policies request"
|
ControlAPIListPolicies = "list policies request"
|
||||||
PolicyValidationFailed = "policy validation failed"
|
PolicyValidationFailed = "policy validation failed"
|
||||||
|
ParseTreeNode = "parse tree node"
|
||||||
)
|
)
|
||||||
|
|
|
@ -675,7 +675,7 @@ func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, e
|
||||||
return nil, io.EOF
|
return nil, io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
for true {
|
for {
|
||||||
if s.innerStream == nil {
|
if s.innerStream == nil {
|
||||||
node, err := s.getNodeFromMainStream()
|
node, err := s.getNodeFromMainStream()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -707,12 +707,10 @@ func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, e
|
||||||
}
|
}
|
||||||
return nodeVersion, nil
|
return nodeVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
panic("unreachable code")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
|
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
|
||||||
for true {
|
for {
|
||||||
node, err := s.mainStream.Next()
|
node, err := s.mainStream.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrNodeNotFound) {
|
if errors.Is(err, ErrNodeNotFound) {
|
||||||
|
@ -725,8 +723,6 @@ func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, erro
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
panic("unreachable code")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
|
func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
|
||||||
|
@ -747,7 +743,7 @@ func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
|
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
|
||||||
for true {
|
for {
|
||||||
node, err := s.innerStream.Next()
|
node, err := s.innerStream.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("inner stream: %w", err)
|
return nil, fmt.Errorf("inner stream: %w", err)
|
||||||
|
@ -782,14 +778,12 @@ func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.Node
|
||||||
|
|
||||||
return nodeVersion, nil
|
return nodeVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
panic("unreachable code")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
|
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
|
||||||
trNode, fileName, err := parseTreeNode(node)
|
trNode, fileName, err := parseTreeNode(node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Debug("parse tree node", zap.Error(err))
|
s.log.Debug(logs.ParseTreeNode, zap.Error(err))
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -336,7 +336,7 @@ func (c *ServiceClientMemory) AddNodeBase(_ context.Context, bktInfo *data.Bucke
|
||||||
|
|
||||||
parentNode.children = append(parentNode.children, tn)
|
parentNode.children = append(parentNode.children, tn)
|
||||||
if needSort {
|
if needSort {
|
||||||
//sortNodes(parentNode.children)
|
sortNodes(parentNode.children)
|
||||||
}
|
}
|
||||||
cnr.trees[treeID] = tr
|
cnr.trees[treeID] = tr
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue