From da642a498af0b3e07371fb480d736529f5e007dc Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 22 Jan 2024 10:01:24 +0300 Subject: [PATCH] [#165] Listing fix data race Signed-off-by: Denis Kirillov --- api/data/listsession.go | 2 +- api/data/tree.go | 9 ++ api/handler/object_list.go | 17 ++-- api/layer/listing.go | 158 +++++++++++++++++------------------ api/layer/versioning_test.go | 4 +- 5 files changed, 99 insertions(+), 91 deletions(-) diff --git a/api/data/listsession.go b/api/data/listsession.go index 1a15ab9c..a13f2e4c 100644 --- a/api/data/listsession.go +++ b/api/data/listsession.go @@ -10,7 +10,7 @@ type VersionsStream interface { } type ListSession struct { - Next []*NodeVersion + Next []*ExtendedNodeVersion Stream VersionsStream NamesMap map[string]struct{} Context context.Context diff --git a/api/data/tree.go b/api/data/tree.go index 002cfa4e..efc600d5 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -24,6 +24,7 @@ type NodeVersion struct { type ExtendedNodeVersion struct { NodeVersion *NodeVersion IsLatest bool + DirName string } func (e ExtendedNodeVersion) Version() string { @@ -34,6 +35,14 @@ func (e ExtendedNodeVersion) Version() string { return e.NodeVersion.OID.EncodeToString() } +func (e ExtendedNodeVersion) Name() string { + if e.DirName != "" { + return e.DirName + } + + return e.NodeVersion.FilePath +} + // ExtendedObjectInfo contains additional node info to be able to sort versions by timestamp. type ExtendedObjectInfo struct { ObjectInfo *ObjectInfo diff --git a/api/handler/object_list.go b/api/handler/object_list.go index 191e3cb2..a3865690 100644 --- a/api/handler/object_list.go +++ b/api/handler/object_list.go @@ -185,18 +185,18 @@ func fillPrefixes(src []string, encode string) []CommonPrefix { return dst } -func fillContentsWithOwner(src []*data.NodeVersion, encode string, md5Enabled bool) []Object { +func fillContentsWithOwner(src []*data.ExtendedNodeVersion, encode string, md5Enabled bool) []Object { return fillContents(src, encode, true, md5Enabled) } -func fillContents(src []*data.NodeVersion, encode string, fetchOwner, md5Enabled bool) []Object { +func fillContents(src []*data.ExtendedNodeVersion, encode string, fetchOwner, md5Enabled bool) []Object { var dst []Object for _, obj := range src { res := Object{ - Key: s3PathEncode(obj.FilePath, encode), - Size: obj.Size, - LastModified: obj.Created.UTC().Format(time.RFC3339), - ETag: data.Quote(obj.GetETag(md5Enabled)), + Key: s3PathEncode(obj.NodeVersion.FilePath, encode), + Size: obj.NodeVersion.Size, + LastModified: obj.NodeVersion.Created.UTC().Format(time.RFC3339), + ETag: data.Quote(obj.NodeVersion.GetETag(md5Enabled)), StorageClass: api.DefaultStorageClass, } @@ -205,9 +205,10 @@ func fillContents(src []*data.NodeVersion, encode string, fetchOwner, md5Enabled //} if fetchOwner { + owner := obj.NodeVersion.Owner.String() res.Owner = &Owner{ - ID: obj.Owner.String(), - DisplayName: obj.Owner.String(), + ID: owner, + DisplayName: owner, } } diff --git a/api/layer/listing.go b/api/layer/listing.go index 6099008c..6e7984f2 100644 --- a/api/layer/listing.go +++ b/api/layer/listing.go @@ -45,7 +45,7 @@ type ( // ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2. ListObjectsInfo struct { Prefixes []string - Objects []*data.NodeVersion + Objects []*data.ExtendedNodeVersion IsTruncated bool } @@ -118,10 +118,10 @@ func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis if next != nil { result.IsTruncated = true - result.NextMarker = objects[len(objects)-1].FilePath + result.NextMarker = objects[len(objects)-1].Name() } - result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter) + result.Prefixes, result.Objects = triageExtendedObjects(objects) return &result, nil } @@ -149,10 +149,10 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis if next != nil { result.IsTruncated = true - result.NextContinuationToken = next.OID.EncodeToString() + result.NextContinuationToken = next.NodeVersion.OID.EncodeToString() } - result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter) + result.Prefixes, result.Objects = triageExtendedObjects(objects) return &result, nil } @@ -183,12 +183,12 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar res.NextVersionIDMarker = objects[p.MaxKeys-1].NodeVersion.OID.EncodeToString() } - res.CommonPrefixes, objects = triageExtendedObjects(objects, p.Prefix, p.Delimiter) + res.CommonPrefixes, objects = triageExtendedObjects(objects) res.Version, res.DeleteMarker = triageVersions(objects) return res, nil } -func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) { +func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.ExtendedNodeVersion, next *data.ExtendedNodeVersion, err error) { if p.MaxKeys == 0 { return nil, nil, nil } @@ -204,7 +204,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) } - objects = make([]*data.NodeVersion, 0, p.MaxKeys+1) + objects = make([]*data.ExtendedNodeVersion, 0, p.MaxKeys+1) objects = append(objects, session.Next...) for obj := range objOutCh { objects = append(objects, obj) @@ -261,17 +261,15 @@ func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p common allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys) for eoi := range objOutCh { name := eoi.NodeVersion.FilePath - - dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter) - if dirName != "" { - name = dirName + if eoi.DirName != "" { + name = eoi.DirName } if lastName != name { formVersionsListRow(allObjects, listRowStartIndex, session) listRowStartIndex = len(allObjects) allObjects = append(allObjects, eoi) - } else if dirName == "" { + } else if eoi.DirName == "" { allObjects = append(allObjects, eoi) } lastName = name @@ -292,7 +290,11 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int, return prevVersions[j].NodeVersion.Timestamp < prevVersions[i].NodeVersion.Timestamp // sort in reverse order to have last added first }) - objects[rowStartIndex].IsLatest = len(session.Next) == 0 || session.Next[0].FilePath != objects[rowStartIndex].NodeVersion.FilePath + prevVersions[0].IsLatest = len(session.Next) == 0 || session.Next[0].NodeVersion.FilePath != prevVersions[0].NodeVersion.FilePath + + for _, version := range prevVersions[1:] { + version.IsLatest = false + } } func (n *layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) { @@ -340,7 +342,7 @@ func (n *layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVers return session, nil } -func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.NodeVersion) { +func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) { if len(allObjects) <= p.MaxKeys { return } @@ -348,16 +350,16 @@ func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatest var cacheKey cache.ListSessionKey switch p.ListType { case ListObjectsV1Type: - cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys-1].FilePath) + cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys-1].Name()) case ListObjectsV2Type: - cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys].OID.EncodeToString()) + cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys].NodeVersion.OID.EncodeToString()) default: // should never happen panic("invalid list type") } session.Acquired.Store(false) - session.Next = []*data.NodeVersion{allObjects[p.MaxKeys]} + session.Next = []*data.ExtendedNodeVersion{allObjects[p.MaxKeys]} n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session) } @@ -368,20 +370,19 @@ func (n *layer) putListAllVersionsSession(ctx context.Context, p commonVersionsL session.Acquired.Store(false) - session.Next = make([]*data.NodeVersion, len(allObjects)-p.MaxKeys+1) - session.Next[0] = allObjects[p.MaxKeys-1].NodeVersion + session.Next = make([]*data.ExtendedNodeVersion, len(allObjects)-p.MaxKeys+1) + session.Next[0] = allObjects[p.MaxKeys-1] for i, node := range allObjects[p.MaxKeys:] { - session.Next[i+1] = node.NodeVersion + session.Next[i+1] = node } - cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].OID.EncodeToString()) + cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].NodeVersion.OID.EncodeToString()) n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session) } -func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { - nodeCh := make(chan *data.NodeVersion, 1000) +func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) { + nodeCh := make(chan *data.ExtendedNodeVersion, 1000) errCh := make(chan error, 1) - //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories existed := stream.NamesMap if len(stream.Next) != 0 { @@ -407,14 +408,20 @@ func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, st break LOOP } - if shouldSkip(node, p, existed) { + nodeExt := &data.ExtendedNodeVersion{ + NodeVersion: node, + IsLatest: true, + DirName: tryDirectoryName(node, p.Prefix, p.Delimiter), + } + + if shouldSkip(nodeExt, p, existed) { continue } select { case <-ctx.Done(): break LOOP - case nodeCh <- node: + case nodeCh <- nodeExt: generated++ if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken @@ -429,8 +436,8 @@ func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, st return nodeCh, errCh } -func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { - nodeCh := make(chan *data.NodeVersion, 1000) +func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) { + nodeCh := make(chan *data.ExtendedNodeVersion, 1000) errCh := make(chan error, 1) existed := stream.NamesMap @@ -442,13 +449,14 @@ func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, ind int err error lastName string + node *data.NodeVersion + nodeExt *data.ExtendedNodeVersion ) LOOP: for err == nil { - var node *data.NodeVersion if ind < len(stream.Next) { - node = stream.Next[ind] + nodeExt = stream.Next[ind] ind++ } else { node, err = stream.Stream.Next(ctx) @@ -458,21 +466,26 @@ func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, } break LOOP } + + nodeExt = &data.ExtendedNodeVersion{ + NodeVersion: node, + DirName: tryDirectoryName(node, p.Prefix, p.Delimiter), + } } - if shouldSkipVersions(node, p, existed) { + if shouldSkipVersions(nodeExt, p, existed) { continue } select { case <-ctx.Done(): break LOOP - case nodeCh <- node: + case nodeCh <- nodeExt: generated++ - if generated > p.MaxKeys && node.FilePath != lastName { + if generated > p.MaxKeys && nodeExt.NodeVersion.FilePath != lastName { break LOOP } - lastName = node.FilePath + lastName = nodeExt.NodeVersion.FilePath } } close(nodeCh) @@ -482,13 +495,13 @@ func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, return nodeCh, errCh } -func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) { +func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) } - objCh := make(chan *data.NodeVersion, size) + objCh := make(chan *data.ExtendedNodeVersion, size) go func() { var wg sync.WaitGroup @@ -501,10 +514,7 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVers default: } - if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); dirName != "" || node.IsFilledExtra() { // todo think to not compute twice - if dirName != "" { - node.FilePath = dirName - } + if node.DirName != "" || node.NodeVersion.IsFilledExtra() { select { case <-ctx.Done(): case objCh <- node: @@ -512,20 +522,20 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVers } else { // We have to make a copy of pointer to data.NodeVersion // to get correct value in submitted task function. - func(node *data.NodeVersion) { + func(node *data.ExtendedNodeVersion) { wg.Add(1) err = pool.Submit(func() { defer wg.Done() - oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node) + oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion) if oi == nil { // try to get object again - if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil { + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion); oi == nil { // do not process object which are definitely missing in object service return } } - node.FillExtra(oi) + node.NodeVersion.FillExtra(oi) select { case <-ctx.Done(): @@ -547,7 +557,7 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVers return objCh, nil } -func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) { +func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { @@ -566,33 +576,33 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p commonVe default: } - if node.IsFilledExtra() { + if node.DirName != "" || node.NodeVersion.IsFilledExtra() { select { case <-ctx.Done(): - case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}: + case objCh <- node: } } else { // We have to make a copy of pointer to data.NodeVersion // to get correct value in submitted task function. - func(node *data.NodeVersion) { + func(node *data.ExtendedNodeVersion) { wg.Add(1) err = pool.Submit(func() { defer wg.Done() - oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node) + oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion) if oi == nil { // try to get object again - if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil { + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion); oi == nil { // do not process object which are definitely missing in object service return } } - node.FillExtra(oi) + node.NodeVersion.FillExtra(oi) select { case <-ctx.Done(): - case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}: + case objCh <- node: } }) if err != nil { @@ -610,15 +620,16 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p commonVe return objCh, nil } -func shouldSkip(node *data.NodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { - if node.IsDeleteMarker { +func shouldSkip(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { + if node.NodeVersion.IsDeleteMarker { return true } - filePath := node.FilePath - if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { - filePath = dirName + filePath := node.NodeVersion.FilePath + if node.DirName != "" { + filePath = node.DirName } + if _, ok := existed[filePath]; ok { return true } @@ -629,7 +640,7 @@ func shouldSkip(node *data.NodeVersion, p commonVersionsListingParams, existed m if p.Bookmark != "" { if _, ok := existed[continuationToken]; !ok { - if p.Bookmark != node.OID.EncodeToString() { + if p.Bookmark != node.NodeVersion.OID.EncodeToString() { return true } existed[continuationToken] = struct{}{} @@ -640,10 +651,10 @@ func shouldSkip(node *data.NodeVersion, p commonVersionsListingParams, existed m return false } -func shouldSkipVersions(node *data.NodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { - filePath := node.FilePath - if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { - filePath = dirName +func shouldSkipVersions(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { + filePath := node.NodeVersion.FilePath + if node.DirName != "" { + filePath = node.DirName if _, ok := existed[filePath]; ok { return true } @@ -655,7 +666,7 @@ func shouldSkipVersions(node *data.NodeVersion, p commonVersionsListingParams, e if p.Bookmark != "" { if _, ok := existed[continuationToken]; !ok { - if p.Bookmark != node.OID.EncodeToString() { + if p.Bookmark != node.NodeVersion.OID.EncodeToString() { return true } existed[continuationToken] = struct{}{} @@ -667,23 +678,10 @@ func shouldSkipVersions(node *data.NodeVersion, p commonVersionsListingParams, e return false } -func triageObjects(allObjects []*data.NodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.NodeVersion) { - objects = make([]*data.NodeVersion, 0, len(allObjects)) +func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion) (prefixes []string, objects []*data.ExtendedNodeVersion) { for _, ov := range allObjects { - if dirName := tryDirectoryName(ov, prefix, delimiter); dirName != "" { - prefixes = append(prefixes, dirName) - } else { - objects = append(objects, ov) - } - } - - return -} - -func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.ExtendedNodeVersion) { - for _, ov := range allObjects { - if dirName := tryDirectoryName(ov.NodeVersion, prefix, delimiter); dirName != "" { - prefixes = append(prefixes, dirName) + if ov.DirName != "" { + prefixes = append(prefixes, ov.DirName) } else { objects = append(objects, ov) } diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index deb65a41..2bb25447 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -72,7 +72,7 @@ func (tc *testContext) deleteObject(objectName, versionID string, settings *data } } -func (tc *testContext) listObjectsV1() []*data.NodeVersion { +func (tc *testContext) listObjectsV1() []*data.ExtendedNodeVersion { res, err := tc.layer.ListObjectsV1(tc.ctx, &ListObjectsParamsV1{ ListObjectsParamsCommon: ListObjectsParamsCommon{ BktInfo: tc.bktInfo, @@ -83,7 +83,7 @@ func (tc *testContext) listObjectsV1() []*data.NodeVersion { return res.Objects } -func (tc *testContext) listObjectsV2() []*data.NodeVersion { +func (tc *testContext) listObjectsV2() []*data.ExtendedNodeVersion { res, err := tc.layer.ListObjectsV2(tc.ctx, &ListObjectsParamsV2{ ListObjectsParamsCommon: ListObjectsParamsCommon{ BktInfo: tc.bktInfo,