=== added file 'lava_tool/authtoken.py'
@@ -0,0 +1,112 @@
+# Copyright (C) 2011 Linaro Limited
+#
+# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+import base64
+import urllib
+import xmlrpclib
+
+import keyring.core
+
+from lava_tool.interface import LavaCommandError
+
+class AuthBackend(object):
+
+ def add_token(self, username, hostname, token):
+ raise NotImplementedError
+
+ def get_token_for_host(self, user, host):
+ raise NotImplementedError
+
+
+class KeyringAuthBackend(AuthBackend):
+
+ def add_token(self, username, hostname, token):
+ keyring.core.set_password("lava-tool-%s" % hostname, username, token)
+
+ def get_token_for_host(self, username, hostname):
+ return keyring.core.get_password("lava-tool-%s" % hostname, username)
+
+
+class MemoryAuthBackend(AuthBackend):
+
+ def __init__(self, user_host_token_list):
+ self._tokens = {}
+ for user, host, token in user_host_token_list:
+ self._tokens[(user, host)] = token
+
+ def add_token(self, username, hostname, token):
+ self._tokens[(username, hostname)] = token
+
+ def get_token_for_host(self, username, host):
+ return self._tokens.get((username, host))
+
+
+class AuthenticatingTransportMixin:
+
+ def get_host_info(self, host):
+
+ x509 = {}
+ if isinstance(host, tuple):
+ host, x509 = host
+
+ auth, host = urllib.splituser(host)
+
+ if auth:
+ user, token = urllib.splitpasswd(auth)
+ if token is None:
+ token = self.auth_backend.get_token_for_host(user, host)
+ if token is None:
+ raise LavaCommandError(
+ "Username provided but no token found.")
+ auth = base64.b64encode(urllib.unquote(user + ':' + token))
+ extra_headers = [
+ ("Authorization", "Basic " + auth)
+ ]
+ else:
+ extra_headers = None
+
+ return host, extra_headers, x509
+
+
+class AuthenticatingTransport(
+ AuthenticatingTransportMixin, xmlrpclib.Transport):
+ def __init__(self, use_datetime=0, auth_backend=None):
+ xmlrpclib.Transport.__init__(self, use_datetime)
+ self.auth_backend = auth_backend
+
+
+class AuthenticatingSafeTransport(
+ AuthenticatingTransportMixin, xmlrpclib.SafeTransport):
+ def __init__(self, use_datetime=0, auth_backend=None):
+ xmlrpclib.SafeTransport.__init__(self, use_datetime)
+ self.auth_backend = auth_backend
+
+
+class AuthenticatingServerProxy(xmlrpclib.ServerProxy):
+
+ def __init__(self, uri, transport=None, encoding=None, verbose=0,
+ allow_none=0, use_datetime=0, auth_backend=None):
+ if transport is None:
+ if urllib.splittype(uri)[0] == "https":
+ transport = AuthenticatingSafeTransport(
+ use_datetime=use_datetime, auth_backend=auth_backend)
+ else:
+ transport = AuthenticatingTransport(
+ use_datetime=use_datetime, auth_backend=auth_backend)
+ xmlrpclib.ServerProxy.__init__(
+ self, uri, transport, encoding, verbose, allow_none, use_datetime)
=== added file 'lava_tool/commands/auth.py'
@@ -0,0 +1,123 @@
+# Copyright (C) 2011 Linaro Limited
+#
+# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+import argparse
+import getpass
+import urlparse
+import xmlrpclib
+
+from lava_tool.authtoken import (
+ AuthenticatingServerProxy,
+ KeyringAuthBackend,
+ MemoryAuthBackend,
+ )
+from lava_tool.interface import Command, LavaCommandError
+
+
+def normalize_xmlrpc_url(uri):
+ if '://' not in uri:
+ uri = 'http://' + uri
+ if not uri.endswith('/'):
+ uri += '/'
+ if not uri.endswith('/RPC2/'):
+ uri += 'RPC2/'
+ return uri
+
+
+class auth_add(Command):
+ """
+ Add an authentication token.
+ """
+
+ def __init__(self, parser, args, auth_backend=None):
+ super(auth_add, self).__init__(parser, args)
+ if auth_backend is None:
+ auth_backend = KeyringAuthBackend()
+ self.auth_backend = auth_backend
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(auth_add, cls).register_arguments(parser)
+ parser.add_argument(
+ "HOST",
+ help=("Endpoint to add token for, in the form "
+ "scheme://username@host. The username will default to "
+ "the currently logged in user."))
+ parser.add_argument(
+ "--token-file", default=None,
+ help="Read the secret from here rather than prompting for it.")
+ parser.add_argument(
+ "--no-check", action='store_true',
+ help=("By default, a call to the remote server is made to check "
+ "that the added token works before remembering it. "
+ "Passing this option prevents this check."))
+
+ def invoke(self):
+ uri = normalize_xmlrpc_url(self.args.HOST)
+ parsed_host = urlparse.urlparse(uri)
+
+ if parsed_host.username:
+ username = parsed_host.username
+ else:
+ username = getpass.getuser()
+
+ host = parsed_host.hostname
+ if parsed_host.port:
+ host += ':' + str(parsed_host.port)
+
+ uri = '%s://%s@%s/RPC2/' % (parsed_host.scheme, username, host)
+
+ if self.args.token_file:
+ if parsed_host.password:
+ raise LavaCommandError(
+ "Token specified in url but --token-file also passed.")
+ else:
+ try:
+ token_file = open(self.args.token_file)
+ except IOError as ex:
+ raise LavaCommandError("opening %r failed: %s" % (self.args.token_file, ex))
+ token = token_file.read()
+ else:
+ if parsed_host.password:
+ token = parsed_host.password
+ else:
+ token = getpass.getpass("Paste token for %s: " % uri)
+
+ if not self.args.no_check:
+ sp = AuthenticatingServerProxy(
+ uri, auth_backend=MemoryAuthBackend(
+ [(username, host, token)]))
+ try:
+ token_user = sp.system.whoami()
+ except xmlrpclib.ProtocolError as ex:
+ if ex.errcode == 401:
+ raise LavaCommandError(
+ "Token rejected by server for user %s." % username)
+ else:
+ raise
+ except xmlrpclib.Fault as ex:
+ raise LavaCommandError(
+ "Server reported error during check: %s." % ex)
+ if token_user != username:
+ raise LavaCommandError(
+ "whoami() returned %s rather than expected %s -- this is "
+ "a bug." % (token_user, username))
+
+ self.auth_backend.add_token(username, host, token)
+
+ print 'Token added successfully for user %s.' % username
=== modified file 'lava_tool/tests/__init__.py'
@@ -35,6 +35,8 @@
def test_modules():
return [
+ 'lava_tool.tests.test_authtoken',
+ 'lava_tool.tests.test_auth_commands',
'lava_tool.tests.test_commands',
]
=== added file 'lava_tool/tests/test_auth_commands.py'
@@ -0,0 +1,202 @@
+# Copyright (C) 2011 Linaro Limited
+#
+# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Unit tests for the lava_tool.commands.auth package
+"""
+
+import StringIO
+import sys
+import tempfile
+import xmlrpclib
+
+from lava_tool.authtoken import MemoryAuthBackend
+from lava_tool.mocker import ARGS, KWARGS, CONTAINS, MockerTestCase
+from lava_tool.interface import LavaCommandError
+from lava_tool.commands.auth import auth_add
+
+
+class FakeArgs:
+ token_file = None
+ no_check = False
+
+class AuthAddTests(MockerTestCase):
+
+ def setUp(self):
+ MockerTestCase.setUp(self)
+ self.saved_stdout = sys.stdout
+ sys.stdout = StringIO.StringIO()
+ self.saved_stderr = sys.stderr
+ sys.stderr = StringIO.StringIO()
+
+ def tearDown(self):
+ MockerTestCase.tearDown(self)
+ sys.stdout = self.saved_stdout
+ sys.stderr = self.saved_stderr
+
+ def make_command(self, auth_backend, **kwargs):
+ args = FakeArgs()
+ args.__dict__.update(kwargs)
+ return auth_add(None, args, auth_backend)
+
+ def test_token_taken_from_argument(self):
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com', no_check=True)
+ cmd.invoke()
+ self.assertEqual(
+ 'TOKEN', auth_backend.get_token_for_host('user', 'example.com'))
+
+ def test_token_taken_from_getpass(self):
+ mocked_getpass = self.mocker.replace('getpass.getpass', passthrough=False)
+ mocked_getpass(CONTAINS('Paste token'))
+ self.mocker.result("TOKEN")
+ self.mocker.replay()
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user@example.com', no_check=True)
+ cmd.invoke()
+ self.assertEqual(
+ 'TOKEN', auth_backend.get_token_for_host('user', 'example.com'))
+
+ def test_token_taken_from_file(self):
+ auth_backend = MemoryAuthBackend([])
+ token_file = tempfile.NamedTemporaryFile('w')
+ token_file.write("TOKEN")
+ token_file.flush()
+ cmd = self.make_command(
+ auth_backend, HOST='http://user@example.com', no_check=True,
+ token_file=token_file.name)
+ cmd.invoke()
+ self.assertEqual(
+ 'TOKEN', auth_backend.get_token_for_host('user', 'example.com'))
+
+ def test_token_file_and_in_url_conflict(self):
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com', no_check=True,
+ token_file='some-file-name')
+ self.assertRaises(LavaCommandError, cmd.invoke)
+
+ def test_non_existent_token_reported(self):
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com', no_check=True,
+ token_file='does-not-exist')
+ self.assertRaises(LavaCommandError, cmd.invoke)
+
+ def test_user_taken_from_getuser(self):
+ mocked_getuser = self.mocker.replace('getpass.getuser', passthrough=False)
+ mocked_getuser()
+ self.mocker.result("user")
+ self.mocker.replay()
+ auth_backend = MemoryAuthBackend([])
+ token_file = tempfile.NamedTemporaryFile('w')
+ token_file.write("TOKEN")
+ token_file.flush()
+ cmd = self.make_command(
+ auth_backend, HOST='http://example.com', no_check=True,
+ token_file=token_file.name)
+ cmd.invoke()
+ self.assertEqual(
+ 'TOKEN', auth_backend.get_token_for_host('user', 'example.com'))
+
+ def test_port_included(self):
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com:1234', no_check=True)
+ cmd.invoke()
+ self.assertEqual(
+ 'TOKEN', auth_backend.get_token_for_host('user', 'example.com:1234'))
+
+ def test_check_made(self):
+ mocked_AuthenticatingServerProxy = self.mocker.replace(
+ 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
+ mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
+ # nospec() is required because of
+ # https://bugs.launchpad.net/mocker/+bug/794351
+ self.mocker.nospec()
+ mocked_sp.system.whoami()
+ self.mocker.result('user')
+ self.mocker.replay()
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com:1234', no_check=False)
+ cmd.invoke()
+ self.assertEqual(
+ 'TOKEN', auth_backend.get_token_for_host('user', 'example.com:1234'))
+
+ def test_check_auth_failure_reported_nicely(self):
+ mocked_AuthenticatingServerProxy = self.mocker.replace(
+ 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
+ mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
+ # nospec() is required because of
+ # https://bugs.launchpad.net/mocker/+bug/794351
+ self.mocker.nospec()
+ mocked_sp.system.whoami()
+ self.mocker.throw(xmlrpclib.ProtocolError('', 401, '', []))
+ self.mocker.replay()
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
+ self.assertRaises(LavaCommandError, cmd.invoke)
+
+ def test_check_fails_token_not_recorded(self):
+ mocked_AuthenticatingServerProxy = self.mocker.replace(
+ 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
+ mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
+ self.mocker.nospec()
+ mocked_sp.system.whoami()
+ self.mocker.throw(xmlrpclib.ProtocolError('', 401, '', []))
+ self.mocker.replay()
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
+ self.assertRaises(LavaCommandError, cmd.invoke)
+ self.assertEqual(
+ None, auth_backend.get_token_for_host('user', 'example.com'))
+
+ def test_check_other_http_failure_just_raised(self):
+ mocked_AuthenticatingServerProxy = self.mocker.replace(
+ 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
+ mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
+ # nospec() is required because of
+ # https://bugs.launchpad.net/mocker/+bug/794351
+ self.mocker.nospec()
+ mocked_sp.system.whoami()
+ self.mocker.throw(xmlrpclib.ProtocolError('', 500, '', []))
+ self.mocker.replay()
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
+ self.assertRaises(xmlrpclib.ProtocolError, cmd.invoke)
+
+ def test_fault_reported(self):
+ mocked_AuthenticatingServerProxy = self.mocker.replace(
+ 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
+ mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
+ # nospec() is required because of
+ # https://bugs.launchpad.net/mocker/+bug/794351
+ self.mocker.nospec()
+ mocked_sp.system.whoami()
+ self.mocker.throw(xmlrpclib.Fault(100, 'faultString'))
+ self.mocker.replay()
+ auth_backend = MemoryAuthBackend([])
+ cmd = self.make_command(
+ auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
+ self.assertRaises(LavaCommandError, cmd.invoke)
=== added file 'lava_tool/tests/test_authtoken.py'
@@ -0,0 +1,69 @@
+# Copyright (C) 2011 Linaro Limited
+#
+# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Unit tests for the lava_tool.authtoken package
+"""
+
+import base64
+from unittest import TestCase
+
+from lava_tool.authtoken import (
+ AuthenticatingTransportMixin,
+ MemoryAuthBackend,
+ )
+from lava_tool.interface import LavaCommandError
+
+
+class TestAuthenticatingTransportMixin(TestCase):
+
+ def headers_for_host(self, host, auth_backend):
+ a = AuthenticatingTransportMixin()
+ a.auth_backend = auth_backend
+ _, headers, _ = a.get_host_info(host)
+ return headers
+
+ def user_and_password_from_headers(self, headers):
+ if len(headers) != 1:
+ self.fail("expected exactly 1 header, got %r" % headers)
+ [(name, value)] = headers
+ if name != 'Authorization':
+ self.fail("non-authorization header found in %r" % headers)
+ if not value.startswith("Basic "):
+ self.fail("non-basic auth header found in %r" % headers)
+ auth = base64.b64decode(value[len("Basic "):])
+ if ':' in auth:
+ return tuple(auth.split(':', 1))
+ else:
+ return (auth, None)
+
+ def test_no_user_no_auth(self):
+ headers = self.headers_for_host('example.com', MemoryAuthBackend([]))
+ self.assertEqual(None, headers)
+
+ def test_error_when_user_but_no_token(self):
+ self.assertRaises(
+ LavaCommandError,
+ self.headers_for_host, 'user@example.com', MemoryAuthBackend([]))
+
+ def test_token_used_for_auth(self):
+ headers = self.headers_for_host(
+ 'user@example.com',
+ MemoryAuthBackend([('user', 'example.com', "TOKEN")]))
+ self.assertEqual(
+ ('user', 'TOKEN'), self.user_and_password_from_headers(headers))
=== modified file 'setup.py'
@@ -45,6 +45,7 @@
lava-tool = lava_tool.dispatcher:main
[lava_tool.commands]
help = lava_tool.commands.misc:help
+ auth-add = lava_tool.commands.auth:auth_add
""",
classifiers=[
"Development Status :: 4 - Beta",
@@ -55,7 +56,8 @@
"Topic :: Software Development :: Testing",
],
install_requires = [
- 'argparse >= 1.1'
+ 'argparse >= 1.1',
+ 'keyring',
],
setup_requires = [
'versiontools >= 1.1',