package searchsvc

import (
	"context"

	"github.com/nspcc-dev/neofs-api-go/pkg/container"
	objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
	"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
	"github.com/pkg/errors"
)

type localStream struct {
	query query.Query

	storage *localstore.Storage

	cid *container.ID
}

type searchQueryFilter struct {
	localstore.FilterPipeline

	query query.Query

	ch chan<- []*objectSDK.ID

	cid *container.ID
}

func (s *localStream) stream(ctx context.Context, ch chan<- []*objectSDK.ID) error {
	filter := &searchQueryFilter{
		query: s.query,
		ch:    ch,
		cid:   s.cid,
	}

	if err := s.storage.Iterate(filter, func(meta *localstore.ObjectMeta) bool {
		select {
		case <-ctx.Done():
			return true
		default:
			return false
		}
	}); err != nil && !errors.Is(errors.Cause(err), bucket.ErrIteratingAborted) {
		return errors.Wrapf(err, "(%T) could not iterate over local storage", s)
	}

	return nil
}

func (f *searchQueryFilter) Pass(ctx context.Context, meta *localstore.ObjectMeta) *localstore.FilterResult {
	if obj := meta.Head(); f.cid.Equal(obj.GetContainerID()) {
		f.query.Match(meta.Head(), func(id *objectSDK.ID) {
			select {
			case <-ctx.Done():
				return
			case f.ch <- []*objectSDK.ID{id}:
			}
		})
	}

	return localstore.ResultPass()
}