package pipe_test

import (
	"context"
	"io/ioutil"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/pipe"
	rtest "github.com/restic/restic/internal/test"
)

type stats struct {
	dirs, files int
}

func acceptAll(string, os.FileInfo) bool {
	return true
}

func statPath(path string) (stats, error) {
	var s stats

	// count files and directories with filepath.Walk()
	err := filepath.Walk(rtest.TestWalkerPath, func(p string, fi os.FileInfo, err error) error {
		if fi == nil {
			return err
		}

		if fi.IsDir() {
			s.dirs++
		} else {
			s.files++
		}

		return err
	})

	return s, err
}

const maxWorkers = 100

func TestPipelineWalkerWithSplit(t *testing.T) {
	if rtest.TestWalkerPath == "" {
		t.Skipf("walkerpath not set, skipping TestPipelineWalker")
	}

	var err error
	if !filepath.IsAbs(rtest.TestWalkerPath) {
		rtest.TestWalkerPath, err = filepath.Abs(rtest.TestWalkerPath)
		rtest.OK(t, err)
	}

	before, err := statPath(rtest.TestWalkerPath)
	rtest.OK(t, err)

	t.Logf("walking path %s with %d dirs, %d files", rtest.TestWalkerPath,
		before.dirs, before.files)

	// account for top level dir
	before.dirs++

	after := stats{}
	m := sync.Mutex{}

	worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
		defer wg.Done()
		for {
			select {
			case e, ok := <-entCh:
				if !ok {
					// channel is closed
					return
				}

				m.Lock()
				after.files++
				m.Unlock()

				e.Result() <- true

			case dir, ok := <-dirCh:
				if !ok {
					// channel is closed
					return
				}

				// wait for all content
				for _, ch := range dir.Entries {
					<-ch
				}

				m.Lock()
				after.dirs++
				m.Unlock()

				dir.Result() <- true
			case <-done:
				// pipeline was cancelled
				return
			}
		}
	}

	var wg sync.WaitGroup
	done := make(chan struct{})
	entCh := make(chan pipe.Entry)
	dirCh := make(chan pipe.Dir)

	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go worker(&wg, done, entCh, dirCh)
	}

	jobs := make(chan pipe.Job, 200)
	wg.Add(1)
	go func() {
		pipe.Split(jobs, dirCh, entCh)
		close(entCh)
		close(dirCh)
		wg.Done()
	}()

	resCh := make(chan pipe.Result, 1)
	pipe.Walk(context.TODO(), []string{rtest.TestWalkerPath}, acceptAll, jobs, resCh)

	// wait for all workers to terminate
	wg.Wait()

	// wait for top-level blob
	<-resCh

	t.Logf("walked path %s with %d dirs, %d files", rtest.TestWalkerPath,
		after.dirs, after.files)

	rtest.Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
}

func TestPipelineWalker(t *testing.T) {
	if rtest.TestWalkerPath == "" {
		t.Skipf("walkerpath not set, skipping TestPipelineWalker")
	}

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	var err error
	if !filepath.IsAbs(rtest.TestWalkerPath) {
		rtest.TestWalkerPath, err = filepath.Abs(rtest.TestWalkerPath)
		rtest.OK(t, err)
	}

	before, err := statPath(rtest.TestWalkerPath)
	rtest.OK(t, err)

	t.Logf("walking path %s with %d dirs, %d files", rtest.TestWalkerPath,
		before.dirs, before.files)

	// account for top level dir
	before.dirs++

	after := stats{}
	m := sync.Mutex{}

	worker := func(ctx context.Context, wg *sync.WaitGroup, jobs <-chan pipe.Job) {
		defer wg.Done()
		for {
			select {
			case job, ok := <-jobs:
				if !ok {
					// channel is closed
					return
				}
				rtest.Assert(t, job != nil, "job is nil")

				switch j := job.(type) {
				case pipe.Dir:
					// wait for all content
					for _, ch := range j.Entries {
						<-ch
					}

					m.Lock()
					after.dirs++
					m.Unlock()

					j.Result() <- true
				case pipe.Entry:
					m.Lock()
					after.files++
					m.Unlock()

					j.Result() <- true
				}

			case <-ctx.Done():
				// pipeline was cancelled
				return
			}
		}
	}

	var wg sync.WaitGroup
	jobs := make(chan pipe.Job)

	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go worker(ctx, &wg, jobs)
	}

	resCh := make(chan pipe.Result, 1)
	pipe.Walk(ctx, []string{rtest.TestWalkerPath}, acceptAll, jobs, resCh)

	// wait for all workers to terminate
	wg.Wait()

	// wait for top-level blob
	<-resCh

	t.Logf("walked path %s with %d dirs, %d files", rtest.TestWalkerPath,
		after.dirs, after.files)

	rtest.Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
}

func createFile(filename, data string) error {
	f, err := os.Create(filename)
	if err != nil {
		return err
	}

	defer f.Close()

	_, err = f.Write([]byte(data))
	if err != nil {
		return err
	}

	return nil
}

func TestPipeWalkerError(t *testing.T) {
	dir, err := ioutil.TempDir("", "restic-test-")
	rtest.OK(t, err)

	base := filepath.Base(dir)

	var testjobs = []struct {
		path []string
		err  bool
	}{
		{[]string{base, "a", "file_a"}, false},
		{[]string{base, "a"}, false},
		{[]string{base, "b"}, true},
		{[]string{base, "c", "file_c"}, false},
		{[]string{base, "c"}, false},
		{[]string{base}, false},
		{[]string{}, false},
	}

	rtest.OK(t, os.Mkdir(filepath.Join(dir, "a"), 0755))
	rtest.OK(t, os.Mkdir(filepath.Join(dir, "b"), 0755))
	rtest.OK(t, os.Mkdir(filepath.Join(dir, "c"), 0755))

	rtest.OK(t, createFile(filepath.Join(dir, "a", "file_a"), "file a"))
	rtest.OK(t, createFile(filepath.Join(dir, "b", "file_b"), "file b"))
	rtest.OK(t, createFile(filepath.Join(dir, "c", "file_c"), "file c"))

	ranHook := false
	testdir := filepath.Join(dir, "b")

	// install hook that removes the dir right before readdirnames()
	debug.Hook("pipe.readdirnames", func(context interface{}) {
		path := context.(string)

		if path != testdir {
			return
		}

		t.Logf("in hook, removing test file %v", testdir)
		ranHook = true

		rtest.OK(t, os.RemoveAll(testdir))
	})

	ctx, cancel := context.WithCancel(context.TODO())

	ch := make(chan pipe.Job)
	resCh := make(chan pipe.Result, 1)

	go pipe.Walk(ctx, []string{dir}, acceptAll, ch, resCh)

	i := 0
	for job := range ch {
		if i == len(testjobs) {
			t.Errorf("too many jobs received")
			break
		}

		p := filepath.Join(testjobs[i].path...)
		if p != job.Path() {
			t.Errorf("job %d has wrong path: expected %q, got %q", i, p, job.Path())
		}

		if testjobs[i].err {
			if job.Error() == nil {
				t.Errorf("job %d expected error but got nil", i)
			}
		} else {
			if job.Error() != nil {
				t.Errorf("job %d expected no error but got %v", i, job.Error())
			}
		}

		i++
	}

	if i != len(testjobs) {
		t.Errorf("expected %d jobs, got %d", len(testjobs), i)
	}

	cancel()

	rtest.Assert(t, ranHook, "hook did not run")
	rtest.OK(t, os.RemoveAll(dir))
}

func BenchmarkPipelineWalker(b *testing.B) {
	if rtest.TestWalkerPath == "" {
		b.Skipf("walkerpath not set, skipping BenchPipelineWalker")
	}

	var max time.Duration
	m := sync.Mutex{}

	fileWorker := func(ctx context.Context, wg *sync.WaitGroup, ch <-chan pipe.Entry) {
		defer wg.Done()
		for {
			select {
			case e, ok := <-ch:
				if !ok {
					// channel is closed
					return
				}

				// simulate backup
				//time.Sleep(10 * time.Millisecond)

				e.Result() <- true
			case <-ctx.Done():
				// pipeline was cancelled
				return
			}
		}
	}

	dirWorker := func(ctx context.Context, wg *sync.WaitGroup, ch <-chan pipe.Dir) {
		defer wg.Done()
		for {
			select {
			case dir, ok := <-ch:
				if !ok {
					// channel is closed
					return
				}

				start := time.Now()

				// wait for all content
				for _, ch := range dir.Entries {
					<-ch
				}

				d := time.Since(start)
				m.Lock()
				if d > max {
					max = d
				}
				m.Unlock()

				dir.Result() <- true
			case <-ctx.Done():
				// pipeline was cancelled
				return
			}
		}
	}

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	for i := 0; i < b.N; i++ {
		max = 0
		entCh := make(chan pipe.Entry, 200)
		dirCh := make(chan pipe.Dir, 200)

		var wg sync.WaitGroup
		b.Logf("starting %d workers", maxWorkers)
		for i := 0; i < maxWorkers; i++ {
			wg.Add(2)
			go dirWorker(ctx, &wg, dirCh)
			go fileWorker(ctx, &wg, entCh)
		}

		jobs := make(chan pipe.Job, 200)
		wg.Add(1)
		go func() {
			pipe.Split(jobs, dirCh, entCh)
			close(entCh)
			close(dirCh)
			wg.Done()
		}()

		resCh := make(chan pipe.Result, 1)
		pipe.Walk(ctx, []string{rtest.TestWalkerPath}, acceptAll, jobs, resCh)

		// wait for all workers to terminate
		wg.Wait()

		// wait for final result
		<-resCh

		b.Logf("max duration for a dir: %v", max)
	}
}

func TestPipelineWalkerMultiple(t *testing.T) {
	if rtest.TestWalkerPath == "" {
		t.Skipf("walkerpath not set, skipping TestPipelineWalker")
	}

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	paths, err := filepath.Glob(filepath.Join(rtest.TestWalkerPath, "*"))
	rtest.OK(t, err)

	before, err := statPath(rtest.TestWalkerPath)
	rtest.OK(t, err)

	t.Logf("walking paths %v with %d dirs, %d files", paths,
		before.dirs, before.files)

	after := stats{}
	m := sync.Mutex{}

	worker := func(ctx context.Context, wg *sync.WaitGroup, jobs <-chan pipe.Job) {
		defer wg.Done()
		for {
			select {
			case job, ok := <-jobs:
				if !ok {
					// channel is closed
					return
				}
				rtest.Assert(t, job != nil, "job is nil")

				switch j := job.(type) {
				case pipe.Dir:
					// wait for all content
					for _, ch := range j.Entries {
						<-ch
					}

					m.Lock()
					after.dirs++
					m.Unlock()

					j.Result() <- true
				case pipe.Entry:
					m.Lock()
					after.files++
					m.Unlock()

					j.Result() <- true
				}

			case <-ctx.Done():
				// pipeline was cancelled
				return
			}
		}
	}

	var wg sync.WaitGroup
	jobs := make(chan pipe.Job)

	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go worker(ctx, &wg, jobs)
	}

	resCh := make(chan pipe.Result, 1)
	pipe.Walk(ctx, paths, acceptAll, jobs, resCh)

	// wait for all workers to terminate
	wg.Wait()

	// wait for top-level blob
	<-resCh

	t.Logf("walked %d paths with %d dirs, %d files", len(paths), after.dirs, after.files)

	rtest.Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
}

func dirsInPath(path string) int {
	if path == "/" || path == "." || path == "" {
		return 0
	}

	n := 0
	for dir := path; dir != "/" && dir != "."; dir = filepath.Dir(dir) {
		n++
	}

	return n
}

func TestPipeWalkerRoot(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skipf("not running TestPipeWalkerRoot on %s", runtime.GOOS)
		return
	}

	cwd, err := os.Getwd()
	rtest.OK(t, err)

	testPaths := []string{
		string(filepath.Separator),
		".",
		cwd,
	}

	for _, path := range testPaths {
		testPipeWalkerRootWithPath(path, t)
	}
}

func testPipeWalkerRootWithPath(path string, t *testing.T) {
	pattern := filepath.Join(path, "*")
	rootPaths, err := filepath.Glob(pattern)
	rtest.OK(t, err)

	for i, p := range rootPaths {
		rootPaths[i], err = filepath.Rel(path, p)
		rtest.OK(t, err)
	}

	t.Logf("paths in %v (pattern %q) expanded to %v items", path, pattern, len(rootPaths))

	jobCh := make(chan pipe.Job)
	var jobs []pipe.Job

	worker := func(wg *sync.WaitGroup) {
		defer wg.Done()
		for job := range jobCh {
			jobs = append(jobs, job)
		}
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go worker(&wg)

	filter := func(p string, fi os.FileInfo) bool {
		p, err := filepath.Rel(path, p)
		rtest.OK(t, err)
		return dirsInPath(p) <= 1
	}

	resCh := make(chan pipe.Result, 1)
	pipe.Walk(context.TODO(), []string{path}, filter, jobCh, resCh)

	wg.Wait()

	t.Logf("received %d jobs", len(jobs))

	for i, job := range jobs[:len(jobs)-1] {
		path := job.Path()
		if path == "." || path == ".." || path == string(filepath.Separator) {
			t.Errorf("job %v has invalid path %q", i, path)
		}
	}

	lastPath := jobs[len(jobs)-1].Path()
	if lastPath != "" {
		t.Errorf("last job has non-empty path %q", lastPath)
	}

	if len(jobs) < len(rootPaths) {
		t.Errorf("want at least %v jobs, got %v for path %v\n", len(rootPaths), len(jobs), path)
	}
}