From 18eac89506705b84b3d4871daa36bf02664b5e6d Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Thu, 20 Nov 2014 14:50:51 -0800 Subject: [PATCH] Adds a test for concurrent storagedriver Write/Read Stream operations This test is currently failing and Skipped for IPC drivers --- storagedriver/ipc/client.go | 4 +- storagedriver/testsuites/testsuites.go | 140 ++++++++++++++++++------- 2 files changed, 104 insertions(+), 40 deletions(-) diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index a0ff3788f..08f7b8008 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -422,10 +422,10 @@ func (driver *StorageDriverClient) handleSubprocessExit() { // stopped func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error { receiveChan := make(chan error, 1) - go func(receiveChan chan<- error) { + go func(receiver libchan.Receiver, receiveChan chan<- error) { defer close(receiveChan) receiveChan <- receiver.Receive(response) - }(receiveChan) + }(receiver, receiveChan) var err error var ok bool diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 4c86b05a5..617566678 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -4,6 +4,7 @@ import ( "bytes" "io/ioutil" "math/rand" + "os" "path" "sort" "testing" @@ -17,7 +18,8 @@ import ( // Test hooks up gocheck into the "go test" runner. func Test(t *testing.T) { check.TestingT(t) } -// RegisterInProcessSuite registers an in-process storage driver test suite with the go test runner +// RegisterInProcessSuite registers an in-process storage driver test suite with +// the go test runner. func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) { check.Suite(&DriverSuite{ Constructor: driverConstructor, @@ -25,8 +27,8 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC }) } -// RegisterIPCSuite registers a storage driver test suite which runs the named driver as a child -// process with the given parameters +// RegisterIPCSuite registers a storage driver test suite which runs the named +// driver as a child process with the given parameters. func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) { suite := &DriverSuite{ Constructor: func() (storagedriver.StorageDriver, error) { @@ -53,21 +55,26 @@ func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck check.Suite(suite) } -// SkipCheck is a function used to determine if a test suite should be skipped -// If a SkipCheck returns a non-empty skip reason, the suite is skipped with the given reason +// SkipCheck is a function used to determine if a test suite should be skipped. +// If a SkipCheck returns a non-empty skip reason, the suite is skipped with +// the given reason. type SkipCheck func() (reason string) -// NeverSkip is a default SkipCheck which never skips the suite +// NeverSkip is a default SkipCheck which never skips the suite. var NeverSkip SkipCheck = func() string { return "" } -// DriverConstructor is a function which returns a new storagedriver.StorageDriver +// DriverConstructor is a function which returns a new +// storagedriver.StorageDriver. type DriverConstructor func() (storagedriver.StorageDriver, error) -// DriverTeardown is a function which cleans up a suite's storagedriver.StorageDriver +// DriverTeardown is a function which cleans up a suite's +// storagedriver.StorageDriver. type DriverTeardown func() error -// DriverSuite is a gocheck test suite designed to test a storagedriver.StorageDriver -// The intended way to create a DriverSuite is with RegisterInProcessSuite or RegisterIPCSuite +// DriverSuite is a gocheck test suite designed to test a +// storagedriver.StorageDriver. +// The intended way to create a DriverSuite is with RegisterInProcessSuite or +// RegisterIPCSuite. type DriverSuite struct { Constructor DriverConstructor Teardown DriverTeardown @@ -75,7 +82,7 @@ type DriverSuite struct { storagedriver.StorageDriver } -// SetUpSuite sets up the gocheck test suite +// SetUpSuite sets up the gocheck test suite. func (suite *DriverSuite) SetUpSuite(c *check.C) { if reason := suite.SkipCheck(); reason != "" { c.Skip(reason) @@ -85,7 +92,7 @@ func (suite *DriverSuite) SetUpSuite(c *check.C) { suite.StorageDriver = d } -// TearDownSuite tears down the gocheck test suite +// TearDownSuite tears down the gocheck test suite. func (suite *DriverSuite) TearDownSuite(c *check.C) { if suite.Teardown != nil { err := suite.Teardown() @@ -93,35 +100,35 @@ func (suite *DriverSuite) TearDownSuite(c *check.C) { } } -// TestWriteRead1 tests a simple write-read workflow +// TestWriteRead1 tests a simple write-read workflow. func (suite *DriverSuite) TestWriteRead1(c *check.C) { filename := randomString(32) contents := []byte("a") - suite.writeReadCompare(c, filename, contents, contents) + suite.writeReadCompare(c, filename, contents) } -// TestWriteRead2 tests a simple write-read workflow with unicode data +// TestWriteRead2 tests a simple write-read workflow with unicode data. func (suite *DriverSuite) TestWriteRead2(c *check.C) { filename := randomString(32) contents := []byte("\xc3\x9f") - suite.writeReadCompare(c, filename, contents, contents) + suite.writeReadCompare(c, filename, contents) } -// TestWriteRead3 tests a simple write-read workflow with a small string +// TestWriteRead3 tests a simple write-read workflow with a small string. func (suite *DriverSuite) TestWriteRead3(c *check.C) { filename := randomString(32) contents := []byte(randomString(32)) - suite.writeReadCompare(c, filename, contents, contents) + suite.writeReadCompare(c, filename, contents) } -// TestWriteRead4 tests a simple write-read workflow with 1MB of data +// TestWriteRead4 tests a simple write-read workflow with 1MB of data. func (suite *DriverSuite) TestWriteRead4(c *check.C) { filename := randomString(32) contents := []byte(randomString(1024 * 1024)) - suite.writeReadCompare(c, filename, contents, contents) + suite.writeReadCompare(c, filename, contents) } -// TestReadNonexistent tests reading content from an empty path +// TestReadNonexistent tests reading content from an empty path. func (suite *DriverSuite) TestReadNonexistent(c *check.C) { filename := randomString(32) _, err := suite.StorageDriver.GetContent(filename) @@ -129,39 +136,39 @@ func (suite *DriverSuite) TestReadNonexistent(c *check.C) { c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } -// TestWriteReadStreams1 tests a simple write-read streaming workflow +// TestWriteReadStreams1 tests a simple write-read streaming workflow. func (suite *DriverSuite) TestWriteReadStreams1(c *check.C) { filename := randomString(32) contents := []byte("a") - suite.writeReadCompareStreams(c, filename, contents, contents) + suite.writeReadCompareStreams(c, filename, contents) } // TestWriteReadStreams2 tests a simple write-read streaming workflow with -// unicode data +// unicode data. func (suite *DriverSuite) TestWriteReadStreams2(c *check.C) { filename := randomString(32) contents := []byte("\xc3\x9f") - suite.writeReadCompareStreams(c, filename, contents, contents) + suite.writeReadCompareStreams(c, filename, contents) } // TestWriteReadStreams3 tests a simple write-read streaming workflow with a -// small amount of data +// small amount of data. func (suite *DriverSuite) TestWriteReadStreams3(c *check.C) { filename := randomString(32) contents := []byte(randomString(32)) - suite.writeReadCompareStreams(c, filename, contents, contents) + suite.writeReadCompareStreams(c, filename, contents) } // TestWriteReadStreams4 tests a simple write-read streaming workflow with 1MB -// of data +// of data. func (suite *DriverSuite) TestWriteReadStreams4(c *check.C) { filename := randomString(32) contents := []byte(randomString(1024 * 1024)) - suite.writeReadCompareStreams(c, filename, contents, contents) + suite.writeReadCompareStreams(c, filename, contents) } // TestContinueStreamAppend tests that a stream write can be appended to without -// corrupting the data +// corrupting the data. func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) @@ -200,7 +207,7 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { } // TestReadStreamWithOffset tests that the appropriate data is streamed when -// reading with a given offset +// reading with a given offset. func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) @@ -243,7 +250,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { } // TestReadNonexistentStream tests that reading a stream for a nonexistent path -// fails +// fails. func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { filename := randomString(32) _, err := suite.StorageDriver.ReadStream(filename, 0) @@ -251,7 +258,7 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } -// TestList checks the returned list of keys after populating a directory tree +// TestList checks the returned list of keys after populating a directory tree. func (suite *DriverSuite) TestList(c *check.C) { rootDirectory := "/" + randomString(uint64(8+rand.Intn(8))) defer suite.StorageDriver.Delete(rootDirectory) @@ -282,7 +289,7 @@ func (suite *DriverSuite) TestList(c *check.C) { } // TestMove checks that a moved object no longer exists at the source path and -// does exist at the destination +// does exist at the destination. func (suite *DriverSuite) TestMove(c *check.C) { contents := []byte(randomString(32)) sourcePath := randomString(32) @@ -335,7 +342,7 @@ func (suite *DriverSuite) TestDelete(c *check.C) { c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } -// TestDeleteNonexistent checks that removing a nonexistent key fails +// TestDeleteNonexistent checks that removing a nonexistent key fails. func (suite *DriverSuite) TestDeleteNonexistent(c *check.C) { filename := randomString(32) err := suite.StorageDriver.Delete(filename) @@ -343,7 +350,7 @@ func (suite *DriverSuite) TestDeleteNonexistent(c *check.C) { c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } -// TestDeleteFolder checks that deleting a folder removes all child elements +// TestDeleteFolder checks that deleting a folder removes all child elements. func (suite *DriverSuite) TestDeleteFolder(c *check.C) { dirname := randomString(32) filename1 := randomString(32) @@ -371,7 +378,64 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) { c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } -func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents, expected []byte) { +// TestConcurrentFileStreams checks that multiple *os.File objects can be passed +// in to WriteStream concurrently without hanging. +// TODO(bbland): fix this test... +func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { + if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC { + c.Skip("Need to fix out-of-process concurrency") + } + + doneChan := make(chan struct{}) + + testStream := func(size int) { + suite.testFileStreams(c, size) + doneChan <- struct{}{} + } + + go testStream(8 * 1024 * 1024) + go testStream(4 * 1024 * 1024) + go testStream(2 * 1024 * 1024) + go testStream(1024 * 1024) + go testStream(1024) + go testStream(64) + + for i := 0; i < 6; i++ { + <-doneChan + } + +} + +func (suite *DriverSuite) testFileStreams(c *check.C, size int) { + tf, err := ioutil.TempFile("", "tf") + c.Assert(err, check.IsNil) + defer os.Remove(tf.Name()) + + tfName := path.Base(tf.Name()) + defer suite.StorageDriver.Delete(tfName) + + contents := []byte(randomString(uint64(size))) + + _, err = tf.Write(contents) + c.Assert(err, check.IsNil) + + tf.Sync() + tf.Seek(0, os.SEEK_SET) + + err = suite.StorageDriver.WriteStream(tfName, 0, uint64(size), tf) + c.Assert(err, check.IsNil) + + reader, err := suite.StorageDriver.ReadStream(tfName, 0) + c.Assert(err, check.IsNil) + defer reader.Close() + + readContents, err := ioutil.ReadAll(reader) + c.Assert(err, check.IsNil) + + c.Assert(readContents, check.DeepEquals, contents) +} + +func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents []byte) { defer suite.StorageDriver.Delete(filename) err := suite.StorageDriver.PutContent(filename, contents) @@ -383,7 +447,7 @@ func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents c.Assert(readContents, check.DeepEquals, contents) } -func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents, expected []byte) { +func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) { defer suite.StorageDriver.Delete(filename) err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents)))