[#165] Generalize allObjectListingParams

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
pull/294/head
Denis Kirillov 2024-01-21 00:30:25 +03:00
parent 093de13f54
commit 0ae49eaab0
1 changed files with 51 additions and 50 deletions

View File

@ -73,13 +73,13 @@ type (
VersionIDMarker string
}
allObjectParams struct {
Bucket *data.BucketInfo
Delimiter string
Prefix string
MaxKeys int
Marker string
ContinuationToken string
allObjectListingParams struct {
BktInfo *data.BucketInfo
Delimiter string
Prefix string
MaxKeys int
Marker string
Bookmark string
}
)
@ -87,12 +87,13 @@ type (
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var result ListObjectsInfoV1
prm := allObjectParams{
Bucket: p.BktInfo,
prm := allObjectListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.Marker,
Bookmark: p.Marker,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
@ -114,13 +115,13 @@ func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
var result ListObjectsInfoV2
prm := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.StartAfter,
ContinuationToken: p.ContinuationToken,
prm := allObjectListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.StartAfter,
Bookmark: p.ContinuationToken,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
@ -139,7 +140,16 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
objects, isTruncated, err := n.getAllObjectsVersions(ctx, p)
prm := allObjectListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.KeyMarker,
Bookmark: p.VersionIDMarker,
}
objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
@ -160,13 +170,13 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
return res, nil
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) {
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectListingParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken) // todo redo for listv1
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) // todo redo for listv1
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
@ -180,7 +190,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.Bucket, p.Prefix)
session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
if err != nil {
return nil, nil, err
}
@ -213,13 +223,13 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
if next != nil {
session.Next = []*data.NodeVersion{next}
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.OID.EncodeToString()), session)
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, next.OID.EncodeToString()), session)
}
return
}
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, bool, error) {
func (n *layer) getAllObjectsVersions(ctx context.Context, p allObjectListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
if p.MaxKeys == 0 {
return nil, false, nil
}
@ -229,17 +239,8 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions
return nil, false, err
}
pp := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
Marker: p.KeyMarker,
ContinuationToken: p.VersionIDMarker,
MaxKeys: p.MaxKeys,
}
generator, errorCh := nodesGeneratorVersions(ctx, pp, session)
objOutCh, err := n.initWorkerPoolVersions(ctx, 2, pp, generator)
generator, errorCh := nodesGeneratorVersions(ctx, p, session)
objOutCh, err := n.initWorkerPoolVersions(ctx, 2, p, generator)
if err != nil {
return nil, false, err
}
@ -298,10 +299,10 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions
return allObjects, isTruncated, nil
}
func (n *layer) getListVersionsSession(ctx context.Context, p *ListObjectVersionsParams) (*data.ListSession, error) {
func (n *layer) getListVersionsSession(ctx context.Context, p allObjectListingParams) (*data.ListSession, error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
session := n.cache.GetListSession(owner, cacheKey)
if session == nil {
return n.initNewListVersionsSession(ctx, p)
@ -319,7 +320,7 @@ func (n *layer) getListVersionsSession(ctx context.Context, p *ListObjectVersion
return session, nil
}
func (n *layer) initNewListVersionsSession(ctx context.Context, p *ListObjectVersionsParams) (session *data.ListSession, err error) {
func (n *layer) initNewListVersionsSession(ctx context.Context, p allObjectListingParams) (session *data.ListSession, err error) {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
@ -335,7 +336,7 @@ func (n *layer) initNewListVersionsSession(ctx context.Context, p *ListObjectVer
return session, nil
}
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
func nodesGeneratorStream(ctx context.Context, p allObjectListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion, 1000)
errCh := make(chan error, 1)
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
@ -386,7 +387,7 @@ func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.L
return nodeCh, errCh
}
func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
func nodesGeneratorVersions(ctx context.Context, p allObjectListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion, 1000)
errCh := make(chan error, 1)
existed := stream.NamesMap
@ -439,7 +440,7 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data
return nodeCh, errCh
}
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) {
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectListingParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
@ -473,10 +474,10 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectP
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node)
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node); oi == nil {
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil {
// do not process object which are definitely missing in object service
return
}
@ -504,7 +505,7 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectP
return objCh, nil
}
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectListingParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
@ -536,10 +537,10 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node)
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node); oi == nil {
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil {
// do not process object which are definitely missing in object service
return
}
@ -567,7 +568,7 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec
return objCh, nil
}
func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
func shouldSkip(node *data.NodeVersion, p allObjectListingParams, existed map[string]struct{}) bool {
if node.IsDeleteMarker {
return true
}
@ -584,9 +585,9 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st
return true
}
if p.ContinuationToken != "" {
if p.Bookmark != "" {
if _, ok := existed[continuationToken]; !ok {
if p.ContinuationToken != node.OID.EncodeToString() {
if p.Bookmark != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
@ -597,7 +598,7 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st
return false
}
func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
func shouldSkipVersions(node *data.NodeVersion, p allObjectListingParams, existed map[string]struct{}) bool {
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
@ -610,9 +611,9 @@ func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[s
return true
}
if p.ContinuationToken != "" {
if p.Bookmark != "" {
if _, ok := existed[continuationToken]; !ok {
if p.ContinuationToken != node.OID.EncodeToString() {
if p.Bookmark != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}