PK ! C openconnect_sso/__init__.py__version__ = "0.3.3"
__description__ = "Wrapper script for OpenConnect supporting Azure AD (SAMLv2) authentication to Cisco SSL-VPNs"
PK ! g5\ \ openconnect_sso/app.pyimport asyncio
import getpass
import logging
import os
import signal
from pathlib import Path
import structlog
from prompt_toolkit import HTML
from prompt_toolkit.eventloop import use_asyncio_event_loop
from prompt_toolkit.shortcuts import radiolist_dialog
from openconnect_sso import config
from openconnect_sso.authenticator import Authenticator
from openconnect_sso.config import Credentials
from openconnect_sso.profile import get_profiles
logger = structlog.get_logger()
def run(args):
configure_logger(logging.getLogger(), args.log_level)
loop = asyncio.get_event_loop()
use_asyncio_event_loop(loop)
try:
return asyncio.get_event_loop().run_until_complete(_run(args))
except KeyboardInterrupt:
logger.warn("CTRL-C pressed, exiting")
def configure_logger(logger, level):
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.format_exc_info,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
)
formatter = structlog.stdlib.ProcessorFormatter(
processor=structlog.dev.ConsoleRenderer()
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(level)
async def _run(args):
cfg = config.load()
credentials = None
if cfg.credentials:
credentials = cfg.credentials
elif args.user:
credentials = Credentials(args.user)
credentials.password = getpass.getpass(prompt=f"Password ({args.user}): ")
cfg.credentials = credentials
if cfg.default_profile and not args.use_profile_selector:
selected_profile = cfg.default_profile
elif args.use_profile_selector or args.profile_path:
profiles = get_profiles(Path(args.profile_path))
if not profiles:
logger.error("No profile found")
return 17
selected_profile = await select_profile(profiles)
if not selected_profile:
logger.error("No profile selected")
return 18
elif args.server:
selected_profile = config.HostProfile(args.server, args.usergroup)
else:
raise ValueError(
"Cannot determine server address. Invalid arguments specified."
)
cfg.default_profile = selected_profile
config.save(cfg)
session_token = await authenticate_to(selected_profile, credentials)
if args.login_only:
logger.warn("Exiting after login, as requested")
return 0
return await run_openconnect(session_token, selected_profile, args.openconnect_args)
async def select_profile(profile_list):
selection = await radiolist_dialog(
title="Select Anyconnect profile",
text=HTML(
"The following Anyconnect profiles are detected.\n"
"The selection will be saved and not asked again unless the
--profile-selector
command line option is used"
),
values=[(p, p.name) for i, p in enumerate(profile_list)],
async_=True,
).to_asyncio_future()
asyncio.get_event_loop().remove_signal_handler(signal.SIGWINCH)
if not selection:
return selection
logger.info("Selected profile", profile=selection.name)
return selection
def authenticate_to(host, credentials):
logger.info("Authenticating to VPN endpoint", name=host.name, address=host.address)
return Authenticator(host, credentials=credentials).authenticate()
async def run_openconnect(auth_info, host, args):
command_line = [
"sudo",
"openconnect",
"--cookie-on-stdin",
"--servercert",
auth_info.server_cert_hash,
*args,
]
logger.debug("Starting OpenConnect", command_line=command_line)
proc = await asyncio.create_subprocess_exec(
*command_line,
host.vpn_url,
stdin=asyncio.subprocess.PIPE,
stdout=None,
stderr=None,
)
proc.stdin.write(f"{auth_info.session_token}\n".encode())
await proc.stdin.drain()
await proc.wait()
PK ! ˛}I` ` openconnect_sso/authenticator.pyimport attr
import requests
import structlog
from lxml import etree, objectify
from openconnect_sso.saml_authenticator import authenticate_in_browser
logger = structlog.get_logger()
class Authenticator:
def __init__(self, host, credentials=None):
self.session = create_http_session()
self.host = host
self.credentials = credentials
self.auth_state = StartAuthentication(authenticator=self)
async def authenticate(self):
assert isinstance(self.auth_state, StartAuthentication)
logger.debug("Entering state", state=self.auth_state)
while not isinstance(self.auth_state, AuthenticationCompleted):
self.auth_state = await self.auth_state.trigger()
logger.debug("Entering state", state=self.auth_state)
return self.auth_state.auth_completed_response
def create_http_session():
session = requests.Session()
session.headers.update(
{
"User-Agent": "AnyConnect Linux_64 4.7.00136",
"Accept": "*/*",
"Accept-Encoding": "identity",
"X-Transcend-Version": "1",
"X-Aggregate-Auth": "1",
"X-AnyConnect-Platform": "linux-64",
"Content-Type": "application/x-www-form-urlencoded",
# I know, it is invalid but that’s what Anyconnect sends
}
)
return session
class AuthenticationState:
def __init__(self, *, authenticator=None, previous=None):
self.authenticator = authenticator
self.auth_request_response = None
self.auth_completed_response = None
self.sso_token = None
if previous:
self.authenticator = previous.authenticator
self.auth_request_response = previous.auth_request_response
self.auth_completed_response = previous.auth_completed_response
self.sso_token = previous.sso_token
def __repr__(self):
return f""
class StartAuthentication(AuthenticationState):
async def trigger(self):
request = _create_auth_init_request(
self.authenticator.host, self.authenticator.host.vpn_url
)
response = self.authenticator.session.post(
self.authenticator.host.vpn_url, request
)
logger.debug("Auth init response received", content=response.content)
response = parse_response(response)
if isinstance(response, AuthRequestResponse):
self.auth_request_response = response
return ExternalAuthentication(previous=self)
else:
logger.error(
"Error occurred during authentication. Invalid response type in state",
state=self,
response=response,
)
return self
E = objectify.ElementMaker(annotate=False)
def _create_auth_init_request(host, url):
ConfigAuth = getattr(E, "config-auth")
Version = E.version
DeviceId = getattr(E, "device-id")
GroupSelect = getattr(E, "group-select")
GroupAccess = getattr(E, "group-access")
Capabilities = E.capabilities
AuthMethod = getattr(E, "auth-method")
root = ConfigAuth(
{"client": "vpn", "type": "init", "aggregate-auth-version": "2"},
Version({"who": "vpn"}, "4.7.00136"),
DeviceId("linux-64"),
GroupSelect(host.name),
GroupAccess(url),
Capabilities(AuthMethod("single-sign-on-v2")),
)
return etree.tostring(
root, pretty_print=True, xml_declaration=True, encoding="UTF-8"
)
def parse_response(resp):
resp.raise_for_status()
xml = objectify.fromstring(resp.content)
t = xml.get("type")
if t == "auth-request":
return parse_auth_request_response(xml)
elif t == "complete":
return parse_auth_complete_response(xml)
def parse_auth_request_response(xml):
assert xml.auth.get("id") == "main"
resp = AuthRequestResponse(
auth_id=xml.auth.get("id"),
auth_title=xml.auth.title,
auth_message=xml.auth.message,
opaque=xml.opaque,
login_url=xml.auth["sso-v2-login"],
login_final_url=xml.auth["sso-v2-login-final"],
token_cookie_name=xml.auth["sso-v2-token-cookie-name"],
)
logger.info(
"Response received",
id=resp.auth_id,
title=resp.auth_title,
message=resp.auth_message,
)
return resp
@attr.s
class AuthRequestResponse:
auth_id = attr.ib(converter=str)
auth_title = attr.ib(converter=str)
auth_message = attr.ib(converter=str)
login_url = attr.ib(converter=str)
login_final_url = attr.ib(convert=str)
token_cookie_name = attr.ib(convert=str)
opaque = attr.ib()
def parse_auth_complete_response(xml):
assert xml.auth.get("id") == "success"
resp = AuthCompleteResponse(
auth_id=xml.auth.get("id"),
auth_message=xml.auth.message,
session_token=xml["session-token"],
server_cert_hash=xml.config["vpn-base-config"]["server-cert-hash"],
)
logger.info("Response received", id=resp.auth_id, message=resp.auth_message)
return resp
@attr.s
class AuthCompleteResponse:
auth_id = attr.ib(converter=str)
auth_message = attr.ib(converter=str)
session_token = attr.ib(converter=str)
server_cert_hash = attr.ib(converter=str)
class ExternalAuthentication(AuthenticationState):
async def trigger(self):
self.sso_token = await authenticate_in_browser(
self.auth_request_response, self.authenticator.credentials
)
return CompleteAuthentication(previous=self)
class CompleteAuthentication(AuthenticationState):
async def trigger(self):
request = _create_auth_finish_request(
self.authenticator.host, self.auth_request_response, self.sso_token
)
response = self.authenticator.session.post(
self.authenticator.host.vpn_url, request
)
logger.debug("Auth finish response received", content=response.content)
response = parse_response(response)
if isinstance(response, AuthCompleteResponse):
self.auth_completed_response = response
return AuthenticationCompleted(previous=self)
else:
logger.error(
"Error occurred during authentication. Invalid response type in state",
state=self,
response=response,
)
return StartAuthentication()
def _create_auth_finish_request(host, auth_info, sso_token):
ConfigAuth = getattr(E, "config-auth")
Version = E.version
DeviceId = getattr(E, "device-id")
SessionToken = getattr(E, "session-token")
SessionId = getattr(E, "session-id")
Auth = E.auth
SsoToken = getattr(E, "sso-token")
root = ConfigAuth(
{"client": "vpn", "type": "auth-reply", "aggregate-auth-version": "2"},
Version({"who": "vpn"}, "4.7.00136"),
DeviceId("linux-64"),
SessionToken(),
SessionId(),
auth_info.opaque,
Auth(SsoToken(sso_token)),
)
return etree.tostring(
root, pretty_print=True, xml_declaration=True, encoding="UTF-8"
)
class AuthenticationCompleted(AuthenticationState):
pass
PK ! ې # openconnect_sso/browser/__init__.pyfrom .browser import Browser
PK ! 7<
" openconnect_sso/browser/browser.pyimport asyncio
import sys
from pathlib import Path
import structlog
from . import rpc_types as rpc
logger = structlog.get_logger()
class Browser:
def __init__(self):
self.browser_proc = None
self.updater = None
self.running = False
self._urls = asyncio.Queue()
self.url = None
self.cookies = {}
self.loop = asyncio.get_event_loop()
async def spawn(self):
exe = sys.executable
script = str(Path(__file__).parent.joinpath(Path("webengine_process.py")))
self.browser_proc = await asyncio.create_subprocess_exec(
exe,
script,
*sys.argv[1:],
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE
)
self.updater = asyncio.ensure_future(self._update_status())
self.running = True
def stop(_task):
self.running = False
asyncio.ensure_future(self.browser_proc.wait()).add_done_callback(stop)
async def _update_status(self):
assert self.running
while self.running:
logger.debug("Waiting for message from browser process")
try:
line = await self.browser_proc.stdout.readline()
state = rpc.deserialize(line)
except EOFError:
if self.running:
logger.warn("Connection terminated with browser")
self.running = False
else:
logger.info("Browser exited")
await self._urls.put(None)
return
logger.debug("Message received from browser", message=state)
if isinstance(state, rpc.Url):
await self._urls.put(state.url)
elif isinstance(state, rpc.SetCookie):
self.cookies[state.name] = state.value
else:
logger.error("Message unrecognized", message=state)
async def authenticate_at(self, url, credentials):
assert self.running
self.browser_proc.stdin.write(rpc.StartupInfo(url, credentials).serialize())
self.browser_proc.stdin.write(b"\n")
await self.browser_proc.stdin.drain()
async def page_loaded(self):
rv = await self._urls.get()
if not self.running:
raise Terminated()
self.url = rv
async def __aenter__(self):
await self.spawn()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
try:
self.running = False
self.browser_proc.terminate()
except ProcessLookupError:
# already stopped
pass
await self.browser_proc.wait()
await self.updater
class Terminated(Exception):
pass
PK ! h$ $ openconnect_sso/browser/rpc_types.pyimport base64
import pickle
import attr
class Type:
def serialize(self):
return base64.b64encode(pickle.dumps(self))
def deserialize(data):
return pickle.loads(base64.b64decode(data))
@attr.s
class Url(Type):
url = attr.ib()
@attr.s
class Credentials(Type):
credentials = attr.ib()
@attr.s
class StartupInfo(Type):
url = attr.ib()
credentials = attr.ib()
@attr.s
class SetCookie(Type):
name = attr.ib()
value = attr.ib()
PK ! }_% % openconnect_sso/browser/user.js// ==UserScript==
// ==/UserScript==
PK ! ^; ; , openconnect_sso/browser/webengine_process.pyimport json
import logging
import signal
import sys
import pkg_resources
import structlog
from PyQt5.QtCore import QUrl
from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineScript
from PyQt5.QtWidgets import QApplication
from openconnect_sso import config
from openconnect_sso.app import configure_logger
from openconnect_sso.browser import rpc_types as rpc
from openconnect_sso.cli import create_argparser
logger = structlog.get_logger("webengine")
def run_browser_process():
args = create_argparser().parse_known_args()[0]
configure_logger(logging.getLogger(), args.log_level)
cfg = config.load()
app = QApplication(sys.argv)
web = WebBrowser(cfg.auto_fill_rules)
line = sys.stdin.buffer.readline()
startup_info = rpc.deserialize(line)
logger.info("Browser started", startup_info=startup_info)
logger.info("Loading page", url=startup_info.url)
web.authenticate_at(QUrl(startup_info.url), startup_info.credentials)
web.show()
rc = app.exec_()
logger.info("Exiting browser")
return rc
class WebBrowser(QWebEngineView):
def __init__(self, auto_fill_rules):
super().__init__()
self._auto_fill_rules = auto_fill_rules
cookie_store = self.page().profile().cookieStore()
cookie_store.cookieAdded.connect(self._on_cookie_added)
self.page().loadFinished.connect(self._on_load_finished)
def authenticate_at(self, url, credentials):
script_source = pkg_resources.resource_string(__name__, "user.js").decode()
script = QWebEngineScript()
script.setInjectionPoint(QWebEngineScript.DocumentCreation)
script.setWorldId(QWebEngineScript.ApplicationWorld)
script.setSourceCode(script_source)
self.page().scripts().insert(script)
if credentials:
logger.info("Initiating autologin", cred=credentials)
for url_pattern, rules in self._auto_fill_rules.items():
script = QWebEngineScript()
script.setInjectionPoint(QWebEngineScript.DocumentReady)
script.setWorldId(QWebEngineScript.ApplicationWorld)
script.setSourceCode(
f"""
// ==UserScript==
// @include {url_pattern}
// ==/UserScript==
function autoFill() {{
{get_selectors(rules, credentials)}
setTimeout(autoFill, 1000);
}}
autoFill();
"""
)
self.page().scripts().insert(script)
self.load(QUrl(url))
def _on_cookie_added(self, cookie):
logger.debug("Cookie set", name=to_str(cookie.name()))
sys.stdout.buffer.write(
rpc.SetCookie(to_str(cookie.name()), to_str(cookie.value())).serialize()
)
sys.stdout.buffer.write(b"\n")
sys.stdout.flush()
def _on_load_finished(self, success):
url = self.page().url().toString()
logger.debug("Page loaded", url=url)
sys.stdout.buffer.write(rpc.Url(url).serialize())
sys.stdout.buffer.write(b"\n")
sys.stdout.flush()
def to_str(qval):
return bytes(qval).decode()
def get_selectors(rules, credentials):
statements = []
for i, rule in enumerate(rules):
selector = json.dumps(rule.selector)
if rule.fill:
value = json.dumps(getattr(credentials, rule.fill, None))
if value:
statements.append(
f"""var elem = document.querySelector({selector}); if (elem) {{ elem.dispatchEvent(new Event("focus")); elem.value = {value}; elem.dispatchEvent(new Event("blur")); }}"""
)
else:
logger.warning(
"Credential info not available",
type=rule.fill,
possibilities=dir(credentials),
)
elif rule.action == "click":
statements.append(
f"""var elem = document.querySelector({selector}); if (elem) {{ elem.dispatchEvent(new Event("focus")); elem.click(); }}"""
)
return "\n".join(statements)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)
run_browser_process()
PK ! u# openconnect_sso/cli.pyimport argparse
import enum
import logging
import os
import openconnect_sso
from openconnect_sso import app, config
def create_argparser():
parser = argparse.ArgumentParser(
prog="openconnect-sso", description=openconnect_sso.__description__
)
server_settings = parser.add_argument_group("Server connection")
server_settings.add_argument(
"-p",
"--profile",
dest="profile_path",
help="Use a profile from this file or directory",
)
server_settings.add_argument(
"-P",
"--profile-selector",
dest="use_profile_selector",
help="Always display profile selector",
action="store_true",
default=False,
)
server_settings.add_argument(
"-s",
"--server",
help="VPN server to connect to. The following forms are accepted: "
"vpn.server.com, vpn.server.com/usergroup, "
"https://vpn.server.com, https.vpn.server.com.usergroup",
)
server_settings.add_argument(
"-g",
"--usergroup",
help="Override usergroup setting from --server argument",
default="",
)
parser.add_argument(
"--login-only",
help="Complete authentication but do not acquire a session token or initiate a connection",
action="store_true",
default=False,
)
parser.add_argument(
"-l",
"--log-level",
help="",
type=LogLevel.parse,
choices=LogLevel.choices(),
default=LogLevel.INFO,
)
parser.add_argument(
"openconnect_args",
help="Arguments passed to openconnect",
action=StoreOpenConnectArgs,
nargs=argparse.REMAINDER,
)
credentials_group = parser.add_argument_group("Credentials for automatic login")
credentials_group.add_argument(
"-u", "--user", help="Authenticate as the given user", default=None
)
return parser
class StoreOpenConnectArgs(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if "--" in values:
values.remove("--")
setattr(namespace, self.dest, values)
class LogLevel(enum.IntEnum):
ERROR = logging.ERROR
WARNING = logging.WARNING
INFO = logging.INFO
DEBUG = logging.DEBUG
def __str__(self):
return self.name
@classmethod
def parse(cls, name):
return cls.__members__[name.upper()]
@classmethod
def choices(cls):
return cls.__members__.values()
def main():
parser = create_argparser()
args = parser.parse_args()
if (args.profile_path or args.use_profile_selector) and (
args.server or args.usergroup
):
parser.error(
"--profile/--profile-selector and --server/--usergroup are mutually exclusive"
)
if not args.profile_path and not args.server and not config.load().default_profile:
if os.path.exists("/opt/cisco/anyconnect/profile"):
args.profile_path = "/opt/cisco/anyconnect/profile"
else:
parser.error(
"No Anyconnect profile can be found. One of --profile or --server arguments required."
)
return app.run(args)
PK ! O openconnect_sso/config.pyfrom pathlib import Path
from urllib.parse import urlparse, urlunparse
import attr
import keyring
import structlog
import toml
import xdg.BaseDirectory
logger = structlog.get_logger()
APP_NAME = "openconnect-sso"
def load():
path = xdg.BaseDirectory.load_first_config(APP_NAME)
if not path:
return Config()
config_path = Path(path) / "config.toml"
if not config_path.exists():
return Config()
with config_path.open() as config_file:
try:
return Config.from_dict(toml.load(config_file))
except Exception:
logger.error(
"Could not load configuration file, ignoring",
path=config_path,
exc_info=True,
)
return Config()
def save(config):
path = xdg.BaseDirectory.save_config_path(APP_NAME)
config_path = Path(path) / "config.toml"
config_path.touch()
with config_path.open("w") as config_file:
try:
toml.dump(config.as_dict(), config_file)
except Exception:
logger.error(
"Could not save configuration file", path=config_path, exc_info=True
)
@attr.s
class ConfigNode:
@classmethod
def from_dict(cls, d):
if d is None:
return None
return cls(**d)
def as_dict(self):
return attr.asdict(self)
@attr.s
class HostProfile(ConfigNode):
address = attr.ib(converter=str)
user_group = attr.ib(converter=str)
name = attr.ib(converter=str, default="UNNAMED")
@property
def vpn_url(self):
parts = urlparse(self.address)
group = self.user_group or parts.path
if parts.path == self.address and not self.user_group:
group = ""
return urlunparse(
(parts.scheme or "https", parts.netloc or self.address, group, "", "", "")
)
@attr.s
class AutoFillRule(ConfigNode):
selector = attr.ib()
fill = attr.ib(default=None)
action = attr.ib(default=None)
def get_default_auto_fill_rules():
return {
"https://*": [
AutoFillRule(selector="input[type=email]", fill="username").as_dict(),
AutoFillRule(selector="input[type=password]", fill="password").as_dict(),
AutoFillRule(selector="input[type=submit]", action="click").as_dict(),
]
}
@attr.s
class Credentials(ConfigNode):
username = attr.ib()
@property
def password(self):
return keyring.get_credential(APP_NAME, self.username).password
@password.setter
def password(self, value):
keyring.set_password(APP_NAME, self.username, value)
@attr.s
class Config(ConfigNode):
default_profile = attr.ib(default=None, converter=HostProfile.from_dict)
credentials = attr.ib(default=None, converter=Credentials.from_dict)
auto_fill_rules = attr.ib(
factory=get_default_auto_fill_rules,
converter=lambda rules: {
n: [AutoFillRule.from_dict(r) for r in rule] for n, rule in rules.items()
},
)
PK ! ~ openconnect_sso/profile.pyfrom pathlib import Path
import structlog
from lxml import objectify
from openconnect_sso.config import HostProfile
logger = structlog.get_logger()
ns = {"enc": "http://schemas.xmlsoap.org/encoding/"}
def _get_profiles_from_one_file(path):
logger.info("Loading profiles from file", path=path.name)
with path.open() as f:
xml = objectify.parse(f)
hostentries = xml.xpath(
"//enc:AnyConnectProfile/enc:ServerList/enc:HostEntry", namespaces=ns
)
profiles = []
for entry in hostentries:
profiles.append(
HostProfile(
name=entry.HostName,
address=entry.HostAddress,
user_group=entry.UserGroup,
)
)
logger.debug("Anyconnect profiles parsed", path=path.name, profiles=profiles)
return profiles
def get_profiles(path: Path):
if path.is_file():
profile_files = [path]
elif path.is_dir():
profile_files = path.glob("*.xml")
else:
raise ValueError("No profile file found", path.name)
profiles = []
for p in profile_files:
profiles.extend(_get_profiles_from_one_file(p))
return profiles
PK ! b6 % openconnect_sso/saml_authenticator.pyimport structlog
from openconnect_sso.browser import Browser
log = structlog.get_logger()
async def authenticate_in_browser(auth_info, credentials):
async with Browser() as browser:
await browser.authenticate_at(auth_info.login_url, credentials)
while browser.url != auth_info.login_final_url:
await browser.page_loaded()
log.debug("Browser loaded page", url=browser.url)
return browser.cookies[auth_info.token_cookie_name]
PK !Hw/02 <