new file mode 100755
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+#
+# Test case QMP's encrypted key management
+#
+# Copyright (C) 2019 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import iotests
+import os
+import time
+import json
+
+test_img = os.path.join(iotests.test_dir, 'test.img')
+
+class Secret:
+ def __init__(self, index):
+ self._id = "keysec" + str(index)
+ # you are not supposed to see the password...
+ self._secret = "hunter" + str(index)
+
+ def id(self):
+ return self._id
+
+ def secret(self):
+ return self._secret
+
+ def to_cmdline_object(self):
+ return [ "secret,id=" + self._id + ",data=" + self._secret]
+
+ def to_qmp_object(self):
+ return { "qom_type" : "secret", "id": self.id(),
+ "props": { "data": self.secret() } }
+
+################################################################################
+class EncryptionSetupTestCase(iotests.QMPTestCase):
+
+ # test case startup
+ def setUp(self):
+ # start the VM
+ self.vm = iotests.VM()
+ self.vm.launch()
+
+ # create the secrets and load 'em into the VM
+ self.secrets = [ Secret(i) for i in range(0, 6) ]
+ for secret in self.secrets:
+ result = self.vm.qmp("object-add", **secret.to_qmp_object())
+ self.assert_qmp(result, 'return', {})
+
+ if iotests.imgfmt == "qcow2":
+ self.pfx = "encrypt."
+ self.img_opts = [ '-o', "encrypt.format=luks" ]
+ else:
+ self.pfx = ""
+ self.img_opts = []
+
+ # test case shutdown
+ def tearDown(self):
+ # stop the VM
+ self.vm.shutdown()
+
+ ###########################################################################
+ # create the encrypted block device
+ def createImg(self, file, secret):
+
+ iotests.qemu_img(
+ 'create',
+ '--object', *secret.to_cmdline_object(),
+ '-f', iotests.imgfmt,
+ '-o', self.pfx + 'key-secret=' + secret.id(),
+ '-o', self.pfx + 'iter-time=10',
+ *self.img_opts,
+ file,
+ '1M')
+
+ ###########################################################################
+ # open an encrypted block device
+ def openImageQmp(self, id, file, secret, read_only = False):
+
+ encrypt_options = {
+ 'key-secret' : secret.id()
+ }
+
+ if iotests.imgfmt == "qcow2":
+ encrypt_options = {
+ 'encrypt': {
+ 'format':'luks',
+ **encrypt_options
+ }
+ }
+
+ result = self.vm.qmp('blockdev-add', **
+ {
+ 'driver': iotests.imgfmt,
+ 'node-name': id,
+ 'read-only': read_only,
+
+ **encrypt_options,
+
+ 'file': {
+ 'driver': 'file',
+ 'filename': test_img,
+ }
+ }
+ )
+ self.assert_qmp(result, 'return', {})
+
+ # close the encrypted block device
+ def closeImageQmp(self, id):
+ result = self.vm.qmp('blockdev-del', **{ 'node-name': id })
+ self.assert_qmp(result, 'return', {})
+
+ ###########################################################################
+ # add a key to an encrypted block device
+ def addKeyQmp(self, id, new_secret, secret = None,
+ slot = None, force = False):
+
+ crypt_options = {
+ 'state' : 'active',
+ 'new-secret' : new_secret.id(),
+ 'iter-time' : 10
+ }
+
+ if slot != None:
+ crypt_options['keyslot'] = slot
+
+
+ if secret != None:
+ crypt_options['secret'] = secret.id()
+
+ if iotests.imgfmt == "qcow2":
+ crypt_options['format'] = 'luks'
+ crypt_options = {
+ 'encrypt': crypt_options
+ }
+
+ args = {
+ 'node-name': id,
+ 'job-id' : 'job_add_key',
+ 'options' : {
+ 'driver' : iotests.imgfmt,
+ **crypt_options
+ },
+ }
+
+ if force == True:
+ args['force'] = True
+
+ #TODO: check what jobs return
+ result = self.vm.qmp('x-blockdev-amend', **args)
+ assert result['return'] == {}
+ self.vm.run_job('job_add_key')
+
+ # erase a key from an encrypted block device
+ def eraseKeyQmp(self, id, old_secret = None, slot = None, force = False):
+
+ crypt_options = {
+ 'state' : 'inactive',
+ }
+
+ if slot != None:
+ crypt_options['keyslot'] = slot
+ if old_secret != None:
+ crypt_options['old-secret'] = old_secret.id()
+
+ if iotests.imgfmt == "qcow2":
+ crypt_options['format'] = 'luks'
+ crypt_options = {
+ 'encrypt': crypt_options
+ }
+
+ args = {
+ 'node-name': id,
+ 'job-id' : 'job_erase_key',
+ 'options' : {
+ 'driver' : iotests.imgfmt,
+ **crypt_options
+ },
+ }
+
+ if force == True:
+ args['force'] = True
+
+ result = self.vm.qmp('x-blockdev-amend', **args)
+ assert result['return'] == {}
+ self.vm.run_job('job_erase_key')
+
+ ###########################################################################
+ # create image, and change its key
+ def testChangeKey(self):
+
+ # create the image with secret0 and open it
+ self.createImg(test_img, self.secrets[0]);
+ self.openImageQmp("testdev", test_img, self.secrets[0])
+
+ # add key to slot 1
+ self.addKeyQmp("testdev", new_secret = self.secrets[1])
+
+ # add key to slot 5
+ self.addKeyQmp("testdev", new_secret = self.secrets[2], slot=5)
+
+ # erase key from slot 0
+ self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
+
+ #reopen the image with secret1
+ self.closeImageQmp("testdev")
+ self.openImageQmp("testdev", test_img, self.secrets[1])
+
+ # close and erase the image for good
+ self.closeImageQmp("testdev")
+ os.remove(test_img)
+
+ # test that if we erase the old password,
+ # we can still change the encryption keys using 'old-secret'
+ def testOldPassword(self):
+
+ # create the image with secret0 and open it
+ self.createImg(test_img, self.secrets[0]);
+ self.openImageQmp("testdev", test_img, self.secrets[0])
+
+ # add key to slot 1
+ self.addKeyQmp("testdev", new_secret = self.secrets[1])
+
+ # erase key from slot 0
+ self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
+
+ # this will fail as the old password is no longer valid
+ self.addKeyQmp("testdev", new_secret = self.secrets[2])
+
+ # this will work
+ self.addKeyQmp("testdev", new_secret = self.secrets[2], secret = self.secrets[1])
+
+ # close and erase the image for good
+ self.closeImageQmp("testdev")
+ os.remove(test_img)
+
+ def testUseForceLuke(self):
+
+ self.createImg(test_img, self.secrets[0]);
+ self.openImageQmp("testdev", test_img, self.secrets[0])
+
+ # Add bunch of secrets
+ self.addKeyQmp("testdev", new_secret = self.secrets[1], slot=4)
+ self.addKeyQmp("testdev", new_secret = self.secrets[4], slot=2)
+
+ # overwrite an active secret
+ self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2)
+ self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2, force=True)
+
+ self.addKeyQmp("testdev", new_secret = self.secrets[0])
+
+ # Now erase all the secrets
+ self.eraseKeyQmp("testdev", old_secret = self.secrets[5])
+ self.eraseKeyQmp("testdev", slot=4)
+
+ # erase last keyslot
+ self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
+ self.eraseKeyQmp("testdev", old_secret = self.secrets[0], force=True)
+
+ self.closeImageQmp("testdev")
+ os.remove(test_img)
+
+
+if __name__ == '__main__':
+ # Encrypted formats support
+ iotests.main(supported_fmts = ['qcow2', 'luks'])
new file mode 100644
@@ -0,0 +1,40 @@
+...
+----------------------------------------------------------------------
+Ran 3 tests
+
+OK
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_erase_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_erase_key"}}
+{"return": {}}
+Job failed: Invalid password, cannot unlock any keyslot
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+Job failed: Refusing to overwrite active slot 2 - please erase it first
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_add_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_erase_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_erase_key"}}
+{"return": {}}
+Job failed: Requested operation will erase all active keyslots which will erase all the data in the image irreversibly - refusing operation
+{"execute": "job-dismiss", "arguments": {"id": "job_erase_key"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job_erase_key"}}
+{"return": {}}
new file mode 100755
@@ -0,0 +1,233 @@
+#!/usr/bin/env python3
+#
+# Test case for encryption key management versus image sharing
+#
+# Copyright (C) 2019 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import iotests
+import os
+import time
+import json
+
+test_img = os.path.join(iotests.test_dir, 'test.img')
+
+class Secret:
+ def __init__(self, index):
+ self._id = "keysec" + str(index)
+ # you are not supposed to see the password...
+ self._secret = "hunter" + str(index)
+
+ def id(self):
+ return self._id
+
+ def secret(self):
+ return self._secret
+
+ def to_cmdline_object(self):
+ return [ "secret,id=" + self._id + ",data=" + self._secret]
+
+ def to_qmp_object(self):
+ return { "qom_type" : "secret", "id": self.id(),
+ "props": { "data": self.secret() } }
+
+################################################################################
+
+class EncryptionSetupTestCase(iotests.QMPTestCase):
+
+ # test case startup
+ def setUp(self):
+
+ # start the VMs
+ self.vm1 = iotests.VM(path_suffix = 'VM1')
+ self.vm2 = iotests.VM(path_suffix = 'VM2')
+ self.vm1.launch()
+ self.vm2.launch()
+
+ # create the secrets and load 'em into the VMs
+ self.secrets = [ Secret(i) for i in range(0, 4) ]
+ for secret in self.secrets:
+ result = self.vm1.qmp("object-add", **secret.to_qmp_object())
+ self.assert_qmp(result, 'return', {})
+ result = self.vm2.qmp("object-add", **secret.to_qmp_object())
+ self.assert_qmp(result, 'return', {})
+
+ # test case shutdown
+ def tearDown(self):
+ # stop the VM
+ self.vm1.shutdown()
+ self.vm2.shutdown()
+
+ ###########################################################################
+ # create the encrypted block device using qemu-img
+ def createImg(self, file, secret):
+
+ output = iotests.qemu_img_pipe(
+ 'create',
+ '--object', *secret.to_cmdline_object(),
+ '-f', iotests.imgfmt,
+ '-o', 'key-secret=' + secret.id(),
+ '-o', 'iter-time=10',
+ file,
+ '1M')
+
+ iotests.log(output, filters=[iotests.filter_test_dir])
+
+ # attempts to add a key using qemu-img
+ def addKey(self, file, secret, new_secret):
+
+ image_options = {
+ 'key-secret' : secret.id(),
+ 'driver' : iotests.imgfmt,
+ 'file' : {
+ 'driver':'file',
+ 'filename': file,
+ }
+ }
+
+ output = iotests.qemu_img_pipe(
+ 'amend',
+ '--object', *secret.to_cmdline_object(),
+ '--object', *new_secret.to_cmdline_object(),
+
+ '-o', 'state=active',
+ '-o', 'new-secret=' + new_secret.id(),
+ '-o', 'iter-time=10',
+
+ "json:" + json.dumps(image_options)
+ )
+
+ iotests.log(output, filters=[iotests.filter_test_dir])
+
+ ###########################################################################
+ # open an encrypted block device
+ def openImageQmp(self, vm, id, file, secret,
+ readOnly = False, reOpen = False):
+
+ command = 'x-blockdev-reopen' if reOpen else 'blockdev-add'
+
+ result = vm.qmp(command, **
+ {
+ 'driver': iotests.imgfmt,
+ 'node-name': id,
+ 'read-only': readOnly,
+ 'key-secret' : secret.id(),
+ 'file': {
+ 'driver': 'file',
+ 'filename': test_img,
+ }
+ }
+ )
+ self.assert_qmp(result, 'return', {})
+
+ # close the encrypted block device
+ def closeImageQmp(self, vm, id):
+ result = vm.qmp('blockdev-del', **{ 'node-name': id })
+ self.assert_qmp(result, 'return', {})
+
+ ###########################################################################
+
+ # add a key to an encrypted block device
+ def addKeyQmp(self, vm, id, new_secret):
+
+ args = {
+ 'node-name': id,
+ 'job-id' : 'job0',
+ 'options' : {
+ 'state' : 'active',
+ 'driver' : iotests.imgfmt,
+ 'new-secret': new_secret.id(),
+ 'iter-time' : 10
+ },
+ }
+
+ result = vm.qmp('x-blockdev-amend', **args)
+ assert result['return'] == {}
+ vm.run_job('job0')
+
+ # test that when the image opened by two qemu processes,
+ # neither of them can update the image
+ def test1(self):
+ self.createImg(test_img, self.secrets[0]);
+
+ # VM1 opens the image and adds a key
+ self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
+ self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[1])
+
+
+ # VM2 opens the image
+ self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
+
+
+ # neither VMs now should be able to add a key
+ self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
+ self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
+
+
+ # VM 1 closes the image
+ self.closeImageQmp(self.vm1, "testdev")
+
+
+ # now VM2 can add the key
+ self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
+
+
+ # qemu-img should also not be able to add a key
+ self.addKey(test_img, self.secrets[0], self.secrets[2])
+
+ # cleanup
+ self.closeImageQmp(self.vm2, "testdev")
+ os.remove(test_img)
+
+
+ def test2(self):
+ self.createImg(test_img, self.secrets[0]);
+
+ # VM1 opens the image readonly
+ self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
+ readOnly = True)
+
+ # VM2 opens the image
+ self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
+
+ # VM1 can't add a key since image is readonly
+ self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
+
+ # VM2 can't add a key since VM is has the image opened
+ self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
+
+
+ #VM1 reopens the image read-write
+ self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
+ reOpen = True, readOnly = False)
+
+ # VM1 still can't add the key
+ self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
+
+ # VM2 gets away
+ self.closeImageQmp(self.vm2, "testdev")
+
+ # VM1 now can add the key
+ self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
+
+ self.closeImageQmp(self.vm1, "testdev")
+ os.remove(test_img)
+
+
+if __name__ == '__main__':
+ # support only raw luks since luks encrypted qcow2 is a proper
+ # format driver which doesn't allow any sharing
+ iotests.main(supported_fmts = ['luks'])
new file mode 100644
@@ -0,0 +1,33 @@
+..
+----------------------------------------------------------------------
+Ran 2 tests
+
+OK
+Formatting 'TEST_DIR/test.img', fmt=luks size=1048576 key-secret=keysec0 iter-time=10
+
+{"execute": "job-dismiss", "arguments": {"id": "job0"}}
+{"return": {}}
+Job failed: Failed to get shared "consistent read" lock
+{"execute": "job-dismiss", "arguments": {"id": "job0"}}
+{"return": {}}
+Job failed: Failed to get shared "consistent read" lock
+{"execute": "job-dismiss", "arguments": {"id": "job0"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job0"}}
+{"return": {}}
+qemu-img: Failed to get shared "consistent read" lock
+Is another process using the image [TEST_DIR/test.img]?
+
+Formatting 'TEST_DIR/test.img', fmt=luks size=1048576 key-secret=keysec0 iter-time=10
+
+Job failed: Block node is read-only
+{"execute": "job-dismiss", "arguments": {"id": "job0"}}
+{"return": {}}
+Job failed: Failed to get shared "consistent read" lock
+{"execute": "job-dismiss", "arguments": {"id": "job0"}}
+{"return": {}}
+Job failed: Failed to get shared "consistent read" lock
+{"execute": "job-dismiss", "arguments": {"id": "job0"}}
+{"return": {}}
+{"execute": "job-dismiss", "arguments": {"id": "job0"}}
+{"return": {}}
@@ -301,3 +301,5 @@
300 rw auto
301 rw auto quick
+302 rw auto
+303 rw auto