* NewStore ctx argument for logger extraction

* Add to WaitGroup on limiter `acquire()`
Signed-off-by: Andrey Voronkov <voronkovaa@gmail.com>
This commit is contained in:
Andrey Voronkov 2023-03-20 15:07:54 +03:00
parent 0ffef9d8f2
commit a555c9cbbe
2 changed files with 6 additions and 5 deletions

View file

@ -203,7 +203,7 @@ func (repo *repository) Named() reference.Named {
} }
func (repo *repository) Tags(ctx context.Context) distribution.TagService { func (repo *repository) Tags(ctx context.Context) distribution.TagService {
tags := NewStore(repo, repo.registry.blobStore) tags := NewStore(ctx, repo, repo.registry.blobStore)
return tags return tags
} }

View file

@ -9,9 +9,9 @@ import (
"sync" "sync"
"github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3"
dcontext "github.com/distribution/distribution/v3/context"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
) )
var _ distribution.TagService = &TagStore{} var _ distribution.TagService = &TagStore{}
@ -27,11 +27,12 @@ type TagStore struct {
lookupConcurrencyFactor int lookupConcurrencyFactor int
} }
func NewStore(repository *repository, blobStore *blobStore) *TagStore { func NewStore(ctx context.Context, repository *repository, blobStore *blobStore) *TagStore {
logger := dcontext.GetLogger(ctx)
lookupConcurrencyFactor, err := strconv.Atoi(os.Getenv("STORAGE_TAGSTORE_LOOKUP_CONCURRENCY")) lookupConcurrencyFactor, err := strconv.Atoi(os.Getenv("STORAGE_TAGSTORE_LOOKUP_CONCURRENCY"))
if err != nil { if err != nil {
lookupConcurrencyFactor = 64 lookupConcurrencyFactor = 64
logrus.Infof("TagStore: STORAGE_TAGSTORE_LOOKUP_CONCURRENCY is not set. Using default %d as lookup concurrency factor", lookupConcurrencyFactor) logger.Infof("TagStore: STORAGE_TAGSTORE_LOOKUP_CONCURRENCY is not set. Using default %d as lookup concurrency factor", lookupConcurrencyFactor)
} }
return &TagStore{ return &TagStore{
repository: repository, repository: repository,
@ -171,11 +172,11 @@ func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
outputChan := make(chan string, allTagsCount) outputChan := make(chan string, allTagsCount)
waitGroup := sync.WaitGroup{} waitGroup := sync.WaitGroup{}
waitGroup.Add(allTagsCount)
limiter := make(chan struct{}, ts.lookupConcurrencyFactor) limiter := make(chan struct{}, ts.lookupConcurrencyFactor)
acquire := func() { acquire := func() {
limiter <- struct{}{} limiter <- struct{}{}
waitGroup.Add(1)
} }
release := func() { release := func() {
<-limiter <-limiter