package notifications

import (
	"reflect"
	"sync"
	"testing"
	"time"

	events "github.com/docker/go-events"

	"github.com/sirupsen/logrus"
)

func TestEventQueue(t *testing.T) {
	const nevents = 1000
	var ts testSink
	metrics := newSafeMetrics("")
	eq := newEventQueue(
		// delayed sync simulates destination slower than channel comms
		&delayedSink{
			Sink:  &ts,
			delay: time.Millisecond * 1,
		}, metrics.eventQueueListener())

	var wg sync.WaitGroup
	var event events.Event
	for i := 1; i <= nevents; i++ {
		event = createTestEvent("push", "library/test", "blob")
		wg.Add(1)
		go func(event events.Event) {
			if err := eq.Write(event); err != nil {
				t.Errorf("error writing event block: %v", err)
			}
			wg.Done()
		}(event)
	}

	wg.Wait()
	if t.Failed() {
		t.FailNow()
	}
	checkClose(t, eq)

	ts.mu.Lock()
	defer ts.mu.Unlock()
	metrics.Lock()
	defer metrics.Unlock()

	if ts.count != nevents {
		t.Fatalf("events did not make it to the sink: %d != %d", ts.count, 1000)
	}

	if !ts.closed {
		t.Fatalf("sink should have been closed")
	}

	if metrics.Events != nevents {
		t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
	}

	if metrics.Pending != 0 {
		t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
	}
}

func TestIgnoredSink(t *testing.T) {
	blob := createTestEvent("push", "library/test", "blob")
	manifest := createTestEvent("pull", "library/test", "manifest")

	type testcase struct {
		ignoreMediaTypes []string
		ignoreActions    []string
		expected         events.Event
	}

	tests := []testcase{
		{expected: blob},
		{ignoreMediaTypes: []string{"other"}, ignoreActions: []string{"other"}, expected: blob},
		{ignoreMediaTypes: []string{"blob", "manifest"}, ignoreActions: []string{"other"}},
		{ignoreMediaTypes: []string{"other"}, ignoreActions: []string{"pull"}, expected: blob},
		{ignoreMediaTypes: []string{"other"}, ignoreActions: []string{"pull", "push"}},
	}

	for _, tc := range tests {
		ts := &testSink{}
		s := newIgnoredSink(ts, tc.ignoreMediaTypes, tc.ignoreActions)

		if err := s.Write(blob); err != nil {
			t.Fatalf("error writing event: %v", err)
		}

		ts.mu.Lock()
		if !reflect.DeepEqual(ts.event, tc.expected) {
			t.Fatalf("unexpected event: %#v != %#v", ts.event, tc.expected)
		}
		ts.mu.Unlock()
	}

	tests = []testcase{
		{expected: manifest},
		{ignoreMediaTypes: []string{"other"}, ignoreActions: []string{"other"}, expected: manifest},
		{ignoreMediaTypes: []string{"blob"}, ignoreActions: []string{"other"}, expected: manifest},
		{ignoreMediaTypes: []string{"blob", "manifest"}, ignoreActions: []string{"other"}},
		{ignoreMediaTypes: []string{"other"}, ignoreActions: []string{"push"}, expected: manifest},
		{ignoreMediaTypes: []string{"other"}, ignoreActions: []string{"pull", "push"}},
	}

	for _, tc := range tests {
		ts := &testSink{}
		s := newIgnoredSink(ts, tc.ignoreMediaTypes, tc.ignoreActions)

		if err := s.Write(manifest); err != nil {
			t.Fatalf("error writing event: %v", err)
		}

		ts.mu.Lock()
		if !reflect.DeepEqual(ts.event, tc.expected) {
			t.Fatalf("unexpected event: %#v != %#v", ts.event, tc.expected)
		}
		ts.mu.Unlock()
	}
}

type testSink struct {
	event  events.Event
	count  int
	mu     sync.Mutex
	closed bool
}

func (ts *testSink) Write(event events.Event) error {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.event = event
	ts.count++
	return nil
}

func (ts *testSink) Close() error {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.closed = true

	logrus.Infof("closing testSink")
	return nil
}

type delayedSink struct {
	events.Sink
	delay time.Duration
}

func (ds *delayedSink) Write(event events.Event) error {
	time.Sleep(ds.delay)
	return ds.Sink.Write(event)
}

func checkClose(t *testing.T, sink events.Sink) {
	if err := sink.Close(); err != nil {
		t.Fatalf("unexpected error closing: %v", err)
	}

	// second close should not crash but should return an error.
	if err := sink.Close(); err == nil {
		t.Fatalf("no error on double close")
	}

	// Write after closed should be an error
	if err := sink.Write(Event{}); err == nil {
		t.Fatalf("write after closed did not have an error")
	} else if err != ErrSinkClosed {
		t.Fatalf("error should be ErrSinkClosed")
	}
}