""" Rundeck API Token Manager Manage Rundeck API tokens (list, create, delete) via the Rundeck REST API v54. USAGE: python manage_rundeck_tokens.py --url [OPTIONS] AUTHENTICATION METHODS (choose one): --token APITOKEN Use an existing Rundeck API token --login USERNAME Use username/password authentication (password will be prompted) --password PASSWORD Password for --login (optional, will be prompted if omitted) --kerberos Use current user's Kerberos ticket (requires: pip install gssapi) COMMANDS: list [--user USERNAME] List all API tokens (optionally filter by user) get --id TOKEN_ID Get details for a specific token by ID create --user USERNAME Create a new API token with roles and optional expiry --roles ROLE [ROLE...] One or more roles (space-separated, required) --duration DURATION Token expiry (e.g., 30d, 12h, 1w; omit for no expiry) delete --id TOKEN_ID Delete a token by its ID EXAMPLES: 1. List all tokens using an API token: python manage_rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN list 2. List all tokens using username/password (password will be prompted): python manage_rundeck_tokens.py --url http://rundeck:4440 --login admin list 3. List tokens for a specific user (admin only): python manage_rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN list --user alice 4. List tokens and send report by email: python manage_rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN list --email 5. Get details for a specific token: python manage_rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN get --id abc123def456 6. Create a new token (non-expiring): python manage_rundeck_tokens.py --url http://rundeck:4440 --login admin create \ --user alice --roles admin deploy 7. Create a token that expires in 30 days: python manage_rundeck_tokens.py --url http://rundeck:4440 --login admin create \ --user john --roles developer --duration 30d 8. Create a token with multiple roles and custom password: python manage_rundeck_tokens.py --url http://rundeck:4440 --login admin --password secret123 create \ --user bob --roles admin deploy execute 9. Delete a token: python manage_rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN delete \ --id abc123def456 10. Use Kerberos authentication (requires valid ticket via kinit): python manage_rundeck_tokens.py --url http://rundeck:4440 --kerberos list 11. Use environment variable for token (useful in scripts): export RUNDECK_TOKEN=MYAPITOKEN python manage_rundeck_tokens.py --url http://rundeck:4440 --token $RUNDECK_TOKEN list OUTPUT: - Tokens are displayed with color-coded expiry status: * [EXPIRED] in red * [N days] in red/yellow (< 7 days) or green (>= 30 days) - Token details include: ID, name, user, roles, expiration date, creator """ import argparse import getpass import json import sys import urllib.request import urllib.error import urllib.parse from http.cookiejar import CookieJar from datetime import datetime, timezone from typing import Optional try: from email_report import EmailReportSender, generate_rundeck_tokens_report HAS_EMAIL = True except ImportError: HAS_EMAIL = False def login_with_password(base_url: str, username: str, password: str) -> urllib.request.OpenerDirector: """ Authenticate with a Rundeck username/password. Returns an opener that carries the session cookie for all subsequent calls. """ cj = CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj)) login_url = f"{base_url.rstrip('/')}/j_security_check" form_data = urllib.parse.urlencode({ "j_username": username, "j_password": password, }).encode() req = urllib.request.Request(login_url, data=form_data, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") try: with opener.open(req) as resp: # Rundeck redirects to / on success and to /user/error on failure final_url = resp.geturl() if "error" in final_url or "login" in final_url: print("[Auth error] Login failed - check username/password.", file=sys.stderr) sys.exit(1) except urllib.error.URLError as e: print(f"[Connection error] {e.reason}", file=sys.stderr) sys.exit(1) print(f"Logged in as '{username}'.") return opener def login_with_kerberos(base_url: str) -> urllib.request.OpenerDirector: """ Authenticate using SPNEGO/Kerberos (Negotiate scheme). Requires: pip install gssapi + a valid Kerberos ticket (kinit). Probes the server first; exits with a clear message if SPNEGO is not configured on this Rundeck instance. """ try: import base64 import gssapi except ImportError: print("[Error] Kerberos auth requires gssapi: pip install gssapi", file=sys.stderr) sys.exit(1) parsed = urllib.parse.urlparse(base_url) probe_url = f"{base_url.rstrip('/')}/api/54/system/info" # --- Step 1: probe without auth to check for SPNEGO challenge --- cj = CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj)) www_auth = "" try: probe_req = urllib.request.Request(probe_url, headers={"Accept": "application/json"}) with opener.open(probe_req): return opener # already authenticated (unlikely) except urllib.error.HTTPError as e: www_auth = e.headers.get("WWW-Authenticate", "") if e.code not in (401, 403): body = e.read().decode(errors="replace") print(f"[HTTP {e.code}] {e.reason} - {body}", file=sys.stderr) sys.exit(1) if "Negotiate" not in www_auth: got = www_auth or "none" print( "[Kerberos error] This Rundeck instance does not support SPNEGO/Kerberos.\n" f" No 'WWW-Authenticate: Negotiate' received (got: '{got}').\n" " The server uses form-based auth only. Use --login instead.", file=sys.stderr, ) sys.exit(1) # --- Step 2: generate GSSAPI token --- try: service_name = gssapi.Name( f"HTTP@{parsed.hostname}", name_type=gssapi.NameType.hostbased_service, ) ctx = gssapi.SecurityContext(name=service_name, usage="initiate") token = ctx.step() except gssapi.exceptions.GSSError as e: print(f"[Kerberos error] {e}\nRun: kinit", file=sys.stderr) sys.exit(1) negotiate_token = base64.b64encode(token).decode() # --- Step 3: send authenticated request to obtain session cookie --- req_auth = urllib.request.Request( probe_url, headers={"Authorization": f"Negotiate {negotiate_token}", "Accept": "application/json"}, ) try: with opener.open(req_auth): pass # cookie jar captures the session cookie except urllib.error.HTTPError as e: if e.code == 401: print("[Auth error] Kerberos: token rejected (401). Run: klist", file=sys.stderr) else: body = e.read().decode(errors="replace") print(f"[HTTP {e.code}] {e.reason} - {body}", file=sys.stderr) sys.exit(1) except urllib.error.URLError as e: print(f"[Connection error] {e.reason}", file=sys.stderr) sys.exit(1) import os print(f"Authenticated via Kerberos (cache: {os.environ.get('KRB5CCNAME', 'default')}).") return opener def make_request( base_url: str, method: str, path: str, payload: Optional[dict] = None, api_token: Optional[str] = None, opener: Optional[urllib.request.OpenerDirector] = None, ) -> dict | list | None: """ Perform an API call. Exactly one of *api_token* or *opener* (session cookie) must be provided. """ url = f"{base_url.rstrip('/')}{path}" headers: dict[str, str] = { "Accept": "application/json", "Content-Type": "application/json", } if api_token: headers["X-Rundeck-Auth-Token"] = api_token data = json.dumps(payload).encode() if payload else None req = urllib.request.Request(url, data=data, headers=headers, method=method) _opener = opener or urllib.request.build_opener() try: with _opener.open(req) as resp: body = resp.read() if body: return json.loads(body) return None except urllib.error.HTTPError as e: body = e.read().decode(errors="replace") print(f"[HTTP {e.code}] {e.reason} - {body}", file=sys.stderr) sys.exit(1) except urllib.error.URLError as e: print(f"[Connection error] {e.reason}", file=sys.stderr) sys.exit(1) # --------------------------------------------------------------------------- # Shorthand - forwards auth kwargs to make_request # --------------------------------------------------------------------------- def _req(base_url, method, path, payload=None, **auth): return make_request(base_url, method, path, payload, **auth) # --------------------------------------------------------------------------- # API operations # --------------------------------------------------------------------------- def list_tokens(base_url: str, user: Optional[str] = None, **auth) -> list: """List all tokens, or only those belonging to *user*.""" path = "/api/54/tokens" if user: path += f"/{user}" result = _req(base_url, "GET", path, **auth) # Rundeck returns {"tokens": [...]} or a bare list depending on version if isinstance(result, dict): return result.get("tokens", []) return result or [] def get_token(base_url: str, token_id: str, **auth) -> dict: """Get details for a single token by its ID.""" return _req(base_url, "GET", f"/api/54/token/{token_id}", **auth) def create_token( base_url: str, user: str, roles: list[str], duration: Optional[str] = None, name: Optional[str] = None, **auth, ) -> dict: """ Create a new token. Parameters ---------- user : Rundeck username the token is associated with. roles : List of role names (e.g. ["admin", "deploy"]). duration : Optional expiry expressed as a Rundeck duration string, e.g. "30d", "12h", "1w". Omit for a never-expiring token. name : Optional custom name for the token. """ payload: dict = {"user": user, "roles": roles} if duration: payload["duration"] = duration if name: payload["name"] = name return _req(base_url, "POST", "/api/54/tokens", payload, **auth) def delete_token(base_url: str, token_id: str, **auth) -> None: """Delete a token by its ID.""" _req(base_url, "DELETE", f"/api/54/token/{token_id}", **auth) print(f"Token '{token_id}' deleted successfully.") # --------------------------------------------------------------------------- # Pretty-printing # --------------------------------------------------------------------------- # ANSI colour codes _RESET = "\033[0m" _RED = "\033[1;31m" _YELLOW = "\033[1;33m" _GREEN = "\033[1;32m" def _expiry_status(expiration: str) -> tuple[str, str]: """Return (label, colour) based on days until expiry.""" try: exp = datetime.fromisoformat(expiration.replace("Z", "+00:00")) now = datetime.now(timezone.utc) days = (exp - now).days if days < 0: return "[EXPIRED]", _RED elif days < 3: return f"[CRITICAL - expires in {days}d]", _RED elif days < 7: return f"[WARNING - expires in {days}d]", _YELLOW else: return f"[OK - expires in {days}d]", _GREEN except (ValueError, AttributeError): return "", _RESET def print_token(token: dict) -> None: # Determine expiration status for the header colour = _RESET status_label = "" if token.get("expiration"): status_label, colour = _expiry_status(token["expiration"]) elif token.get("expired") is True: status_label, colour = "[EXPIRED]", _RED print(f"{colour}" + "-" * 60 + f" {status_label}{_RESET}") for key in ("id", "name", "user", "roles", "expiration", "expired", "creator", "token"): if key not in token: continue value = token[key] if key == "expiration" and isinstance(value, str) and value: value = value[:10] # Keep only YYYY-MM-DD line = f" {key:<12}: {value}" if key in ("expiration", "expired") and colour != _RESET: print(f"{colour}{line}{_RESET}") else: print(line) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Manage Rundeck API tokens (list / create / delete).", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Authentication - choose one method: --token APITOKEN Use a Rundeck API token --login USER [--password P] Use a username/password (password prompted if omitted) --kerberos Use the current user Kerberos ticket (SPNEGO/Negotiate) Examples: # List all tokens (API token auth) python rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN list # List tokens (login/password auth, password will be prompted) python rundeck_tokens.py --url http://rundeck:4440 --login admin list # List tokens for a specific user python rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN list --user alice # List tokens and send report by email python rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN list --email # Create a token (expires in 30 days) python rundeck_tokens.py --url http://rundeck:4440 --login admin create \\ --user alice --roles admin deploy --duration 30d # Delete a token by ID python rundeck_tokens.py --url http://rundeck:4440 --token MYAPITOKEN delete \\ --id abc123def456 """, ) parser.add_argument( "--url", required=True, help="Rundeck base URL, e.g. http://rundeck:4440", ) auth_group = parser.add_mutually_exclusive_group(required=True) auth_group.add_argument( "--token", metavar="APITOKEN", help="Rundeck API token used to authenticate requests.", ) auth_group.add_argument( "--login", metavar="USERNAME", help="Rundeck username for login/password authentication.", ) auth_group.add_argument( "--kerberos", action="store_true", help="Use the current user Kerberos ticket (SPNEGO). Requires: pip install gssapi", ) parser.add_argument( "--password", metavar="PASSWORD", default=None, help="Password for --login auth. If omitted, you will be prompted (recommended).", ) sub = parser.add_subparsers(dest="command", required=True) # --- list --- p_list = sub.add_parser("list", help="List API tokens.") p_list.add_argument( "--user", default=None, help="Filter tokens by username (optional, admin only for other users).", ) p_list.add_argument( "--email", action="store_true", help="Send the token report by email (requires email_report module and SMTP environment variables).", ) p_list.add_argument( "--report-path", default=None, type=str, help="Save the HTML report to a file (optional, used with --email).", ) # --- get --- p_get = sub.add_parser("get", help="Show details for a single token.") p_get.add_argument("--id", required=True, metavar="TOKEN_ID", help="Token ID.") # --- create --- p_create = sub.add_parser("create", help="Create a new API token.") p_create.add_argument("--user", required=True, help="Username to associate with the token.") p_create.add_argument( "--roles", nargs="+", required=True, metavar="ROLE", help="One or more roles to assign (space-separated).", ) p_create.add_argument( "--duration", default=None, help="Expiry duration, e.g. 30d, 12h, 1w. Omit for no expiry.", ) p_create.add_argument( "--name", default=None, help="Optional custom name for the token.", ) # --- delete --- p_delete = sub.add_parser("delete", help="Delete an API token.") p_delete.add_argument("--id", required=True, metavar="TOKEN_ID", help="Token ID to delete.") return parser def main() -> None: parser = build_parser() args = parser.parse_args() # --- Build auth kwargs passed to every API call --- if args.token: auth = {"api_token": args.token} elif args.kerberos: session_opener = login_with_kerberos(args.url) auth = {"opener": session_opener} else: password = args.password or getpass.getpass(f"Password for '{args.login}': ") session_opener = login_with_password(args.url, args.login, password) auth = {"opener": session_opener} if args.command == "list": # For list, the optional --user filter lives on the subcommand tokens = list_tokens(args.url, getattr(args, "user", None), **auth) if not tokens: print("No tokens found.") else: print(f"Found {len(tokens)} token(s):") for t in tokens: print_token(t) # Send email report if requested if getattr(args, "email", False): if not HAS_EMAIL: print("\n? [Error] Email module not available. Please check that email_report.py is in the same directory.", file=sys.stderr) sys.exit(1) print("\n?? Generating email report...") # Generate HTML report report_html = generate_rundeck_tokens_report( tokens=tokens, base_url=args.url ) # Save to file if requested if getattr(args, "report_path", None): try: with open(args.report_path, 'w', encoding='utf-8') as f: f.write(report_html) print(f"? Report saved to: {args.report_path}") except Exception as e: print(f"? Error saving report: {e}", file=sys.stderr) sys.exit(1) # Send email try: sender = EmailReportSender() subject = f"Rundeck API Tokens Report - {datetime.now().strftime('%d/%m/%Y')}" success = sender.send_simple_report( subject=subject, html_body=report_html ) if not success: print("[ERROR] Failed to send email", file=sys.stderr) sys.exit(1) except Exception as e: print(f"[ERROR] Error sending email: {e}", file=sys.stderr) sys.exit(1) elif args.command == "get": token = get_token(args.url, args.id, **auth) print_token(token) elif args.command == "create": new_token = create_token( args.url, args.user, args.roles, args.duration, args.name, **auth ) print("Token created:") print_token(new_token) elif args.command == "delete": delete_token(args.url, args.id, **auth) if __name__ == "__main__": main()