Files
mpm-url-shortener/scripts/yourls_manager.py
T

646 lines
23 KiB
Python

#!/usr/bin/env python3
"""
yourls_manager.py — CRUD operations for mpm.to YOURLS instance
Authentication strategy
-----------------------
Create and read always use the signature token via yourls-api.php.
Update and delete try the signature token first (requires the mpm-api-extras
plugin to be active). If the plugin is not installed, they automatically fall
back to an admin session (username + password) via admin-ajax.php.
Credentials are stored in macOS Keychain under service 'mpm.to-yourls':
account 'signature' → static API signature token (required)
account 'username' → YOURLS admin username (optional fallback)
account 'password' → YOURLS admin password (optional fallback)
Usage:
yourls_manager.py setup # Store credentials
yourls_manager.py check # Verify & test
yourls_manager.py create --url URL [--keyword K] [--title T]
yourls_manager.py read --keyword K
yourls_manager.py list [--limit N] [--filter top|bottom|last|rand]
yourls_manager.py update --keyword K [--url URL] [--newkeyword K2] [--title T]
yourls_manager.py delete --keyword K [--force]
"""
import argparse
import getpass
import json
import re
import subprocess
import sys
# Root of the YOURLS installation; also used to build and strip short-URL prefixes.
BASE_URL = "https://mpm.to"
# Endpoint that api_call() POSTs every signature-authenticated request to.
API_URL = f"{BASE_URL}/yourls-api.php"
# Base of the admin area; the session fallback uses "<ADMIN_URL>/" and
# "<ADMIN_URL>/admin-ajax.php".
ADMIN_URL = f"{BASE_URL}/admin"
# Service name passed as `-s` to the macOS `security` command by the keychain helpers.
KEYCHAIN_SERVICE = "mpm.to-yourls"
# ---------------------------------------------------------------------------
# Keychain helpers (macOS)
# ---------------------------------------------------------------------------
def keychain_get(account: str) -> str | None:
    """Fetch the secret stored for *account* under ``KEYCHAIN_SERVICE``.

    Runs ``security find-generic-password ... -w`` and returns its stdout with
    surrounding whitespace removed. Returns ``None`` when the command exits
    with a non-zero status.
    """
    command = [
        "security", "find-generic-password",
        "-a", account,
        "-s", KEYCHAIN_SERVICE,
        "-w",
    ]
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        return None
    return completed.stdout.strip()
def keychain_set(account: str, value: str) -> None:
    """Store *value* for *account* under ``KEYCHAIN_SERVICE``.

    Runs ``security add-generic-password -U ...`` with captured output.
    Because ``check=True`` is used, a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    command = [
        "security", "add-generic-password",
        "-U",
        "-a", account,
        "-s", KEYCHAIN_SERVICE,
        "-w", value,
    ]
    subprocess.run(command, capture_output=True, check=True)
def keychain_delete(account: str) -> None:
    """Remove the entry for *account* under ``KEYCHAIN_SERVICE``.

    Runs ``security delete-generic-password``. The exit status is not
    checked, so a failed deletion is ignored.
    """
    command = [
        "security", "delete-generic-password",
        "-a", account,
        "-s", KEYCHAIN_SERVICE,
    ]
    subprocess.run(command, capture_output=True)
def get_signature() -> str:
    """Return the stored API signature token.

    Exits the program with setup instructions when no token is stored
    (or the stored value is empty).
    """
    signature = keychain_get("signature")
    if signature:
        return signature
    sys.exit(
        "Error: No signature token found.\n"
        "Run `yourls_manager.py setup` to store your credentials."
    )
# ---------------------------------------------------------------------------
# Keyword normalisation
# ---------------------------------------------------------------------------
def normalize_keyword(raw: str) -> str:
    """Reduce *raw* to the short code that will actually be sent.

    The input is lower-cased and every character other than ``a``-``z`` and
    ``0``-``9`` is dropped. This applies the lowercase-alphanumeric keyword
    charset on the client side so the user can be told about the resulting
    short code before the request is made.
    """
    allowed = "abcdefghijklmnopqrstuvwxyz0123456789"
    return "".join(ch for ch in raw.lower() if ch in allowed)
def warn_if_sanitized(requested: str, normalized: str) -> None:
    """Tell the user when normalisation changed their keyword.

    Prints a two-line note when *requested* differs from *normalized*, then
    exits the program if *normalized* is empty.
    """
    changed = requested != normalized
    if changed:
        print(f" Note: keyword '{requested}' was normalised to '{normalized}'")
        print(" (mpm.to only allows lowercase letters and numbers)")
    if normalized:
        return
    sys.exit(
        "Error: After normalisation the keyword is empty. "
        "Choose a keyword using only lowercase letters and numbers."
    )
def get_admin_creds() -> tuple[str, str] | None:
    """Return ``(username, password)`` from the keychain, or ``None``.

    Both entries are always looked up; ``None`` is returned unless both are
    present and non-empty.
    """
    stored = (keychain_get("username"), keychain_get("password"))
    return stored if all(stored) else None
# ---------------------------------------------------------------------------
# Standard API calls (always via signature token)
# ---------------------------------------------------------------------------
def _requests():
    """Import and return the ``requests`` module on demand.

    Exits the program with an install hint when the library is not available.
    """
    try:
        import requests
    except ImportError:
        sys.exit(
            "Error: 'requests' library not installed.\n"
            "Fix: pip3 install requests --break-system-packages"
        )
    return requests
def api_call(params: dict) -> dict:
    """POST *params* to the YOURLS API and return the decoded JSON object.

    The signature token and ``format=json`` are added to a copy of *params*;
    the caller's dict is left untouched.

    When the server answers with a JSON object, that object is returned even
    if the HTTP status is not 2xx. Every caller in this file inspects
    ``errorCode`` / ``status`` / ``message`` in the payload, so an error
    payload has to reach them instead of being turned into a generic failure.
    NOTE(review): this assumes YOURLS can pair an API-level error payload
    with a non-2xx HTTP status — confirm against the deployed version.

    Exits the program on connection failure, timeout, any other request
    error, or a response whose body is not a JSON object.
    """
    req = _requests()
    # Fresh dict: adding the credentials must not leak into the caller's data.
    payload = {**params, "signature": get_signature(), "format": "json"}
    try:
        resp = req.post(API_URL, data=payload, timeout=15)
    except req.exceptions.ConnectionError:
        sys.exit(f"Error: Could not connect to {BASE_URL}. Check your network.")
    except req.exceptions.Timeout:
        sys.exit("Error: Request timed out.")
    except Exception as e:
        sys.exit(f"Error: API call failed — {e}")

    # Prefer the JSON body over the HTTP status so error payloads are usable.
    try:
        body = resp.json()
    except ValueError:
        body = None
    if isinstance(body, dict):
        return body

    # No usable payload: report the HTTP error if there is one.
    try:
        resp.raise_for_status()
    except Exception as e:
        sys.exit(f"Error: API call failed — {e}")
    sys.exit("Error: API call failed — response was not a JSON object")
def _is_unknown_action(result: dict) -> bool:
"""True when YOURLS says it doesn't know the action — plugin not installed."""
msg = result.get("message", "").lower()
return result.get("errorCode") == "400" and (
"unknown" in msg or "missing" in msg or "action" in msg
)
# ---------------------------------------------------------------------------
# Admin session (fallback for update & delete when plugin is not installed)
# ---------------------------------------------------------------------------
def get_admin_session():
    """Log in to the YOURLS admin area and return the authenticated session.

    Steps:
      1. Load stored admin credentials; exit with guidance when there are none.
      2. GET the admin page (without following redirects) and pull the login
         nonce out of its hidden ``nonce`` input.
      3. POST username, password and nonce back to the same URL.

    The login is treated as failed when the response still contains a
    ``name="password"`` field. Any request error, a missing nonce, or a
    failed login exits the program.
    """
    req = _requests()
    creds = get_admin_creds()
    if not creds:
        sys.exit(
            "Error: The mpm-api-extras plugin is not active, and no admin\n"
            "credentials are stored as a fallback.\n\n"
            "Choose one:\n"
            " A) Install plugin/mpm-api-extras.php and activate it in YOURLS admin\n"
            " B) Run `yourls_manager.py setup` and add admin username + password"
        )
    username, password = creds
    login_url = f"{ADMIN_URL}/"
    session = req.Session()

    # The login form lives on admin/ itself and embeds a time-limited nonce
    # that has to be echoed back in the login POST.
    try:
        page = session.get(login_url, allow_redirects=False, timeout=15)
    except Exception as e:
        sys.exit(f"Error: Could not reach admin page — {e}")

    nonce_patterns = (
        r'<input[^>]+name=["\']nonce["\'][^>]+value=["\']([^"\']+)["\']',
        r'name=["\']nonce["\'][^>]*value=["\']([^"\']+)["\']',
    )
    nonce_m = None
    for pattern in nonce_patterns:
        nonce_m = re.search(pattern, page.text)
        if nonce_m:
            break
    if not nonce_m:
        sys.exit("Error: Could not find login nonce on admin page. YOURLS may have changed.")
    login_nonce = nonce_m.group(1)

    form = {
        "username": username,
        "password": password,
        "nonce": login_nonce,
        "submit": "Login",
    }
    try:
        resp = session.post(login_url, data=form, timeout=15, allow_redirects=True)
    except Exception as e:
        sys.exit(f"Error: Admin login request failed — {e}")

    # A password field in the response means the login form was shown again.
    if 'name="password"' in resp.text:
        sys.exit(
            "Error: Admin login failed. Check username and password.\n"
            "Re-run `yourls_manager.py setup` to update stored credentials."
        )
    return session
def get_link_row_info(session, keyword: str) -> dict:
    """Scrape the admin index page for one keyword's row id and nonces.

    The admin page is requested with ``s=<keyword>&search_in=keyword`` so the
    target row is present regardless of how many links exist. The row's
    ``admin-ajax.php`` links for the ``delete`` and ``edit`` actions are then
    located and their query strings parsed.

    Returns a dict with keys ``id`` and ``delete_nonce`` (from the delete
    link) and ``edit_nonce`` (from the edit link, or ``None`` if there is no
    edit link). Exits the program when no delete link is found.
    """
    resp = session.get(
        f"{ADMIN_URL}/",
        params={"s": keyword, "search_in": "keyword"},
        timeout=15,
        allow_redirects=True,
    )
    html = resp.text
    kw = re.escape(keyword)

    def ajax_query(action):
        """Return the parsed query of the admin-ajax link for *action*, or None."""
        act = re.escape(action)
        # The action and keyword parameters may appear in either order.
        candidates = (
            r'href=["\']([^"\']*admin-ajax\.php[^"\']*action=' + act
            + r'[^"\']*keyword=' + kw + r'[^"\']*)["\']',
            r'href=["\']([^"\']*admin-ajax\.php[^"\']*keyword=' + kw
            + r'[^"\']*action=' + act + r'[^"\']*)["\']',
        )
        for candidate in candidates:
            hit = re.search(candidate, html)
            if not hit:
                continue
            link = hit.group(1).replace("&amp;", "&")
            query = link.split("?", 1)[1]
            pairs = (pair.partition("=") for pair in query.split("&") if "=" in pair)
            return {name: value for name, _, value in pairs}
        return None

    delete_qs = ajax_query("delete")
    edit_qs = ajax_query("edit")

    if not delete_qs:
        # Distinguish "row present but unparseable" from "row absent".
        if keyword in html:
            sys.exit(
                f"Error: Found '{keyword}' on admin page but could not extract "
                f"nonce link. The page structure may have changed."
            )
        sys.exit(
            f"Error: Short code '{keyword}' not found on admin page.\n"
            f" (admin page URL after redirect: {resp.url})"
        )

    edit_nonce = edit_qs.get("nonce") if edit_qs else None
    return {
        "id": delete_qs.get("id"),
        "delete_nonce": delete_qs.get("nonce"),
        "edit_nonce": edit_nonce,
    }
def admin_delete(keyword: str) -> None:
    """Delete *keyword* through the admin AJAX endpoint.

    Opens an admin session, scrapes the row id and delete nonce for the
    keyword, and POSTs a ``delete`` action. Prints a confirmation when the
    JSON reply has a truthy ``success`` field; otherwise exits with the
    reply dumped as JSON.
    """
    session = get_admin_session()
    row = get_link_row_info(session, keyword)
    form = {
        "action": "delete",
        "keyword": keyword,
        "id": row["id"],
        "nonce": row["delete_nonce"],
    }
    reply = session.post(f"{ADMIN_URL}/admin-ajax.php", data=form, timeout=15)
    result = reply.json()
    if not result.get("success"):
        sys.exit(f"Error deleting '{keyword}': {json.dumps(result)}")
    print(f"Deleted: {BASE_URL}/{keyword} (via admin session)")
def admin_update(keyword: str, new_url: str, new_keyword: str,
                 new_title: str, row_id: str | None = None,
                 edit_nonce: str | None = None) -> None:
    """Update a short URL through the admin AJAX endpoint.

    Flow:
      1. Open an admin session and scrape the row id and edit nonce.
      2. POST ``edit_display`` to obtain the edit form HTML, and extract the
         save nonce from the hidden input whose id is ``nonce_<row id>``.
      3. POST ``edit_save`` with the new URL, keyword and title.

    Args:
        keyword: Existing short code to modify.
        new_url: Long URL to store.
        new_keyword: Short code to store (may equal *keyword*).
        new_title: Title to store.
        row_id: Ignored. Kept for backward compatibility; the id is always
            re-scraped from the admin page.
        edit_nonce: Ignored. Kept for backward compatibility; the nonce is
            always re-scraped from the admin page.

    Exits the program when the row id or a nonce cannot be obtained, or when
    the save is not reported as successful.
    """
    session = get_admin_session()
    info = get_link_row_info(session, keyword)

    # Always use freshly scraped values; the caller-supplied ones are unused.
    row_id = info["id"]
    edit_nonce = info["edit_nonce"]
    if not row_id or not edit_nonce:
        # Without these the edit_display request cannot be built, and
        # re.escape() below would fail on None.
        sys.exit(
            f"Error: Could not extract the edit link for '{keyword}' from the "
            "admin page. Try activating the plugin instead."
        )

    # Step 1: fetch the edit row to get the edit-save nonce
    disp = session.post(
        f"{ADMIN_URL}/admin-ajax.php",
        data={
            "action": "edit_display",
            "keyword": keyword,
            "id": row_id,
            "nonce": edit_nonce,
        },
        timeout=15,
    )
    edit_html = disp.json().get("html", "") or ""

    # The save nonce sits in a hidden input with id="nonce_<id>"; the id and
    # value attributes may appear in either order.
    escaped_id = re.escape(row_id)
    save_nonce = None
    for pat in (
        r'id=["\']nonce_' + escaped_id + r'["\'][^>]*value=["\']([^"\']+)["\']',
        r'value=["\']([^"\']+)["\'][^>]*id=["\']nonce_' + escaped_id + r'["\']',
    ):
        m = re.search(pat, edit_html)
        if m:
            save_nonce = m.group(1)
            break
    if not save_nonce:
        sys.exit("Error: Could not extract edit-save nonce. Try activating the plugin instead.")

    # Step 2: save
    save = session.post(
        f"{ADMIN_URL}/admin-ajax.php",
        data={
            "action": "edit_save",
            "keyword": keyword,
            "newkeyword": new_keyword,
            "url": new_url,
            "title": new_title,
            "id": row_id,
            "nonce": save_nonce,
        },
        timeout=15,
    )
    result = save.json()
    if result.get("status") == "success":
        print(f"Updated: {BASE_URL}/{new_keyword} (via admin session)")
    else:
        sys.exit(f"Error updating '{keyword}': {result.get('message', json.dumps(result))}")
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def cmd_setup(args):
    """Interactively store credentials in the keychain (``setup`` command).

    The signature token is required: if one is already stored the user is
    asked whether to replace it, otherwise a non-empty token must be entered.
    The admin username and password are optional; the password is only
    requested after a username has been entered.
    """
    print("=== mpm.to YOURLS Credential Setup ===")
    print(f"Stored in macOS Keychain under service '{KEYCHAIN_SERVICE}'\n")

    # --- Signature token (required) ---
    if keychain_get("signature"):
        replace = input("Signature token already stored. Replace it? [y/N] ").strip().lower()
        if replace == "y":
            token = getpass.getpass("New signature token: ").strip()
            if token:
                keychain_set("signature", token)
                print(" ✓ Signature token updated.")
    else:
        token = getpass.getpass("Signature token (required): ").strip()
        if not token:
            sys.exit("Error: Signature token is required.")
        keychain_set("signature", token)
        print(" ✓ Signature token saved.")

    # --- Admin credentials (optional fallback) ---
    print()
    print("Admin username + password are optional. They are used as a fallback")
    print("for update/delete if the mpm-api-extras plugin is not installed.")
    print("Press Enter to skip.\n")

    existing_user = keychain_get("username")
    if existing_user:
        prompt = f"Admin username [{existing_user}]: "
    else:
        prompt = "Admin username: "
    username = input(prompt).strip()

    if username:
        keychain_set("username", username)
        print(" ✓ Username saved.")
        password = getpass.getpass("Admin password: ").strip()
        if password:
            keychain_set("password", password)
            print(" ✓ Password saved.")
    elif not existing_user:
        print(" Skipped — update/delete will require the mpm-api-extras plugin.")

    print("\nSetup complete. Run `check` to verify.")
def cmd_check(args):
    """Report stored credentials and test the API (``check`` command).

    Prints which keychain entries exist, then, if a signature token is
    stored, calls the ``db-stats`` action to verify connectivity. On success
    it also sends a ``delete`` request for the keyword ``__probe__`` and uses
    the reply to report whether the mpm-api-extras plugin appears active.
    """
    sig = keychain_get("signature")
    user = keychain_get("username")
    pw = keychain_get("password")

    sig_label = "✓ stored" if sig else "✗ missing — run setup"
    user_label = user if user else "○ not stored (optional)"
    pw_label = "✓ stored" if pw else "○ not stored (optional)"

    print(f"Keychain service : {KEYCHAIN_SERVICE}")
    print(f" Signature token : {sig_label}")
    print(f" Admin username : {user_label}")
    print(f" Admin password : {pw_label}")
    print()

    if not sig:
        print("Cannot test connection — signature token missing.")
        return

    print("Testing API connection...", end=" ", flush=True)
    result = api_call({"action": "db-stats"})
    reachable = result.get("statusCode") == "200" or result.get("message") == "success"
    if not reachable:
        print(f"Failed — {result.get('message', 'unexpected response')}")
        return

    stats = result.get("db-stats", {})
    print(f"OK — {stats.get('total_links','?')} short URLs, "
          f"{stats.get('total_clicks','?')} total clicks")

    # Probe with an action only the plugin provides to see whether it is active.
    probe = api_call({"action": "delete", "keyword": "__probe__"})
    if not _is_unknown_action(probe):
        print(" mpm-api-extras plugin : ✓ active")
        return

    print(" mpm-api-extras plugin : ✗ not active")
    if user and pw:
        print(" → update/delete will use admin session fallback")
    else:
        print(" → update/delete unavailable without plugin or admin credentials")
def cmd_create(args):
    """Create a short URL (``create`` command).

    Sends a ``shorturl`` request with the long URL and, when given, the
    normalised keyword and the title.

    Outcomes:
      * success — prints the new short URL, the long URL and the title.
      * failure whose message contains "already exists" — prints a notice
        and the existing short URL if the reply includes one; does not exit
        with an error.
      * any other failure — exits with the server's message.
      * a reply that is neither success nor fail — exits with the raw JSON.
    """
    params = {"action": "shorturl", "url": args.url}
    keyword = None
    if args.keyword:
        keyword = normalize_keyword(args.keyword)
        warn_if_sanitized(args.keyword, keyword)
        params["keyword"] = keyword
    if args.title:
        params["title"] = args.title

    result = api_call(params)
    status = result.get("status", "")

    if status == "success" or result.get("statusCode") == "200":
        short = result.get("shorturl", "")
        print(f"Created: {short}")
        print(f"{args.url}")
        if args.title:
            print(f" Title: {args.title}")
        return

    if status != "fail":
        sys.exit(f"Unexpected response: {json.dumps(result, indent=2)}")

    msg = result.get("message", "Unknown error")
    if "already exists" not in msg.lower():
        sys.exit(f"Error: {msg}")

    if keyword:
        # Report the keyword that was actually sent, not the raw user input.
        print(f"Short code '{keyword}' already exists.")
    else:
        # No keyword was requested, so there is no short code to name;
        # relay the server's own wording instead of printing 'None'.
        print(f"Already exists: {msg}")
    existing = result.get("shorturl", "")
    if existing:
        print(f" Existing: {existing}")
def cmd_read(args):
    """Show details for one short URL (``read`` command).

    Accepts either a bare keyword or a full short URL, normalises it, then
    calls ``expand`` for the long URL and title and ``url-stats`` for the
    click count and creation timestamp. Exits when ``expand`` reports a 404
    or a message beginning with "error".
    """
    raw = args.keyword.replace(f"{BASE_URL}/", "").strip("/")
    keyword = normalize_keyword(raw)
    if keyword != raw:
        print(f" Note: looking up '{keyword}' (normalised from '{raw}')")

    expand = api_call({"action": "expand", "shorturl": keyword})
    not_found = (
        expand.get("errorCode") == "404"
        or expand.get("message","").lower().startswith("error")
    )
    if not_found:
        sys.exit(f"Error: Short code '{keyword}' not found.")

    stats = api_call({"action": "url-stats", "shorturl": keyword})
    link = stats.get("link", {})

    title = expand.get("title", "")
    created = link.get("timestamp", "")

    print(f"Short URL : {expand.get('shorturl', f'{BASE_URL}/{keyword}')}")
    print(f"Long URL : {expand.get('longurl', '')}")
    if title:
        print(f"Title : {title}")
    print(f"Clicks : {link.get('clicks', '?')}")
    if created:
        print(f"Created : {created}")
def cmd_list(args):
    """Print a table of short URLs (``list`` command).

    Calls the ``stats`` action with the requested filter and limit (defaults:
    ``top`` and 50). Each row shows the short code, click count, the first
    ten characters of the timestamp, and the title (or the URL when there is
    no title) cut to 48 characters. A summary line follows the table.
    """
    limit = args.limit or 50
    filter_ = args.filter or "top"

    result = api_call({"action": "stats", "filter": filter_, "limit": limit})
    succeeded = result.get("message") == "success" or result.get("statusCode") == "200"
    if not succeeded:
        sys.exit(f"Error: {result.get('message', 'Unknown error')}")

    links = list(result.get("links", {}).values())
    if not links:
        print("No short URLs found.")
        return

    print(f"{'Short Code':<22} {'Clicks':>7} {'Created':<12} Title / URL")
    print("" * 88)
    prefix = BASE_URL + "/"
    for entry in links:
        shorturl = entry.get("shorturl", "")
        if shorturl:
            keyword = shorturl.replace(prefix, "").strip("/")
        else:
            keyword = entry.get("keyword", "")
        title = entry.get("title", "") or entry.get("url", "")
        clicks = entry.get("clicks", 0)
        timestamp = entry.get("timestamp", "")
        date_str = timestamp[:10] if timestamp else ""
        if len(title) > 48:
            label = title[:48] + ""
        else:
            label = title
        print(f"{keyword:<22} {clicks:>7} {date_str:<12} {label}")

    total = result.get("stats", {}).get("total_links", "?")
    print(f"\n{len(links)} of {total} total short URLs (filter: {filter_})")
def cmd_update(args):
    """Change the URL, short code and/or title of a link (``update`` command).

    At least one of ``--url``, ``--newkeyword`` or ``--title`` must be given.
    An explicitly empty title (``--title ""``) counts as given, so a title
    can be cleared. Fields that are not supplied keep their current value,
    which is looked up with the ``expand`` action.

    The signature-authenticated ``update`` action is tried first. If the
    reply looks like an unknown-action error, the admin-session fallback is
    used instead. Any other failure exits with the server's message.
    """
    # `title is not None` (not truthiness) so an empty title is accepted,
    # matching how new_title is resolved below.
    if not (args.url or args.newkeyword or args.title is not None):
        sys.exit("Error: Provide at least one of --url, --newkeyword, or --title.")

    raw = args.keyword.replace(f"{BASE_URL}/", "").strip("/")
    keyword = normalize_keyword(raw)

    requested_keyword = None
    if args.newkeyword:
        requested_keyword = normalize_keyword(args.newkeyword)
        # Exits if nothing is left after normalisation.
        warn_if_sanitized(args.newkeyword, requested_keyword)

    # Resolve current values for fields not supplied
    expand = api_call({"action": "expand", "shorturl": keyword})
    current_url = expand.get("longurl", "")
    current_title = expand.get("title", "")
    if not current_url:
        sys.exit(f"Error: Short code '{keyword}' not found.")

    new_url = args.url or current_url
    new_keyword = requested_keyword or keyword
    new_title = args.title if args.title is not None else current_title

    def report_changes():
        """Print one line per field the user asked to change."""
        if args.url:
            print(f" URL : {new_url}")
        if requested_keyword and requested_keyword != keyword:
            print(f" Code : {keyword}{new_keyword}")
        if args.title is not None:
            print(f" Title : {new_title}")

    # Try API (plugin) first
    result = api_call({
        "action": "update",
        "keyword": keyword,
        "url": new_url,
        "newkeyword": new_keyword,
        "title": new_title,
    })
    if result.get("status") == "success" or result.get("statusCode") == "200":
        short = result.get("shorturl", f"{BASE_URL}/{new_keyword}")
        print(f"Updated: {short} (via API)")
        report_changes()
        return

    if _is_unknown_action(result):
        # Plugin not installed — fall back to admin session
        print("Note: mpm-api-extras plugin not active; using admin session fallback.")
        admin_update(keyword, new_url, new_keyword, new_title,
                     row_id=None, edit_nonce=None)
        report_changes()
        return

    sys.exit(f"Error: {result.get('message', json.dumps(result))}")
def cmd_delete(args):
    """Delete a short URL (``delete`` command).

    Unless ``--force`` is given, the link's details are shown and the user
    must confirm with ``y``. The signature-authenticated ``delete`` action is
    tried first; if the reply looks like an unknown-action error, the
    admin-session fallback is used. Any other failure exits with the
    server's message.
    """
    raw = args.keyword.replace(f"{BASE_URL}/", "").strip("/")
    keyword = normalize_keyword(raw)

    if not args.force:
        expand = api_call({"action": "expand", "shorturl": keyword})
        long_url = expand.get("longurl", "(not found)")
        title = expand.get("title", "")
        print("About to delete:")
        print(f" Short URL : {BASE_URL}/{keyword}")
        print(f" Long URL : {long_url}")
        if title:
            print(f" Title : {title}")
        answer = input("\nAre you sure? [y/N] ").strip().lower()
        if answer != "y":
            print("Cancelled.")
            return

    # Plugin-backed API first; admin session only when the action is unknown.
    result = api_call({"action": "delete", "keyword": keyword})
    deleted = result.get("status") == "success" or result.get("statusCode") == "200"
    if deleted:
        print(f"Deleted: {BASE_URL}/{keyword} (via API)")
    elif _is_unknown_action(result):
        print("Note: mpm-api-extras plugin not active; using admin session fallback.")
        admin_delete(keyword)
    else:
        sys.exit(f"Error: {result.get('message', json.dumps(result))}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands: ``setup``, ``check``, ``create``, ``read``, ``list``,
    ``update`` and ``delete``. One of them is required.
    """
    parser = argparse.ArgumentParser(
        description="CRUD manager for mpm.to YOURLS short URLs"
    )
    sub = parser.add_subparsers(dest="command", required=True)

    sub.add_parser("setup", help="Store credentials in macOS Keychain")
    sub.add_parser("check", help="Verify credentials and test connection")

    create_p = sub.add_parser("create", help="Create a new short URL")
    create_p.add_argument("--url", required=True)
    create_p.add_argument("--keyword", help="Custom short code (optional)")
    create_p.add_argument("--title", help="Title (optional)")

    read_p = sub.add_parser("read", help="Look up a short URL")
    read_p.add_argument("--keyword", required=True)

    list_p = sub.add_parser("list", help="List short URLs")
    list_p.add_argument("--limit", type=int, default=50)
    list_p.add_argument("--filter", default="top",
                        choices=["top", "bottom", "last", "rand"])

    update_p = sub.add_parser("update", help="Update a short URL")
    update_p.add_argument("--keyword", required=True)
    update_p.add_argument("--url", help="New long URL")
    update_p.add_argument("--newkeyword", help="New short code")
    update_p.add_argument("--title", help="New title")

    delete_p = sub.add_parser("delete", help="Delete a short URL")
    delete_p.add_argument("--keyword", required=True)
    delete_p.add_argument("--force", action="store_true", help="Skip confirmation")

    args = parser.parse_args()

    # Map each sub-command name to its handler.
    handlers = {
        "setup": cmd_setup,
        "check": cmd_check,
        "create": cmd_create,
        "read": cmd_read,
        "list": cmd_list,
        "update": cmd_update,
        "delete": cmd_delete,
    }
    handlers[args.command](args)
if __name__ == "__main__":
main()