Improves storagedriver concurrency testing
Creates trees instead of flat files for TestConcurrentFileStreams Adds TestConcurrentStreamReads, which writes a large file (smaller in Short mode), and then ensures that several concurrent readers properly read their portions of the file with random offsets
This commit is contained in:
parent
cb25cc65bf
commit
9297693675
1 changed files with 42 additions and 5 deletions
|
@ -651,9 +651,46 @@ func (suite *DriverSuite) TestStatCall(c *check.C) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestConcurrentStreamReads checks that multiple clients can safely read from
|
||||||
|
// the same file simultaneously with various offsets.
|
||||||
|
func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) {
|
||||||
|
var filesize int64 = 128 * 1024 * 1024
|
||||||
|
|
||||||
|
if testing.Short() {
|
||||||
|
filesize = 10 * 1024 * 1024
|
||||||
|
c.Log("Reducing file size to 10MB for short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := randomPath(32)
|
||||||
|
contents := randomContents(filesize)
|
||||||
|
|
||||||
|
defer suite.StorageDriver.Delete(firstPart(filename))
|
||||||
|
|
||||||
|
err := suite.StorageDriver.PutContent(filename, contents)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
readContents := func() {
|
||||||
|
defer wg.Done()
|
||||||
|
offset := rand.Int63n(int64(len(contents)))
|
||||||
|
reader, err := suite.StorageDriver.ReadStream(filename, offset)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
readContents, err := ioutil.ReadAll(reader)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
c.Assert(readContents, check.DeepEquals, contents[offset:])
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
go readContents()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed
|
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed
|
||||||
// in to WriteStream concurrently without hanging.
|
// in to WriteStream concurrently without hanging.
|
||||||
// TODO(bbland): fix this test...
|
|
||||||
func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
|
func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
|
||||||
// if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC {
|
// if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC {
|
||||||
// c.Skip("Need to fix out-of-process concurrency")
|
// c.Skip("Need to fix out-of-process concurrency")
|
||||||
|
@ -682,8 +719,8 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
defer os.Remove(tf.Name())
|
defer os.Remove(tf.Name())
|
||||||
|
|
||||||
tfName := path.Base(tf.Name())
|
filename := randomPath(32)
|
||||||
defer suite.StorageDriver.Delete(tfName)
|
defer suite.StorageDriver.Delete(firstPart(filename))
|
||||||
|
|
||||||
contents := randomContents(size)
|
contents := randomContents(size)
|
||||||
|
|
||||||
|
@ -693,11 +730,11 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
|
||||||
tf.Sync()
|
tf.Sync()
|
||||||
tf.Seek(0, os.SEEK_SET)
|
tf.Seek(0, os.SEEK_SET)
|
||||||
|
|
||||||
nn, err := suite.StorageDriver.WriteStream(tfName, 0, tf)
|
nn, err := suite.StorageDriver.WriteStream(filename, 0, tf)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
c.Assert(nn, check.Equals, size)
|
c.Assert(nn, check.Equals, size)
|
||||||
|
|
||||||
reader, err := suite.StorageDriver.ReadStream(tfName, 0)
|
reader, err := suite.StorageDriver.ReadStream(filename, 0)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue