#!/usr/bin/env python3 import argparse import asyncio import json import ssl import sys from components.logs import logger from config import defaults parser = argparse.ArgumentParser( description="EHLO CTRL", formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument( "-H", "--host", dest="hostname", type=str, default="localhost", help="Override localhost", ) parser.add_argument( "-t", "--confirm_token", action="store_true", help="Generate a token when prompted by the application", ) parser.add_argument( "-p", "--promote-user", dest="promote_user", type=str, help="Promote a user to system administrator", ) ctrl_parameters = vars(parser.parse_args()) if len(sys.argv) == 1: parser.print_help(sys.stderr) sys.exit(1) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.load_cert_chain(certfile=defaults.TLS_CERTFILE, keyfile=defaults.TLS_KEYFILE) context.load_verify_locations(cafile=defaults.TLS_CA) context.check_hostname = False context.verify_mode = ssl.VerifyMode.CERT_REQUIRED context.minimum_version = ssl.TLSVersion.TLSv1_3 async def main(): match ctrl_parameters: case { "promote_user": promote_user, } if isinstance(promote_user, str): selection = 0 r, w = await asyncio.open_connection( ctrl_parameters["hostname"], 2102, ssl=context ) w.write(b"\x97") w.write(f"{promote_user}\n".encode("utf-8")) await w.drain() data = await r.readexactly(1) if data == b"\x01": logger.info("User was promoted") elif data == b"\x02": logger.warning("User is already administrator") elif data == b"\x03": logger.error("User does not exist or an error occured") case { "confirm_token": True, }: try: selection = 0 r, w = await asyncio.open_connection( ctrl_parameters["hostname"], 2102, ssl=context ) w.write(b"\x98") await w.drain() data = await r.readuntil(b"\n") requests = json.loads(data.strip().decode("ascii")) if not requests: logger.warning("No request is awaiting confirmation.") else: logger.info("\x1b[1mPlease select a token to confirm:\x1b[0m\n") for idx, (code, intention) in requests.items(): logger.info(f"\x1b[1m#{idx}\x1b[0m - {code}: {intention}") while selection not in requests.keys(): try: selection = str(input("Enter a token #: ")) except ValueError: continue w.write(b"\x99") w.write(f"{requests[selection][0]}".encode("ascii")) await w.drain() data = await r.readexactly(6) logger.info( "\nConfirmation code: \x1b[1;32m" + data.strip().decode("ascii") + "\x1b[0m\n" ) except asyncio.exceptions.IncompleteReadError as e: logger.error(f"Server error: {e}") finally: w.close() await w.wait_closed() if __name__ == "__main__": asyncio.run(main())