#!/usr/bin/env python3 """ yourls_manager.py — CRUD operations for mpm.to YOURLS instance Authentication strategy ----------------------- Create and read always use the signature token via yourls-api.php. Update and delete try the signature token first (requires the mpm-api-extras plugin to be active). If the plugin is not installed, they automatically fall back to an admin session (username + password) via admin-ajax.php. Credentials are stored in macOS Keychain under service 'mpm.to-yourls': account 'signature' → static API signature token (required) account 'username' → YOURLS admin username (optional fallback) account 'password' → YOURLS admin password (optional fallback) Usage: yourls_manager.py setup # Store credentials yourls_manager.py check # Verify & test yourls_manager.py create --url URL [--keyword K] [--title T] yourls_manager.py read --keyword K yourls_manager.py list [--limit N] [--filter top|bottom|last|rand] yourls_manager.py update --keyword K [--url URL] [--newkeyword K2] [--title T] yourls_manager.py delete --keyword K [--force] """ import argparse import getpass import json import re import subprocess import sys BASE_URL = "https://mpm.to" API_URL = f"{BASE_URL}/yourls-api.php" ADMIN_URL = f"{BASE_URL}/admin" KEYCHAIN_SERVICE = "mpm.to-yourls" # --------------------------------------------------------------------------- # Keychain helpers (macOS) # --------------------------------------------------------------------------- def keychain_get(account: str) -> str | None: result = subprocess.run( ["security", "find-generic-password", "-a", account, "-s", KEYCHAIN_SERVICE, "-w"], capture_output=True, text=True ) return result.stdout.strip() if result.returncode == 0 else None def keychain_set(account: str, value: str) -> None: subprocess.run( ["security", "add-generic-password", "-U", "-a", account, "-s", KEYCHAIN_SERVICE, "-w", value], capture_output=True, check=True ) def keychain_delete(account: str) -> None: subprocess.run( ["security", "delete-generic-password", "-a", account, "-s", KEYCHAIN_SERVICE], capture_output=True ) def get_signature() -> str: token = keychain_get("signature") if not token: sys.exit( "Error: No signature token found.\n" "Run `yourls_manager.py setup` to store your credentials." ) return token # --------------------------------------------------------------------------- # Keyword normalisation # --------------------------------------------------------------------------- def normalize_keyword(raw: str) -> str: """ Apply mpm.to's keyword charset rules client-side so the user sees the actual short code before the request is sent. mpm.to is configured with a lowercase alphanumeric charset, so uppercase letters, hyphens, underscores and all other special characters are silently stripped by YOURLS. Replicating that here lets us warn the user early. """ import re as _re return _re.sub(r'[^a-z0-9]', '', raw.lower()) def warn_if_sanitized(requested: str, normalized: str) -> None: if requested != normalized: print(f" Note: keyword '{requested}' was normalised to '{normalized}'") print(f" (mpm.to only allows lowercase letters and numbers)") if not normalized: sys.exit("Error: After normalisation the keyword is empty. " "Choose a keyword using only lowercase letters and numbers.") def get_admin_creds() -> tuple[str, str] | None: """Return (username, password) if both are stored, otherwise None.""" username = keychain_get("username") password = keychain_get("password") if username and password: return username, password return None # --------------------------------------------------------------------------- # Standard API calls (always via signature token) # --------------------------------------------------------------------------- def _requests(): try: import requests as req return req except ImportError: sys.exit( "Error: 'requests' library not installed.\n" "Fix: pip3 install requests --break-system-packages" ) def api_call(params: dict) -> dict: req = _requests() params["signature"] = get_signature() params["format"] = "json" try: resp = req.post(API_URL, data=params, timeout=15) resp.raise_for_status() return resp.json() except req.exceptions.ConnectionError: sys.exit(f"Error: Could not connect to {BASE_URL}. Check your network.") except req.exceptions.Timeout: sys.exit(f"Error: Request timed out.") except Exception as e: sys.exit(f"Error: API call failed — {e}") def _is_unknown_action(result: dict) -> bool: """True when YOURLS says it doesn't know the action — plugin not installed.""" msg = result.get("message", "").lower() return result.get("errorCode") == "400" and ( "unknown" in msg or "missing" in msg or "action" in msg ) # --------------------------------------------------------------------------- # Admin session (fallback for update & delete when plugin is not installed) # --------------------------------------------------------------------------- def get_admin_session(): req = _requests() creds = get_admin_creds() if not creds: sys.exit( "Error: The mpm-api-extras plugin is not active, and no admin\n" "credentials are stored as a fallback.\n\n" "Choose one:\n" " A) Install plugin/mpm-api-extras.php and activate it in YOURLS admin\n" " B) Run `yourls_manager.py setup` and add admin username + password" ) username, password = creds session = req.Session() try: # Step 1: GET the admin page to fetch the login nonce. # YOURLS embeds a time-limited nonce in a hidden on # the login form; it must be included in the POST or the login is rejected. # The login endpoint is admin/ (not a separate login.php file). page = session.get(f"{ADMIN_URL}/", allow_redirects=False, timeout=15) except Exception as e: sys.exit(f"Error: Could not reach admin page — {e}") nonce_m = ( re.search(r']+name=["\']nonce["\'][^>]+value=["\']([^"\']+)["\']', page.text) or re.search(r'name=["\']nonce["\'][^>]*value=["\']([^"\']+)["\']', page.text) ) if not nonce_m: sys.exit("Error: Could not find login nonce on admin page. YOURLS may have changed.") login_nonce = nonce_m.group(1) try: resp = session.post( f"{ADMIN_URL}/", data={"username": username, "password": password, "nonce": login_nonce, "submit": "Login"}, timeout=15, allow_redirects=True ) except Exception as e: sys.exit(f"Error: Admin login request failed — {e}") if 'name="password"' in resp.text: sys.exit( "Error: Admin login failed. Check username and password.\n" "Re-run `yourls_manager.py setup` to update stored credentials." ) return session def get_link_row_info(session, keyword: str) -> dict: """ Scrape the admin index page for the nonces and row ID of a given keyword. Returns dict with keys: id, delete_nonce, edit_nonce We search the admin page for the specific keyword (?s=keyword) so that YOURLS filters the table to a single row — essential when there are hundreds of links and the target keyword would not appear on the first page otherwise. """ resp = session.get( f"{ADMIN_URL}/", params={"s": keyword, "search_in": "keyword"}, timeout=15, allow_redirects=True, ) html = resp.text # YOURLS embeds nonces in admin-ajax.php hrefs, e.g.: # admin-ajax.php?id=yid-N&action=delete&keyword=KEYWORD&nonce=NONCE # Parameter order can vary; try both orderings. def find_ajax_url(html, action, keyword): patterns = [ r'href=["\']([^"\']*admin-ajax\.php[^"\']*action=' + re.escape(action) + r'[^"\']*keyword=' + re.escape(keyword) + r'[^"\']*)["\']', r'href=["\']([^"\']*admin-ajax\.php[^"\']*keyword=' + re.escape(keyword) + r'[^"\']*action=' + re.escape(action) + r'[^"\']*)["\']', ] for pat in patterns: m = re.search(pat, html) if m: url = m.group(1).replace("&", "&") qs = dict(p.split("=", 1) for p in url.split("?", 1)[1].split("&") if "=" in p) return qs return None delete_qs = find_ajax_url(html, "delete", keyword) edit_qs = find_ajax_url(html, "edit", keyword) if not delete_qs: # Diagnose: does the keyword appear at all in the page? if keyword in html: sys.exit( f"Error: Found '{keyword}' on admin page but could not extract " f"nonce link. The page structure may have changed." ) else: sys.exit( f"Error: Short code '{keyword}' not found on admin page.\n" f" (admin page URL after redirect: {resp.url})" ) return { "id": delete_qs.get("id"), "delete_nonce": delete_qs.get("nonce"), "edit_nonce": edit_qs.get("nonce") if edit_qs else None, } def admin_delete(keyword: str) -> None: session = get_admin_session() info = get_link_row_info(session, keyword) resp = session.post( f"{ADMIN_URL}/admin-ajax.php", data={ "action": "delete", "keyword": keyword, "id": info["id"], "nonce": info["delete_nonce"], }, timeout=15 ) result = resp.json() if result.get("success"): print(f"Deleted: {BASE_URL}/{keyword} (via admin session)") else: sys.exit(f"Error deleting '{keyword}': {json.dumps(result)}") def admin_update(keyword: str, new_url: str, new_keyword: str, new_title: str, row_id: str, edit_nonce: str) -> None: session = get_admin_session() info = get_link_row_info(session, keyword) # Step 1: fetch the edit row to get the edit-save nonce disp = session.post( f"{ADMIN_URL}/admin-ajax.php", data={ "action": "edit_display", "keyword": keyword, "id": info["id"], "nonce": info["edit_nonce"], }, timeout=15 ) edit_html = disp.json().get("html", "") # Extract edit-save nonce from hidden input: id="nonce_" row_id = info["id"] save_nonce = None for pat in [ r'id=["\']nonce_' + re.escape(row_id) + r'["\'][^>]*value=["\']([^"\']+)["\']', r'value=["\']([^"\']+)["\'][^>]*id=["\']nonce_' + re.escape(row_id) + r'["\']', ]: m = re.search(pat, edit_html) if m: save_nonce = m.group(1) break if not save_nonce: sys.exit("Error: Could not extract edit-save nonce. Try activating the plugin instead.") # Step 2: save save = session.post( f"{ADMIN_URL}/admin-ajax.php", data={ "action": "edit_save", "keyword": keyword, "newkeyword": new_keyword, "url": new_url, "title": new_title, "id": row_id, "nonce": save_nonce, }, timeout=15 ) result = save.json() if result.get("status") == "success": print(f"Updated: {BASE_URL}/{new_keyword} (via admin session)") else: sys.exit(f"Error updating '{keyword}': {result.get('message', json.dumps(result))}") # --------------------------------------------------------------------------- # Commands # --------------------------------------------------------------------------- def cmd_setup(args): print("=== mpm.to YOURLS Credential Setup ===") print(f"Stored in macOS Keychain under service '{KEYCHAIN_SERVICE}'\n") # Signature token (required) existing_sig = keychain_get("signature") if existing_sig: ans = input("Signature token already stored. Replace it? [y/N] ").strip().lower() if ans == "y": token = getpass.getpass("New signature token: ").strip() if token: keychain_set("signature", token) print(" ✓ Signature token updated.") else: token = getpass.getpass("Signature token (required): ").strip() if not token: sys.exit("Error: Signature token is required.") keychain_set("signature", token) print(" ✓ Signature token saved.") # Admin credentials (optional fallback) print() print("Admin username + password are optional. They are used as a fallback") print("for update/delete if the mpm-api-extras plugin is not installed.") print("Press Enter to skip.\n") existing_user = keychain_get("username") prompt = f"Admin username [{existing_user}]: " if existing_user else "Admin username: " username = input(prompt).strip() if username: keychain_set("username", username) print(" ✓ Username saved.") password = getpass.getpass("Admin password: ").strip() if password: keychain_set("password", password) print(" ✓ Password saved.") elif not existing_user: print(" Skipped — update/delete will require the mpm-api-extras plugin.") print("\nSetup complete. Run `check` to verify.") def cmd_check(args): sig = keychain_get("signature") user = keychain_get("username") pw = keychain_get("password") print(f"Keychain service : {KEYCHAIN_SERVICE}") print(f" Signature token : {'✓ stored' if sig else '✗ missing — run setup'}") print(f" Admin username : {'✓ ' + user if user else '○ not stored (optional)'}") print(f" Admin password : {'✓ stored' if pw else '○ not stored (optional)'}") print() if sig: print("Testing API connection...", end=" ", flush=True) result = api_call({"action": "db-stats"}) if result.get("statusCode") == "200" or result.get("message") == "success": stats = result.get("db-stats", {}) print(f"OK — {stats.get('total_links','?')} short URLs, " f"{stats.get('total_clicks','?')} total clicks") # Check if plugin is active test = api_call({"action": "delete", "keyword": "__probe__"}) if _is_unknown_action(test): print(" mpm-api-extras plugin : ✗ not active") print(" → update/delete will use admin session fallback" if user and pw else " → update/delete unavailable without plugin or admin credentials") else: print(" mpm-api-extras plugin : ✓ active") else: print(f"Failed — {result.get('message', 'unexpected response')}") else: print("Cannot test connection — signature token missing.") def cmd_create(args): params = {"action": "shorturl", "url": args.url} if args.keyword: norm = normalize_keyword(args.keyword) warn_if_sanitized(args.keyword, norm) params["keyword"] = norm if args.title: params["title"] = args.title result = api_call(params) status = result.get("status", "") if status == "success" or result.get("statusCode") == "200": short = result.get("shorturl", "") print(f"Created: {short}") print(f" → {args.url}") if args.title: print(f" Title: {args.title}") elif status == "fail": msg = result.get("message", "Unknown error") if "already exists" in msg.lower(): print(f"Short code '{args.keyword}' already exists.") existing = result.get("shorturl", "") if existing: print(f" Existing: {existing}") else: sys.exit(f"Error: {msg}") else: sys.exit(f"Unexpected response: {json.dumps(result, indent=2)}") def cmd_read(args): raw = args.keyword.replace(f"{BASE_URL}/", "").strip("/") keyword = normalize_keyword(raw) if raw != keyword: print(f" Note: looking up '{keyword}' (normalised from '{raw}')") expand = api_call({"action": "expand", "shorturl": keyword}) if expand.get("errorCode") == "404" or expand.get("message","").lower().startswith("error"): sys.exit(f"Error: Short code '{keyword}' not found.") long_url = expand.get("longurl", "") title = expand.get("title", "") short = expand.get("shorturl", f"{BASE_URL}/{keyword}") stats = api_call({"action": "url-stats", "shorturl": keyword}) link = stats.get("link", {}) clicks = link.get("clicks", "?") created = link.get("timestamp", "") print(f"Short URL : {short}") print(f"Long URL : {long_url}") if title: print(f"Title : {title}") print(f"Clicks : {clicks}") if created: print(f"Created : {created}") def cmd_list(args): limit = args.limit or 50 filter_ = args.filter or "top" result = api_call({"action": "stats", "filter": filter_, "limit": limit}) if result.get("message") != "success" and result.get("statusCode") != "200": sys.exit(f"Error: {result.get('message', 'Unknown error')}") links = list(result.get("links", {}).values()) if not links: print("No short URLs found.") return print(f"{'Short Code':<22} {'Clicks':>7} {'Created':<12} Title / URL") print("─" * 88) for link in links: shorturl = link.get("shorturl", "") keyword = shorturl.replace(BASE_URL + "/", "").strip("/") if shorturl else link.get("keyword", "") title = link.get("title", "") or link.get("url", "") clicks = link.get("clicks", 0) timestamp = link.get("timestamp", "") date_str = timestamp[:10] if timestamp else "" label = (title[:48] + "…") if len(title) > 48 else title print(f"{keyword:<22} {clicks:>7} {date_str:<12} {label}") total = result.get("stats", {}).get("total_links", "?") print(f"\n{len(links)} of {total} total short URLs (filter: {filter_})") def cmd_update(args): if not any([args.url, args.newkeyword, args.title]): sys.exit("Error: Provide at least one of --url, --newkeyword, or --title.") raw = args.keyword.replace(f"{BASE_URL}/", "").strip("/") keyword = normalize_keyword(raw) if args.newkeyword: norm_new = normalize_keyword(args.newkeyword) warn_if_sanitized(args.newkeyword, norm_new) args.newkeyword = norm_new # Resolve current values for fields not supplied expand = api_call({"action": "expand", "shorturl": keyword}) current_url = expand.get("longurl", "") current_title = expand.get("title", "") if not current_url: sys.exit(f"Error: Short code '{keyword}' not found.") new_url = args.url or current_url new_keyword = args.newkeyword or keyword new_title = args.title if args.title is not None else current_title # Try API (plugin) first result = api_call({ "action": "update", "keyword": keyword, "url": new_url, "newkeyword": new_keyword, "title": new_title, }) if result.get("status") == "success" or result.get("statusCode") == "200": short = result.get("shorturl", f"{BASE_URL}/{new_keyword}") print(f"Updated: {short} (via API)") if args.url: print(f" URL : {new_url}") if args.newkeyword and args.newkeyword != keyword: print(f" Code : {keyword} → {new_keyword}") if args.title is not None: print(f" Title : {new_title}") return if _is_unknown_action(result): # Plugin not installed — fall back to admin session print("Note: mpm-api-extras plugin not active; using admin session fallback.") admin_update(keyword, new_url, new_keyword, new_title, row_id=None, edit_nonce=None) if args.url: print(f" URL : {new_url}") if args.newkeyword and args.newkeyword != keyword: print(f" Code : {keyword} → {new_keyword}") if args.title is not None: print(f" Title : {new_title}") return sys.exit(f"Error: {result.get('message', json.dumps(result))}") def cmd_delete(args): raw = args.keyword.replace(f"{BASE_URL}/", "").strip("/") keyword = normalize_keyword(raw) if not args.force: expand = api_call({"action": "expand", "shorturl": keyword}) long_url = expand.get("longurl", "(not found)") title = expand.get("title", "") print(f"About to delete:") print(f" Short URL : {BASE_URL}/{keyword}") print(f" Long URL : {long_url}") if title: print(f" Title : {title}") if input("\nAre you sure? [y/N] ").strip().lower() != "y": print("Cancelled.") return # Try API (plugin) first result = api_call({"action": "delete", "keyword": keyword}) if result.get("status") == "success" or result.get("statusCode") == "200": print(f"Deleted: {BASE_URL}/{keyword} (via API)") return if _is_unknown_action(result): print("Note: mpm-api-extras plugin not active; using admin session fallback.") admin_delete(keyword) return sys.exit(f"Error: {result.get('message', json.dumps(result))}") # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser( description="CRUD manager for mpm.to YOURLS short URLs" ) sub = parser.add_subparsers(dest="command", required=True) sub.add_parser("setup", help="Store credentials in macOS Keychain") sub.add_parser("check", help="Verify credentials and test connection") p = sub.add_parser("create", help="Create a new short URL") p.add_argument("--url", required=True) p.add_argument("--keyword", help="Custom short code (optional)") p.add_argument("--title", help="Title (optional)") p = sub.add_parser("read", help="Look up a short URL") p.add_argument("--keyword", required=True) p = sub.add_parser("list", help="List short URLs") p.add_argument("--limit", type=int, default=50) p.add_argument("--filter", default="top", choices=["top", "bottom", "last", "rand"]) p = sub.add_parser("update", help="Update a short URL") p.add_argument("--keyword", required=True) p.add_argument("--url", help="New long URL") p.add_argument("--newkeyword", help="New short code") p.add_argument("--title", help="New title") p = sub.add_parser("delete", help="Delete a short URL") p.add_argument("--keyword", required=True) p.add_argument("--force", action="store_true", help="Skip confirmation") args = parser.parse_args() { "setup": cmd_setup, "check": cmd_check, "create": cmd_create, "read": cmd_read, "list": cmd_list, "update": cmd_update, "delete": cmd_delete, }[args.command](args) if __name__ == "__main__": main()