1766 lines
78 KiB
Python
1766 lines
78 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Odoo MCP Server for MPM — v0.3.0
|
|
Connects to mpmedia.odoo.com via XML-RPC and exposes tools for
|
|
Products, Knowledge, Contacts, Sales, CRM, Project, Helpdesk,
|
|
Purchase, Inventory, Employees, Knowledge Templates, and eLearning.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import base64
|
|
import xmlrpc.client
|
|
import urllib.request
|
|
import urllib.error
|
|
from typing import Optional
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
# ── Configuration ────────────────────────────────────────────────────────────
|
|
# Connection settings come from the environment, with MPM defaults for URL and DB.
ODOO_URL = os.getenv("ODOO_URL", "https://mpmedia.odoo.com")
ODOO_DB = os.getenv("ODOO_DB", "mpmedia-odoo-sh-main-13285275")
# Per-user login: when left blank here, _get_credentials() reads it from the
# OS credential store instead.
ODOO_USERNAME = os.getenv("ODOO_USERNAME", "")
# Service name used for every credential-store entry (all platforms).
KEYCHAIN_SERVICE = "odoo-mpm"
ODOO_API_KEY = os.getenv("ODOO_API_KEY", "")
|
|
|
|
# ── Proxy-aware XML-RPC transport ─────────────────────────────────────────────
|
|
class ProxyAwareTransport(xmlrpc.client.SafeTransport):
    """XML-RPC transport that sends requests through urllib with a ProxyHandler.

    A default-constructed ``urllib.request.ProxyHandler`` picks up the system
    proxy configuration (e.g. the HTTPS_PROXY environment variable).
    """

    def request(self, host, handler, request_body, verbose=False):
        """POST one XML-RPC payload and return the parsed response.

        The target is always ``https://{host}{handler}``. urllib failures are
        translated into ``xmlrpc.client.ProtocolError``: HTTP errors keep
        their status code, message and headers; other URL errors are reported
        with error code 0 and the failure reason as the message.
        """
        target = f"https://{host}{handler}"
        http_request = urllib.request.Request(
            target,
            request_body,
            {
                "Content-Type": "text/xml",
                "Accept-Encoding": "identity",
                "User-Agent": "xmlrpc-odoo-mpm/1.0",
            },
        )
        proxy_opener = urllib.request.build_opener(urllib.request.ProxyHandler())
        try:
            with proxy_opener.open(http_request, timeout=30) as response:
                self.verbose = verbose
                return self.parse_response(response)
        except urllib.error.HTTPError as http_err:
            # HTTPError must be handled first: it is a subclass of URLError.
            raise xmlrpc.client.ProtocolError(
                target, http_err.code, http_err.msg, dict(http_err.headers)
            )
        except urllib.error.URLError as url_err:
            raise xmlrpc.client.ProtocolError(target, 0, str(url_err.reason), {})
|
|
|
|
# Shared transport instance; _connect() hands it to both ServerProxy objects it creates.
_proxy_transport = ProxyAwareTransport()
|
|
|
|
# ── Odoo client ───────────────────────────────────────────────────────────────
|
|
_uid: Optional[int] = None
|
|
_models = None
|
|
_resolved_api_key: Optional[str] = None
|
|
|
|
def _keychain_get(account: str) -> str:
|
|
try:
|
|
import keyring
|
|
value = keyring.get_password(KEYCHAIN_SERVICE, account)
|
|
return value or ""
|
|
except Exception:
|
|
return ""
|
|
|
|
def _keychain_set(account: str, value: str) -> None:
    """Store *value* for *account* under KEYCHAIN_SERVICE via ``keyring``.

    Errors (including a missing ``keyring`` package) propagate to the caller.
    """
    from keyring import set_password

    set_password(KEYCHAIN_SERVICE, account, value)
|
|
|
|
def _keychain_delete(account: str) -> bool:
|
|
try:
|
|
import keyring
|
|
existing = keyring.get_password(KEYCHAIN_SERVICE, account)
|
|
if existing is not None:
|
|
keyring.delete_password(KEYCHAIN_SERVICE, account)
|
|
return True
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
def _get_credentials() -> tuple[str, str]:
    """Return ``(username, api_key)``.

    Each value comes from its module-level setting (ODOO_USERNAME /
    ODOO_API_KEY) when that is non-empty, otherwise from the credential store.
    """
    username = ODOO_USERNAME if ODOO_USERNAME else _keychain_get("odoo_username")
    api_key = ODOO_API_KEY if ODOO_API_KEY else _keychain_get("odoo_api_key")
    return username, api_key
|
|
|
|
def _connect():
    """Authenticate against Odoo once and cache the session in module globals.

    Does nothing when a session is already cached (``_uid`` is not None).
    On success sets ``_uid``, ``_resolved_api_key`` and ``_models``.

    Raises:
        RuntimeError: if no username/API key is configured, or if Odoo
            rejects the credentials.
    """
    global _uid, _models, _resolved_api_key
    if _uid is not None:
        return
    username, api_key = _get_credentials()
    if not username or not api_key:
        raise RuntimeError(
            "Odoo credentials not configured. "
            "Run the setup_odoo_credentials tool with your Odoo login email and API key. "
            "See the Odoo skill instructions for how to generate your personal API key."
        )
    common = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/common", transport=_proxy_transport)
    # Keep the result in a local first: writing a falsy value into the global
    # _uid would make the "already connected" guard above pass on the next
    # call even though _models was never created.
    uid = common.authenticate(ODOO_DB, username, api_key, {})
    if not uid:
        raise RuntimeError(
            "Odoo authentication failed. "
            "Verify your username and API key, then run setup_odoo_credentials again."
        )
    _resolved_api_key = api_key
    _models = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/object", transport=_proxy_transport)
    # Publish the uid last so a cached uid always implies a usable _models proxy.
    _uid = uid
|
|
|
|
def _call(model: str, method: str, args=None, kwargs=None):
    """Run ``execute_kw`` for *model*.*method* using the cached session.

    Connects first if needed. A falsy *args* is sent as ``[[]]`` and a falsy
    *kwargs* as ``{}``.
    """
    _connect()
    positional = args if args else [[]]
    keyword = kwargs if kwargs else {}
    return _models.execute_kw(
        ODOO_DB, _uid, _resolved_api_key, model, method, positional, keyword
    )
|
|
|
|
def _search_read(model, domain=None, fields=None, limit=20, offset=0, order=None):
    """Call ``search_read`` on *model* and return its result.

    ``limit`` and ``offset`` are always sent; ``fields`` and ``order`` only
    when truthy. A falsy *domain* is sent as an empty list.
    """
    options = {"limit": limit, "offset": offset}
    optional = {"fields": fields, "order": order}
    options.update({key: value for key, value in optional.items() if value})
    return _call(model, "search_read", [domain or []], options)
|
|
|
|
def _read(model, ids, fields=None):
    """Call ``read`` on *model* for *ids*, restricting columns when *fields* is truthy."""
    options = {"fields": fields} if fields else {}
    return _call(model, "read", [ids], options)
|
|
|
|
def _create(model, vals):
    """Call ``create`` on *model* with *vals* and return whatever Odoo returns."""
    positional = [vals]
    return _call(model, "create", positional)
|
|
|
|
def _write(model, ids, vals):
    """Call ``write`` on *model*; a single int id is wrapped into a one-element list."""
    id_list = ids
    if isinstance(ids, int):
        id_list = [ids]
    return _call(model, "write", [id_list, vals])
|
|
|
|
def _fetch_image_base64(url: str) -> str:
|
|
"""Fetch an image from a URL and return a base64-encoded string."""
|
|
try:
|
|
req = urllib.request.Request(url, headers={"User-Agent": "odoo-mpm/1.0"})
|
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
|
return base64.b64encode(resp.read()).decode("utf-8")
|
|
except Exception as e:
|
|
raise Exception(f"[image fetch] Could not fetch image from {url}: {e}")
|
|
|
|
def _resolve_partner_ids(emails: list) -> tuple:
    """Resolve a list of email addresses to partner IDs.

    Matching is case-insensitive: inputs are lowercased and compared against
    the lowercased ``email`` of each ``res.partner`` returned by the search.

    Returns (found_ids, not_found_emails). Both lists follow the order of
    *emails*; ``not_found_emails`` holds the lowercased addresses.
    """
    if not emails:
        return [], []
    lower_emails = [e.lower() for e in emails]
    unique_emails = list(dict.fromkeys(lower_emails))
    # The "in" operator compares case-sensitively, so a partner stored as
    # "Jane@Example.com" would never match the lowercased input. Use one
    # "=ilike" term per address, OR-ed together with prefix "|" operators.
    domain = ["|"] * (len(unique_emails) - 1)
    domain += [["email", "=ilike", e] for e in unique_emails]
    results = _search_read("res.partner", domain,
                           ["id", "email"], limit=len(emails) + 10)
    wanted = set(unique_emails)
    found_map = {}
    for r in results:
        key = (r.get("email") or "").lower()
        # "=ilike" treats "_" and "%" as wildcards; keep exact matches only.
        if key in wanted:
            found_map[key] = r["id"]
    found_ids = [found_map[e] for e in lower_emails if e in found_map]
    not_found = [e for e in lower_emails if e not in found_map]
    return found_ids, not_found
|
|
|
|
def _resolve_group_ids(group_names: list) -> list:
    """Resolve Odoo group names to ``res.groups`` IDs.

    Each name is searched with ``ilike`` (up to 10 candidates). A candidate
    whose stripped, lowercased name equals the requested one is preferred;
    otherwise the first candidate is used. Names with no candidates are
    skipped.
    """
    resolved = []
    for requested in group_names or []:
        candidates = _search_read("res.groups", [["name", "ilike", requested]],
                                  ["id", "name"], limit=10)
        if not candidates:
            continue
        target = requested.strip().lower()
        chosen = next(
            (c for c in candidates if c["name"].strip().lower() == target),
            candidates[0],
        )
        resolved.append(chosen["id"])
    return resolved
|
|
|
|
# ── FastMCP App ───────────────────────────────────────────────────────────────
|
|
# FastMCP application object; the @mcp.tool() decorators below register functions on it.
mcp = FastMCP("Odoo MPM")
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# CREDENTIALS SETUP
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def setup_odoo_credentials(username: str, api_key: str) -> str:
    """Store your personal Odoo credentials securely in the system keystore.
    This only needs to be done once per machine (macOS Keychain, Windows Credential Manager, or Linux Secret Service).

    username : your Odoo login email (e.g. you@mpmedia.tv)
    api_key : your personal Odoo API key — generate it in Odoo at:
    My Profile → Account Security → API Keys → New API Key
    Set the expiration to "No Limit" (indefinite).
    Never share this key or use a colleague's key.

    Credentials are stored in the OS keystore under the service 'odoo-mpm'
    and are never written to any file on disk."""
    global _uid, _models, _resolved_api_key
    # Save both values (whitespace-trimmed), username first.
    for account, secret in (("odoo_username", username), ("odoo_api_key", api_key)):
        _keychain_set(account, secret.strip())
    # Drop any cached session so the connection attempt below really re-authenticates.
    _uid = _models = _resolved_api_key = None
    try:
        _connect()
    except Exception as e:
        return f"Credentials saved to Keychain but authentication failed: {e}"
    return (
        f"Credentials saved and verified. "
        f"Connected to {ODOO_URL} as UID {_uid}. "
        f"You're all set — Odoo tools are ready to use."
    )
|
|
|
|
@mcp.tool()
def clear_odoo_credentials() -> str:
    """Remove your stored Odoo credentials from the system keystore.
    Use this if you are offboarding, rotating your API key, or troubleshooting
    an authentication problem. You will need to run setup_odoo_credentials
    again before using any Odoo tools."""
    global _uid, _models, _resolved_api_key
    # Collect the names of the entries that actually existed and were deleted.
    removed = [
        key for key in ("odoo_username", "odoo_api_key") if _keychain_delete(key)
    ]
    # Forget the cached session regardless of what was found in the keystore.
    _uid = _models = _resolved_api_key = None
    if not removed:
        return "No stored credentials found in system keystore."
    return f"Removed {', '.join(removed)} from system keystore. Run setup_odoo_credentials to reconfigure."
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# PRODUCTS
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_products(query: str = "", limit: int = 20, product_type: str = "") -> list:
    """Search products/product templates by name or internal reference.
    product_type can be: 'consu' (consumable), 'service', 'product' (storable).
    Returns id, name, default_code, type, list_price, categ_id, active."""
    domain = []
    if query:
        # Match either the name or the internal reference (default_code), as
        # the description promises; "|" is Odoo's prefix OR operator.
        domain += [
            "|",
            ["name", "ilike", query],
            ["default_code", "ilike", query],
        ]
    if product_type:
        domain.append(["type", "=", product_type])
    return _search_read("product.template", domain,
                        ["id", "name", "default_code", "type", "list_price", "categ_id", "active"],
                        limit=limit, order="name asc")
|
|
|
|
@mcp.tool()
def get_product(product_id: int) -> dict:
    """Get full details of a product template by ID, including description,
    variants, pricing, category, supplier info, and stock info."""
    # Columns read from product.template for the detail view.
    detail_fields = [
        "id", "name", "default_code", "type", "list_price", "standard_price",
        "categ_id", "description", "description_sale", "description_purchase",
        "uom_id", "uom_po_id", "active", "sale_ok", "purchase_ok",
        "product_variant_count", "barcode",
    ]
    records = _read("product.template", [product_id], detail_fields)
    if not records:
        return {}
    return records[0]
|
|
|
|
@mcp.tool()
def get_product_stock(product_id: int) -> list:
    """Get current stock quantities for a product (by product.template ID)
    across all internal locations."""
    # Stock quants reference product.product, so resolve the template's variants first.
    variant_rows = _search_read(
        "product.product",
        [["product_tmpl_id", "=", product_id]],
        ["id", "display_name"],
    )
    if not variant_rows:
        return []
    quant_domain = [
        ["product_id", "in", [row["id"] for row in variant_rows]],
        ["location_id.usage", "=", "internal"],
    ]
    quant_fields = ["product_id", "location_id", "quantity", "reserved_quantity"]
    return _search_read("stock.quant", quant_domain, quant_fields, limit=50)
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# KNOWLEDGE
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_knowledge_articles(query: str = "", limit: int = 20) -> list:
    """Search Knowledge base articles by title. Returns id, name, parent article."""
    domain = []
    if query:
        domain = [["name", "ilike", query]]
    columns = ["id", "name", "parent_id", "is_published", "write_date"]
    # Most recently written articles first.
    return _search_read("knowledge.article", domain, columns,
                        limit=limit, order="write_date desc")
|
|
|
|
@mcp.tool()
def get_knowledge_article(article_id: int) -> dict:
    """Get the full content of a Knowledge article by ID, including its icon emoji.
    Note: Inline base64 images are replaced with [embedded image] placeholders
    to keep the response size manageable."""
    columns = ["id", "name", "icon", "body", "parent_id", "child_ids",
               "is_published", "write_date", "write_uid"]
    records = _read("knowledge.article", [article_id], columns)
    if not records:
        return {}
    article = records[0]
    html = article.get("body")
    if html:
        # Swap every double-quoted inline data-URI image source for a short placeholder.
        inline_image = re.compile(r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"')
        article["body"] = inline_image.sub('src="[embedded image]"', html)
    return article
|
|
|
|
@mcp.tool()
def create_knowledge_article(name: str, body: str, parent_id: int = None,
                             icon: str = "") -> int:
    """Create a new Knowledge article. body is HTML. Returns new article ID.

    icon: optional Unicode emoji to display as the article icon in the KB tree
    (e.g. '⚙️', '🔌', '🖥️'). Leave empty to use Odoo's default.
    Browse available emojis at: https://emojipedia.org"""
    vals = {"name": name, "body": body}
    # parent_id and icon are only sent when they carry a truthy value.
    extras = (("parent_id", parent_id), ("icon", icon))
    vals.update({field: value for field, value in extras if value})
    return _create("knowledge.article", vals)
|
|
|
|
@mcp.tool()
def update_knowledge_article(article_id: int, name: str = "", body: str = "",
                             icon: str = "") -> bool:
    """Update a Knowledge article's title, body (HTML), and/or icon emoji.

    icon: Unicode emoji to set as the article icon in the KB tree
    (e.g. '⚙️', '🔌', '🖥️'). Empty string = no change (preserves existing icon).
    Browse available emojis at: https://emojipedia.org"""
    candidates = (("name", name), ("body", body), ("icon", icon))
    # Empty strings mean "leave unchanged", so only truthy values are written.
    changes = {field: value for field, value in candidates if value}
    if changes:
        return _write("knowledge.article", article_id, changes)
    return False
|
|
|
|
@mcp.tool()
def search_knowledge_templates(query: str = "", category: str = "", limit: int = 50) -> list:
    """Search Knowledge Base article templates.
    Optionally filter by template name or category name (e.g. 'Productivity', 'Sales',
    'Marketing', 'Company Organization', 'Product Management').
    Returns id, template_name, template_description, category, and sequence."""
    # Templates are knowledge.article records flagged with is_template.
    optional_filters = (
        (query, ["template_name", "ilike", query]),
        (category, ["template_category_id.name", "ilike", category]),
    )
    domain = [["is_template", "=", True]]
    domain += [clause for value, clause in optional_filters if value]
    columns = ["id", "name", "template_name", "template_description",
               "template_category_id", "template_sequence", "is_published"]
    return _search_read("knowledge.article", domain, columns, limit=limit,
                        order="template_category_sequence asc, template_sequence asc")
|
|
|
|
@mcp.tool()
def get_knowledge_template(template_id: int) -> dict:
    """Get full details of a Knowledge Base article template by ID, including
    the template body content. Inline base64 images are replaced with
    [embedded image] placeholders."""
    columns = ["id", "name", "template_name", "template_description",
               "template_body", "template_preview", "template_category_id",
               "template_sequence", "is_published"]
    records = _read("knowledge.article", [template_id], columns)
    if not records:
        return {}
    template = records[0]
    inline_image = re.compile(r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"')
    # Both HTML fields get the same placeholder substitution.
    for key in ("template_body", "template_preview"):
        html = template.get(key)
        if html:
            template[key] = inline_image.sub('src="[embedded image]"', html)
    return template
|
|
|
|
@mcp.tool()
def list_knowledge_template_categories() -> list:
    """List all Knowledge Base article template categories with their IDs and names."""
    columns = ["id", "name", "sequence"]
    # No filter: up to 50 categories, ordered by sequence.
    return _search_read(
        "knowledge.article.template.category",
        [],
        columns,
        limit=50,
        order="sequence asc",
    )
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# CONTACTS
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_contacts(query: str = "", is_company: bool = None, limit: int = 20) -> list:
    """Search contacts/partners by name, email, or phone.
    Set is_company=True for companies only, False for individuals."""
    domain = []
    if query:
        # Prefix notation: name OR (email OR phone).
        domain += [
            "|",
            ["name", "ilike", query],
            "|",
            ["email", "ilike", query],
            ["phone", "ilike", query],
        ]
    if is_company is not None:
        domain += [["is_company", "=", is_company]]
    columns = ["id", "name", "email", "phone", "is_company",
               "street", "city", "state_id", "country_id", "website"]
    return _search_read("res.partner", domain, columns,
                        limit=limit, order="name asc")
|
|
|
|
@mcp.tool()
def get_contact(contact_id: int) -> dict:
    """Get full details of a contact/partner by ID."""
    columns = [
        "id", "name", "email", "phone", "mobile", "is_company", "parent_id",
        "street", "street2", "city", "state_id", "zip", "country_id",
        "website", "comment", "category_id", "child_ids", "user_id",
    ]
    records = _read("res.partner", [contact_id], columns)
    # An empty read result is reported as an empty dict.
    if not records:
        return {}
    return records[0]
|
|
|
|
@mcp.tool()
def create_contact(name: str, email: str = "", phone: str = "",
                   is_company: bool = False, parent_id: int = None,
                   street: str = "", city: str = "") -> int:
    """Create a new contact. Returns the new contact ID."""
    vals = {"name": name, "is_company": is_company}
    optional = {
        "email": email,
        "phone": phone,
        "parent_id": parent_id,
        "street": street,
        "city": city,
    }
    # Only fields that were given a truthy value are sent to Odoo.
    vals.update({field: value for field, value in optional.items() if value})
    return _create("res.partner", vals)
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# SALES
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_sales_orders(query: str = "", state: str = "", limit: int = 20) -> list:
    """Search sales orders by name or customer. State options: draft, sent, sale, done, cancel."""
    domain = []
    if query:
        # Order reference OR customer name.
        domain += [
            "|",
            ["name", "ilike", query],
            ["partner_id.name", "ilike", query],
        ]
    if state:
        domain += [["state", "=", state]]
    columns = ["id", "name", "partner_id", "state", "amount_total",
               "date_order", "user_id", "validity_date"]
    return _search_read("sale.order", domain, columns,
                        limit=limit, order="date_order desc")
|
|
|
|
@mcp.tool()
def get_sales_order(order_id: int) -> dict:
    """Get full details of a sales order including line items."""
    header_fields = [
        "id", "name", "partner_id", "state", "amount_untaxed", "amount_tax",
        "amount_total", "date_order", "user_id", "note", "order_line",
        "validity_date", "payment_term_id", "commitment_date",
    ]
    records = _read("sale.order", [order_id], header_fields)
    if not records:
        return {}
    order = records[0]
    line_ids = order.get("order_line")
    if line_ids:
        # Expand the line ids into full line records under the extra "lines" key.
        line_fields = ["id", "product_id", "name", "product_uom_qty",
                       "price_unit", "price_subtotal", "qty_delivered", "qty_invoiced"]
        order["lines"] = _read("sale.order.line", line_ids, line_fields)
    return order
|
|
|
|
@mcp.tool()
def create_sales_order(partner_id: int, lines: list) -> int:
    """Create a sales order. lines is a list of dicts with keys:
    product_id (int), product_uom_qty (float), price_unit (float).
    Returns new order ID."""
    # One (0, 0, values) triple per requested line; quantity defaults to 1
    # and unit price to 0 when the caller omits them.
    order_lines = [
        (0, 0, {
            "product_id": item["product_id"],
            "product_uom_qty": item.get("product_uom_qty", 1),
            "price_unit": item.get("price_unit", 0),
        })
        for item in lines
    ]
    order_vals = {"partner_id": partner_id, "order_line": order_lines}
    return _create("sale.order", order_vals)
|
|
|
|
@mcp.tool()
def cancel_sale_orders(order_ids: list) -> dict:
    """Cancel one or more quotations. ONLY operates on orders in Quotation (draft) or
    Quotation Sent (sent) state — will raise an error if any provided IDs are in any
    other state (Sale Order, Locked, Cancelled, etc.). This tool intentionally cannot
    cancel confirmed sale orders.
    Trigger phrases: "cancel quotation", "cancel quote"."""
    if not order_ids:
        return {"success": True, "cancelled_ids": [], "count": 0}
    # Pre-validate: only draft/sent allowed — refuse everything else.
    # The limit is sized to the input so the error lists every offending
    # order instead of stopping at _search_read's default of 20.
    bad_orders = _search_read(
        "sale.order",
        [["id", "in", order_ids], ["state", "not in", ["draft", "sent"]]],
        ["id", "name", "state"],
        limit=len(order_ids),
    )
    if bad_orders:
        bad_desc = [f"{o['name']} (id={o['id']}, state={o['state']})" for o in bad_orders]
        raise Exception(
            f"[cancel_sale_orders] Refused — only Quotation (draft) or Quotation Sent (sent) "
            f"orders may be cancelled via this tool. The following IDs are in other states: "
            f"{', '.join(bad_desc)}"
        )
    try:
        _call("sale.order", "action_cancel", [order_ids])
    except Exception as e:
        # Chain so the underlying RPC error stays attached to the wrapper.
        raise Exception(f"[cancel_sale_orders] Failed: {e}") from e
    return {"success": True, "cancelled_ids": order_ids, "count": len(order_ids)}
|
|
|
|
@mcp.tool()
def cancel_and_archive_quotations(order_ids: list) -> dict:
    """Cancel then immediately archive sale order quotations in one call.
    All IDs must be in draft or sent state. Cancel is validated before archive is attempted —
    if cancel fails, archive is not executed.
    Trigger phrases: "cancel and archive", "remove quotation", "clean up quotes"."""
    if not order_ids:
        return {"success": True, "order_ids": [], "count": 0, "steps_completed": []}
    # Pre-validate: check for any orders NOT in draft/sent.
    # The limit is sized to the input so the error lists every offending
    # order instead of stopping at _search_read's default of 20.
    bad_orders = _search_read(
        "sale.order",
        [["id", "in", order_ids], ["state", "not in", ["draft", "sent"]]],
        ["id", "name", "state"],
        limit=len(order_ids),
    )
    if bad_orders:
        bad_desc = [f"{o['name']} (id={o['id']}, state={o['state']})" for o in bad_orders]
        raise Exception(
            f"[cancel_and_archive_quotations] Cannot proceed — orders not in draft/sent state: "
            f"{', '.join(bad_desc)}"
        )
    # Step 1: cancel. A failure here stops before anything is archived.
    try:
        _call("sale.order", "action_cancel", [order_ids])
    except Exception as e:
        raise Exception(f"[cancel_and_archive_quotations] Cancel step failed: {e}") from e
    # Step 2: archive by clearing the "active" flag.
    try:
        _write("sale.order", order_ids, {"active": False})
    except Exception as e:
        raise Exception(
            f"[cancel_and_archive_quotations] Archive step failed (orders were cancelled): {e}"
        ) from e
    return {
        "success": True,
        "order_ids": order_ids,
        "count": len(order_ids),
        "steps_completed": ["cancelled", "archived"],
    }
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# CRM
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_crm_leads(query: str = "", stage: str = "", assigned_to_me: bool = False,
                     limit: int = 20) -> list:
    """Search CRM leads/opportunities. Filter by name, stage name, or assigned to current user."""
    domain = [["type", "=", "opportunity"]]
    if query:
        # Opportunity name OR customer name.
        domain.append("|")
        domain.append(["name", "ilike", query])
        domain.append(["partner_id.name", "ilike", query])
    if stage:
        domain.append(["stage_id.name", "ilike", stage])
    if assigned_to_me:
        # The flag used to be accepted but ignored. The current user's id is
        # only known after authentication, so make sure we are connected.
        _connect()
        domain.append(["user_id", "=", _uid])
    return _search_read("crm.lead", domain,
                        ["id", "name", "partner_id", "stage_id", "expected_revenue",
                         "probability", "user_id", "date_deadline", "priority"],
                        limit=limit, order="date_deadline asc")
|
|
|
|
@mcp.tool()
def get_crm_lead(lead_id: int) -> dict:
    """Get full details of a CRM lead/opportunity by ID."""
    columns = [
        "id", "name", "type", "partner_id", "stage_id", "user_id",
        "expected_revenue", "probability", "date_deadline", "priority",
        "description", "phone", "email_from", "tag_ids",
        "activity_ids", "date_conversion",
    ]
    records = _read("crm.lead", [lead_id], columns)
    # An empty read result is reported as an empty dict.
    if not records:
        return {}
    return records[0]
|
|
|
|
@mcp.tool()
def create_crm_lead(name: str, partner_id: int = None, expected_revenue: float = 0,
                    description: str = "", email: str = "", phone: str = "") -> int:
    """Create a new CRM opportunity. Returns new lead ID."""
    vals = {"name": name, "type": "opportunity", "expected_revenue": expected_revenue}
    # The email argument is stored in the lead's email_from field.
    optional = {
        "partner_id": partner_id,
        "description": description,
        "email_from": email,
        "phone": phone,
    }
    vals.update({field: value for field, value in optional.items() if value})
    return _create("crm.lead", vals)
|
|
|
|
@mcp.tool()
def update_crm_lead(lead_id: int, stage_id: int = None, probability: float = None,
                    expected_revenue: float = None, note: str = "") -> bool:
    """Update a CRM lead's stage, probability, revenue, or notes."""
    # Numeric fields use an explicit None check so that 0 is a valid new value.
    numeric = (
        ("stage_id", stage_id),
        ("probability", probability),
        ("expected_revenue", expected_revenue),
    )
    vals = {field: value for field, value in numeric if value is not None}
    if note:
        # The note is written to the lead's description field.
        vals["description"] = note
    if not vals:
        return False
    return _write("crm.lead", lead_id, vals)
|
|
|
|
@mcp.tool()
def list_crm_stages() -> list:
    """List all CRM pipeline stages with their IDs and names."""
    columns = ["id", "name", "sequence"]
    # No filter: up to 50 stages, ordered by sequence.
    return _search_read("crm.stage", [], columns, limit=50, order="sequence asc")
|
|
|
|
@mcp.tool()
def mark_crm_lead_lost(lead_id: int, lost_reason_id: int = None) -> dict:
    """Mark a CRM lead/opportunity as Lost. Sets probability=0, deactivates the record,
    and records the loss reason. Use list_crm_lost_reasons() to get valid reason IDs.
    Trigger phrases: "mark lost", "set as lost", "lose the opportunity"."""
    vals: dict = {"active": False, "probability": 0}
    if lost_reason_id is not None:
        vals["lost_reason_id"] = lost_reason_id
    try:
        _write("crm.lead", [lead_id], vals)
    except Exception as e:
        # Chain so the underlying RPC error stays attached to the wrapper.
        raise Exception(f"[mark_crm_lead_lost] Failed for lead {lead_id}: {e}") from e
    return {
        "success": True,
        "lead_id": lead_id,
        "lost_reason_id": lost_reason_id,
        "action": "marked_lost",
    }
|
|
|
|
@mcp.tool()
def list_crm_lost_reasons() -> list:
    """List all available CRM lost reasons with IDs and names.
    Call this before mark_crm_lead_lost so the user can pick a reason by name.
    Trigger phrases: "lost reasons", "why lost", "reason for losing"."""
    try:
        return _search_read("crm.lost.reason", [], ["id", "name"], limit=100, order="name asc")
    except Exception as e:
        # Chain so the underlying RPC error stays attached to the wrapper.
        raise Exception(f"[list_crm_lost_reasons] Failed: {e}") from e
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# PROJECT
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def list_projects(limit: int = 50) -> list:
    """List all active projects with id, name, description, and task count."""
    active_only = [["active", "=", True]]
    columns = ["id", "name", "description", "task_count", "user_id", "date_start", "date"]
    return _search_read("project.project", active_only, columns,
                        limit=limit, order="name asc")
|
|
|
|
@mcp.tool()
def get_project(project_id: int) -> dict:
    """Get full details of a project by ID."""
    columns = ["id", "name", "description", "user_id", "task_count",
               "date_start", "date", "tag_ids", "privacy_visibility"]
    records = _read("project.project", [project_id], columns)
    # An empty read result is reported as an empty dict.
    if not records:
        return {}
    return records[0]
|
|
|
|
@mcp.tool()
def search_tasks(query: str = "", project_id: int = None, stage: str = "",
                 limit: int = 30) -> list:
    """Search project tasks by title, project, or stage name."""
    # Each filter is applied only when its argument is truthy; all are AND-ed.
    optional_filters = (
        (query, ["name", "ilike", query]),
        (project_id, ["project_id", "=", project_id]),
        (stage, ["stage_id.name", "ilike", stage]),
    )
    domain = [clause for value, clause in optional_filters if value]
    columns = ["id", "name", "project_id", "stage_id", "user_ids",
               "date_deadline", "priority", "kanban_state", "tag_ids"]
    return _search_read("project.task", domain, columns,
                        limit=limit, order="date_deadline asc")
|
|
|
|
@mcp.tool()
def get_task(task_id: int) -> dict:
    """Get full details of a project task by ID including description and subtasks."""
    columns = [
        "id", "name", "project_id", "stage_id", "user_ids", "description",
        "date_deadline", "priority", "kanban_state", "tag_ids",
        "child_ids", "depend_on_ids", "planned_hours", "effective_hours",
    ]
    records = _read("project.task", [task_id], columns)
    # An empty read result is reported as an empty dict.
    if not records:
        return {}
    return records[0]
|
|
|
|
@mcp.tool()
def create_task(name: str, project_id: int, description: str = "",
                date_deadline: str = "", user_ids: list = None) -> int:
    """Create a project task. date_deadline format: YYYY-MM-DD. Returns task ID."""
    vals = {"name": name, "project_id": project_id}
    optional = (("description", description), ("date_deadline", date_deadline))
    vals.update({field: value for field, value in optional if value})
    if user_ids:
        # Assignees are sent as a single (6, 0, ids) command.
        vals["user_ids"] = [(6, 0, user_ids)]
    return _create("project.task", vals)
|
|
|
|
@mcp.tool()
def update_task(task_id: int, name: str = "", stage_id: int = None,
                description: str = "", date_deadline: str = "",
                kanban_state: str = "") -> bool:
    """Update a task. kanban_state: normal, done, blocked."""
    candidates = (
        ("name", name),
        ("stage_id", stage_id),
        ("description", description),
        ("date_deadline", date_deadline),
        ("kanban_state", kanban_state),
    )
    # Falsy arguments mean "leave unchanged".
    vals = {field: value for field, value in candidates if value}
    if not vals:
        return False
    return _write("project.task", task_id, vals)
|
|
|
|
@mcp.tool()
def list_task_stages(project_id: int = None) -> list:
    """List task stages. Optionally filter by project."""
    domain = [["project_ids", "in", [project_id]]] if project_id else []
    columns = ["id", "name", "sequence"]
    return _search_read("project.task.type", domain, columns,
                        limit=50, order="sequence asc")
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# HELPDESK
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_helpdesk_tickets(query: str = "", stage: str = "", team: str = "",
                            created_after: str = "", created_before: str = "",
                            limit: int = 20) -> list:
    """Search helpdesk tickets by name, stage, or team.
    created_after and created_before accept datetime strings in 'YYYY-MM-DD' or
    'YYYY-MM-DD HH:MM:SS' format to filter by creation date."""
    # Each filter is applied only when its argument is non-empty; all are AND-ed.
    # Both date bounds are inclusive (>= and <=).
    optional_filters = (
        (query, ["name", "ilike", query]),
        (stage, ["stage_id.name", "ilike", stage]),
        (team, ["team_id.name", "ilike", team]),
        (created_after, ["create_date", ">=", created_after]),
        (created_before, ["create_date", "<=", created_before]),
    )
    domain = [clause for value, clause in optional_filters if value]
    columns = ["id", "name", "partner_id", "stage_id", "team_id",
               "user_id", "priority", "create_date"]
    return _search_read("helpdesk.ticket", domain, columns,
                        limit=limit, order="create_date desc")
|
|
|
|
@mcp.tool()
def get_helpdesk_ticket(ticket_id: int) -> dict:
    """Get full details of a helpdesk ticket by ID."""
    columns = [
        "id", "name", "partner_id", "stage_id", "team_id", "user_id",
        "priority", "description", "create_date",
        "date_last_stage_update", "kanban_state", "tag_ids",
    ]
    records = _read("helpdesk.ticket", [ticket_id], columns)
    # An empty read result is reported as an empty dict.
    if not records:
        return {}
    return records[0]
|
|
|
|
@mcp.tool()
def create_helpdesk_ticket(name: str, description: str = "",
                           partner_id: int = None, team_id: int = None) -> int:
    """Create a helpdesk ticket. Returns new ticket ID."""
    vals = {"name": name}
    optional = {
        "description": description,
        "partner_id": partner_id,
        "team_id": team_id,
    }
    # Only arguments with a truthy value are sent to Odoo.
    vals.update({field: value for field, value in optional.items() if value})
    return _create("helpdesk.ticket", vals)
|
|
|
|
@mcp.tool()
def update_helpdesk_ticket(ticket_id: int, stage_id: int = None,
                           user_id: int = None, note: str = "") -> bool:
    """Update a helpdesk ticket's stage, assignee, or add a note."""
    # NOTE: the note is written to the ticket's description field, so it
    # replaces the existing description rather than being appended.
    candidates = (("stage_id", stage_id), ("user_id", user_id), ("description", note))
    vals = {field: value for field, value in candidates if value}
    if not vals:
        return False
    return _write("helpdesk.ticket", ticket_id, vals)
|
|
|
|
@mcp.tool()
def list_helpdesk_teams() -> list:
    """List all helpdesk teams."""
    columns = ["id", "name", "description"]
    # No filter; at most 20 teams are returned.
    return _search_read("helpdesk.team", [], columns, limit=20)
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# PURCHASE
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_purchase_orders(query: str = "", state: str = "", limit: int = 20) -> list:
    """Search purchase orders by name or vendor.
    State: draft, sent, to approve, purchase, done, cancel."""
    domain = []
    if query:
        # Order reference OR vendor name.
        domain += [
            "|",
            ["name", "ilike", query],
            ["partner_id.name", "ilike", query],
        ]
    if state:
        domain += [["state", "=", state]]
    columns = ["id", "name", "partner_id", "state", "amount_total",
               "date_order", "user_id", "date_planned"]
    return _search_read("purchase.order", domain, columns,
                        limit=limit, order="date_order desc")
|
|
|
|
@mcp.tool()
def get_purchase_order(order_id: int) -> dict:
    """Read one purchase order and attach its line items under 'lines'.

    Returns {} when the read comes back empty. The 'lines' key is only added
    when the order's 'order_line' field is non-empty.
    """
    header_fields = ["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax",
                     "amount_total", "date_order", "user_id", "notes",
                     "order_line", "date_planned", "payment_term_id"]
    line_fields = ["id", "product_id", "name", "product_qty", "price_unit",
                   "price_subtotal", "qty_received", "qty_invoiced", "date_planned"]

    records = _read("purchase.order", [order_id], header_fields)
    if records:
        purchase = records[0]
        line_ids = purchase.get("order_line")
        if line_ids:
            # Second read expands the line IDs into full line records.
            purchase["lines"] = _read("purchase.order.line", line_ids, line_fields)
        return purchase
    return {}
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# INVENTORY
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_inventory(query: str = "", location: str = "internal",
                     limit: int = 30) -> list:
    """Look up stock quants with a positive quantity, sorted by product.

    query    : optional ilike match on the product name
    location : location usage to filter on — 'internal' (default),
               'customer', 'supplier', 'transit'
    limit    : maximum number of quants returned
    """
    filters = [["location_id.usage", "=", location], ["quantity", ">", 0]]
    if query:
        filters += [["product_id.name", "ilike", query]]
    quant_fields = ["product_id", "location_id", "quantity", "reserved_quantity",
                    "lot_id", "package_id"]
    return _search_read("stock.quant", filters, quant_fields,
                        limit=limit, order="product_id asc")
|
|
|
|
@mcp.tool()
def get_stock_moves(product_id: int, limit: int = 20) -> list:
    """Return completed ('done') stock moves for one product, latest first.

    product_id : product.product ID whose moves are wanted
    limit      : maximum number of moves returned
    """
    done_moves_for_product = [
        ["product_id", "=", product_id],
        ["state", "=", "done"],
    ]
    move_fields = ["id", "name", "product_id", "product_uom_qty", "location_id",
                   "location_dest_id", "date", "picking_id", "origin"]
    return _search_read("stock.move", done_moves_for_product, move_fields,
                        limit=limit, order="date desc")
|
|
|
|
@mcp.tool()
def list_internal_locations() -> list:
    """Return active stock locations whose usage is 'internal'.

    Results are sorted by complete_name and capped at 100 records.
    """
    only_active_internal = [["usage", "=", "internal"], ["active", "=", True]]
    location_fields = ["id", "name", "complete_name", "location_id"]
    return _search_read("stock.location", only_active_internal, location_fields,
                        limit=100, order="complete_name asc")
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# EMPLOYEES
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def search_employees(query: str = "", department: str = "",
                     job_title: str = "", limit: int = 30) -> list:
    """Find active employees, sorted by name.

    query      : ilike match on the employee name
    department : ilike match on the department name
    job_title  : ilike match on the job title
    limit      : maximum number of employees returned

    Every supplied filter must match; empty filters are ignored.
    """
    optional_filters = (
        ("name", query),
        ("department_id.name", department),
        ("job_title", job_title),
    )
    domain = [["active", "=", True]]
    domain.extend([field, "ilike", text] for field, text in optional_filters if text)
    employee_fields = ["id", "name", "job_title", "department_id", "work_email",
                       "work_phone", "parent_id", "coach_id"]
    return _search_read("hr.employee", domain, employee_fields,
                        limit=limit, order="name asc")
|
|
|
|
@mcp.tool()
def get_employee(employee_id: int) -> dict:
    """Read a single hr.employee record by ID.

    Returns the record as a dict, or {} when the read comes back empty.
    """
    detail_fields = ["id", "name", "job_title", "job_id", "department_id", "work_email",
                     "work_phone", "mobile_phone", "parent_id", "coach_id",
                     "address_id", "resource_calendar_id", "tz",
                     "birthday", "marital", "country_id"]
    records = _read("hr.employee", [employee_id], detail_fields)
    if not records:
        return {}
    return records[0]
|
|
|
|
@mcp.tool()
def list_departments() -> list:
    """Return HR departments (id, name, parent, manager), sorted by name.

    Capped at 50 records.
    """
    department_fields = ["id", "name", "parent_id", "manager_id"]
    return _search_read("hr.department", [], department_fields,
                        limit=50, order="name asc")
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# UTILITY — GENERIC
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@mcp.tool()
def odoo_search(model: str, field: str, query: str, limit: int = 10) -> list:
    """Run a one-condition ilike search against any Odoo model.

    Handy for looking up stage IDs, category IDs and similar references.
    Example: odoo_search('project.task.type', 'name', 'In Progress')

    model : technical model name
    field : field to match on
    query : text matched with the 'ilike' operator
    limit : maximum number of records returned
    """
    single_condition = [field, "ilike", query]
    return _search_read(model, [single_condition], limit=limit)
|
|
|
|
@mcp.tool()
def odoo_get_record(model: str, record_id: int) -> dict:
    """Read one record from any Odoo model by ID.

    No field list is passed to 'read', so Odoo decides which fields come back.
    Returns {} when the read comes back empty.
    """
    records = _call(model, "read", [[record_id]], {})
    if not records:
        return {}
    return records[0]
|
|
|
|
@mcp.tool()
def odoo_search_read(model: str, domain: list, fields: list,
                     limit: int = 100, offset: int = 0, order: str = "") -> list:
    """Search any Odoo model with a full domain and return only the requested fields.

    Primary tool for efficient data retrieval — keeps responses small by
    never returning whole records.
    Trigger phrases: "search with filters", "get records where", "filter by",
                     "find all X where Y", "search_read".

    Domain syntax (Odoo prefix notation):
      Comparison:  ['field', '=', val]   ['field', '!=', val]   ['field', '>', val]
                   ['field', '>=', val]  ['field', '<', val]    ['field', 'in', [v1,v2]]
                   ['field', 'not in', [v1]]   ['field', 'ilike', 'text']
                   ['field', '!=', False]  (field is set)
                   ['field', '=', False]   (field is empty)
      Logical:     '&' (AND, implied between conditions), '|' (OR), '!' (NOT)
                   — each is written BEFORE the conditions it applies to
      AND example: [['state', 'in', ['draft','sent']], ['amount_total', '>', 0]]
      OR  example: ['|', ['user_id', '=', 57], ['user_id', '=', 75]]
      3-way OR:    ['|', ['a','=',1], '|', ['b','=',2], ['c','=',3]]

    fields must be non-empty: an empty list is rejected with an exception
    because it would return every field.
    Any failure from Odoo is re-raised as an Exception naming the model.
    """
    if not fields:
        raise Exception("[odoo_search_read] fields cannot be empty — specify the fields you need.")
    options: dict = {"fields": fields, "limit": limit, "offset": offset}
    # 'order' is only forwarded when the caller asked for a specific sort.
    if order:
        options["order"] = order
    try:
        rows = _call(model, "search_read", [domain], options)
    except Exception as e:
        raise Exception(f"[odoo_search_read] Failed on model '{model}': {e}")
    return rows
|
|
|
|
@mcp.tool()
def get_record_count(model: str, domain: list) -> dict:
    """Count the records matching a domain without fetching them.

    Useful for sizing a bulk operation beforehand or for quick reports.
    Trigger phrases: "how many", "count of", "number of records".

    Returns {"model", "domain_summary", "count"}. The summary joins every
    3-item clause as "field operator value"; logical operators such as '|'
    are left out, and an empty summary reads "all records".
    Any failure is re-raised as an Exception naming the model.
    """
    try:
        total = _call(model, "search_count", [domain])
        readable = [
            f"{term[0]} {term[1]} {term[2]}"
            for term in domain
            if isinstance(term, (list, tuple)) and len(term) == 3
        ]
        return {
            "model": model,
            "domain_summary": ", ".join(readable) or "all records",
            "count": total,
        }
    except Exception as e:
        raise Exception(f"[get_record_count] Failed on model '{model}': {e}")
|
|
|
|
@mcp.tool()
def bulk_update_records(model: str, record_ids: list, values: dict) -> dict:
    """Write the same field values to many records of any model in one API call.

    Every ID in record_ids receives the identical 'values' dict. Suitable for
    stage changes, custom Studio fields (x_studio_*), status flags, etc.
    Trigger phrases: "update field", "set value on", "bulk update", "change stage".

    Returns {"success", "model", "updated_ids", "count", "values_set"}.
    An empty record_ids short-circuits to a zero-count success without calling
    Odoo. A failed write is re-raised as an Exception naming the model.
    """
    def summary(ids):
        # Single place that shapes the response for both outcomes.
        return {
            "success": True,
            "model": model,
            "updated_ids": ids,
            "count": len(ids),
            "values_set": values,
        }

    if not record_ids:
        return summary([])
    try:
        _write(model, record_ids, values)
        return summary(record_ids)
    except Exception as e:
        raise Exception(f"[bulk_update_records] Failed on model '{model}': {e}")
|
|
|
|
@mcp.tool()
def archive_records(model: str, record_ids: list) -> dict:
    """Archive (soft-delete) records by writing active=False to them.

    The records stay in the database; the model must have an 'active' field.
    For sale.order: cancel first with cancel_sale_orders. For crm.lead: archive directly.
    Trigger phrases: "archive", "hide record", "soft delete", "deactivate".

    Returns {"success", "archived_model", "archived_ids", "count"}.
    An empty record_ids short-circuits to a zero-count success without calling
    Odoo. A failed write is re-raised as an Exception naming the model.
    """
    def summary(ids):
        return {
            "success": True,
            "archived_model": model,
            "archived_ids": ids,
            "count": len(ids),
        }

    if not record_ids:
        return summary([])
    try:
        _write(model, record_ids, {"active": False})
        return summary(record_ids)
    except Exception as e:
        raise Exception(f"[archive_records] Failed on model '{model}': {e}")
|
|
|
|
@mcp.tool()
def get_stage_ids(model: str, stage_names: list) -> dict:
    """Resolve stage names to IDs for CRM, Project Task, or Helpdesk stages.

    Matching is case-insensitive and exact (not partial); leading/trailing
    whitespace is ignored on both the requested and the stored name.

    model values:
      'crm.stage'           — CRM opportunity stages
      'project.task.type'   — Project task stages
      'helpdesk.stage'      — Helpdesk ticket stages

    Returns {"model", "resolved", "unresolved"}: 'resolved' maps each
    requested name (as passed in) to its ID, 'unresolved' lists the names
    that were not found. Blank names are always unresolved.
    Raises Exception (naming the model) when the Odoo search fails.
    Trigger phrases: "stage id for", "resolve stage", "what is the id of stage".
    """
    if not stage_names:
        return {"model": model, "resolved": {}, "unresolved": []}

    # Search with the stripped text so padded input ("Done ") still passes the
    # server-side filter. Blank names are skipped: `ilike ''` matches every
    # record and such a name can never resolve anyway.
    search_terms = list(dict.fromkeys(
        name.strip() for name in stage_names if name.strip()
    ))

    results = []
    if search_terms:
        conditions = [["name", "ilike", term] for term in search_terms]
        # Prefix notation: OR-ing n conditions takes n-1 leading '|' operators.
        domain = ["|"] * (len(conditions) - 1) + conditions
        try:
            results = _search_read(model, domain, ["id", "name"], limit=200)
        except Exception as e:
            raise Exception(f"[get_stage_ids] Failed on model '{model}': {e}") from e

    # ilike is a "contains" match, so keep exact (case-insensitive) hits only.
    # setdefault keeps the first record returned for any given name.
    id_by_name = {}
    for record in results:
        id_by_name.setdefault(record["name"].strip().lower(), record["id"])

    resolved = {}
    for requested in stage_names:
        key = requested.strip().lower()
        if key and key in id_by_name:
            resolved[requested] = id_by_name[key]
    unresolved = [name for name in stage_names if name not in resolved]
    return {"model": model, "resolved": resolved, "unresolved": unresolved}
|
|
|
|
@mcp.tool()
def call_odoo_method(model: str, method: str, record_ids: list,
                     kwargs: dict = None) -> object:
    """Call an arbitrary method on an Odoo model and return the raw response.

    Escape hatch for operations the other tools do not cover — workflow
    transitions, report actions, custom methods. Use sparingly and document
    usage in session notes.
    Trigger phrases: "call method", "run action", "execute button".

    record_ids : passed as the method's single positional argument
    kwargs     : keyword arguments for the method; None means none

    Any failure is re-raised as an Exception naming model and method.
    """
    keyword_args = kwargs if kwargs else {}
    try:
        response = _call(model, method, [record_ids], keyword_args)
    except Exception as e:
        raise Exception(f"[call_odoo_method] Failed calling '{model}.{method}': {e}")
    return response
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# ELEARNING — COURSES & CONTENT
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
#
|
|
# Models:
|
|
# slide.channel — Courses
|
|
# slide.slide — Slides / content items
|
|
# slide.question — Quiz questions (linked to slides)
|
|
# slide.answer — Quiz answer options
|
|
# slide.channel.partner — Enrollment & completion tracking
|
|
# slide.channel.resource — Course resource attachments
|
|
#
|
|
# Google Drive / image operations:
|
|
# Always use the Google Workspace MCP connector when it is available in the
|
|
# session. Only ask the user to provide a URL or upload content manually if
|
|
# the connector is NOT connected. Never construct Drive URLs manually.
|
|
#
|
|
# slide_type values:
|
|
# 'pdf' — PDF document (url = Drive share link or direct PDF URL)
|
|
# 'youtube_video' — YouTube video (url = YouTube watch URL)
|
|
# 'vimeo_video' — Vimeo video (url = Vimeo video URL)
|
|
# 'infographic' — Image / infographic
|
|
# 'webpage' — Article with native HTML content (html_content field)
|
|
#
|
|
# visibility values: 'public' | 'connected' | 'members' | 'website'
|
|
# enroll values: 'public' (Open) | 'invite' (On Invitation)
|
|
# channel_type values: 'training' | 'documentation'
|
|
|
|
|
|
# ── Courses ───────────────────────────────────────────────────────────────────
|
|
|
|
@mcp.tool()
def search_courses(query: str = "", channel_type: str = "", visibility: str = "",
                   enroll: str = "", published: bool = None, limit: int = 20) -> list:
    """Search eLearning courses (slide.channel), sorted by name.

    query        : ilike match on the course name
    channel_type : 'training' or 'documentation'
    visibility   : 'public' (Everyone), 'connected' (Signed In),
                   'members' (Course Attendees), 'website' (Anyone with link)
    enroll       : 'public' (Open) or 'invite' (On Invitation)
    published    : True = live courses only, False = drafts only, None = all
    limit        : maximum number of courses returned

    Each result carries id, name, channel_type, visibility, enroll,
    website_published, slide counts, members_count, website_url and tag_ids.
    Trigger phrases: "list courses", "find course", "search elearning", "training courses",
                     "what courses do we have".
    """
    # (field, operator, value, include?) — 'published' is tri-state, so it is
    # tested against None rather than for truthiness.
    candidates = (
        ("name", "ilike", query, bool(query)),
        ("channel_type", "=", channel_type, bool(channel_type)),
        ("visibility", "=", visibility, bool(visibility)),
        ("enroll", "=", enroll, bool(enroll)),
        ("website_published", "=", published, published is not None),
    )
    domain = [[field, operator, value]
              for field, operator, value, include in candidates if include]
    course_fields = ["id", "name", "channel_type", "visibility", "enroll", "website_published",
                     "total_slides", "nbr_document", "nbr_video", "nbr_infographic",
                     "members_count", "website_url", "tag_ids"]
    return _search_read("slide.channel", domain, course_fields,
                        limit=limit, order="name asc")
|
|
|
|
|
|
@mcp.tool()
def get_course(channel_id: int) -> dict:
    """Read one eLearning course: settings, description, slide IDs, member
    count, and upload / auto-enroll group IDs.

    Inline base64 images in the description are replaced with a short
    placeholder so the response stays small.
    Returns {} when the read comes back empty.
    Trigger phrases: "get course", "course details", "show course".
    """
    course_fields = ["id", "name", "description", "channel_type", "visibility", "enroll",
                     "website_published", "total_slides", "nbr_document", "nbr_video",
                     "nbr_infographic", "members_count", "website_url", "tag_ids",
                     "slide_ids", "upload_group_ids", "enroll_group_ids"]
    rows = _read("slide.channel", [channel_id], course_fields)
    if not rows:
        return {}
    course = rows[0]
    body_html = course.get("description")
    if body_html:
        # Swap each embedded data-URI image for a fixed marker.
        course["description"] = re.sub(
            r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"',
            'src="[embedded image]"',
            body_html,
        )
    return course
|
|
|
|
|
|
@mcp.tool()
def create_course(name: str, description: str = "",
                  channel_type: str = "training",
                  visibility: str = "public",
                  enroll: str = "public",
                  tag_ids: list = None,
                  cover_image_url: str = "",
                  enroll_group_names: list = None) -> dict:
    """Create a new eLearning course.

    channel_type      : 'training' (default) or 'documentation'
    visibility        : 'public', 'connected', 'members', 'website'
    enroll            : 'public' (Open) or 'invite' (On Invitation)
    tag_ids           : list of existing tag IDs to assign
    cover_image_url   : image URL — use Google Workspace MCP to get a Drive download
                        URL when available; only ask the user if the connector is absent
    enroll_group_names: Odoo group names for auto-enrollment
                        e.g. ['Internal Users'] to restrict to employees

    Returns {"id", "name"} for the new course. The cover image is written in
    a second step after creation; if that step fails the course is kept and
    the result gains a "warning" entry describing the failure.
    Trigger phrases: "create course", "new course", "new training", "add elearning course".
    """
    payload = {
        "name": name,
        "channel_type": channel_type,
        "visibility": visibility,
        "enroll": enroll,
    }
    if description:
        payload["description"] = description
    if tag_ids:
        # (6, 0, ids) is the x2many command that sets the full list of links.
        payload["tag_ids"] = [(6, 0, tag_ids)]
    if enroll_group_names:
        group_ids = _resolve_group_ids(enroll_group_names)
        if group_ids:
            payload["enroll_group_ids"] = [(6, 0, group_ids)]

    new_id = _create("slide.channel", payload)
    outcome = {"id": new_id, "name": name}
    if not cover_image_url:
        return outcome
    try:
        _write("slide.channel", new_id, {"image_1920": _fetch_image_base64(cover_image_url)})
    except Exception as e:
        outcome["warning"] = f"Course created but cover image failed: {e}"
    return outcome
|
|
|
|
|
|
@mcp.tool()
def update_course(channel_id: int, name: str = "", description: str = "",
                  channel_type: str = "", visibility: str = "",
                  enroll: str = "", tag_ids: list = None,
                  cover_image_url: str = "",
                  enroll_group_names: list = None) -> dict:
    """Update an existing eLearning course. Only provided fields are changed.

    channel_type      : 'training' or 'documentation'
    visibility        : 'public', 'connected', 'members', 'website'
    enroll            : 'public' or 'invite'
    tag_ids           : replaces current tags with this list
    cover_image_url   : new cover image — use Google Workspace MCP connector when
                        available to get the download URL; do not ask the user to
                        construct or upload unless the connector is absent
    enroll_group_names: replaces auto-enroll groups; pass [] to clear

    Returns {"success", "channel_id", "fields_updated"}. The cover image is
    written separately from the other fields; if it fails, the other changes
    stand and the result gains a "warnings" list.
    Trigger phrases: "update course", "edit course", "change course settings".
    """
    text_fields = (
        ("name", name),
        ("description", description),
        ("channel_type", channel_type),
        ("visibility", visibility),
        ("enroll", enroll),
    )
    changes = {field: value for field, value in text_fields if value}
    # The list arguments use None for "leave alone" so that [] can mean "clear".
    if tag_ids is not None:
        changes["tag_ids"] = [(6, 0, tag_ids)]
    if enroll_group_names is not None:
        group_ids = _resolve_group_ids(enroll_group_names)
        changes["enroll_group_ids"] = [(6, 0, group_ids)]

    problems = []
    if changes:
        _write("slide.channel", channel_id, changes)
    if cover_image_url:
        try:
            image_payload = {"image_1920": _fetch_image_base64(cover_image_url)}
            _write("slide.channel", channel_id, image_payload)
            # Report the field by name only; the base64 body is not echoed back.
            changes["image_1920"] = "(updated)"
        except Exception as e:
            problems.append(f"Cover image update failed: {e}")

    outcome = {"success": True, "channel_id": channel_id,
               "fields_updated": list(changes.keys())}
    if problems:
        outcome["warnings"] = problems
    return outcome
|
|
|
|
|
|
@mcp.tool()
def publish_course(channel_id: int, published: bool = True) -> dict:
    """Set an eLearning course's website_published flag.

    published=True  → course is live and visible on the website
    published=False → course is hidden (draft)

    Returns {"success", "channel_id", "status"} where status is
    "published" or "unpublished".
    Trigger phrases: "publish course", "unpublish course", "make course live",
                     "hide course", "take course offline".
    """
    _write("slide.channel", channel_id, {"website_published": published})
    if published:
        status = "published"
    else:
        status = "unpublished"
    return {"success": True, "channel_id": channel_id, "status": status}
|
|
|
|
|
|
# ── Slides / Content ──────────────────────────────────────────────────────────
|
|
|
|
@mcp.tool()
def list_course_slides(channel_id: int) -> list:
    """List the slides/content items of one course, ordered by sequence.

    Each entry has id, name, slide_type, is_published, website_published,
    completion_time, sequence, url and tag_ids. Capped at 500 slides.
    Trigger phrases: "list slides", "course content", "course lessons", "show modules".
    """
    in_this_course = [["channel_id", "=", channel_id]]
    slide_fields = ["id", "name", "slide_type", "is_published", "website_published",
                    "completion_time", "sequence", "url", "tag_ids"]
    return _search_read("slide.slide", in_this_course, slide_fields,
                        limit=500, order="sequence asc")
|
|
|
|
|
|
@mcp.tool()
def get_slide(slide_id: int) -> dict:
    """Get full details of a slide including HTML body (articles), URL (pdf/video),
    and quiz questions if any are attached.

    Returns {} when the read comes back empty. When the slide has questions,
    they are added under 'questions' (sorted by sequence), each with its
    answer options under 'answers'. Loading the quiz is best-effort: if it
    fails, the slide is still returned with 'questions' set to [] and the
    reason recorded under 'questions_error'.
    Trigger phrases: "get slide", "slide details", "show lesson", "get article content".
    """
    rows = _read("slide.slide", [slide_id],
                 ["id", "name", "channel_id", "slide_type", "is_published", "website_published",
                  "description", "html_content", "url", "completion_time", "sequence",
                  "tag_ids", "question_ids", "likes", "dislikes"])
    if not rows:
        return {}
    slide = rows[0]
    question_ids = slide.get("question_ids")
    if not question_ids:
        return slide

    try:
        questions = _read("slide.question", question_ids,
                          ["id", "question", "answer_ids", "sequence"])
        # Fetch every answer on the slide in ONE read instead of one RPC per
        # question, then hand them back out to their questions by ID.
        wanted_answer_ids = [aid for q in questions for aid in (q.get("answer_ids") or [])]
        answers_by_id = {}
        if wanted_answer_ids:
            for answer in _read("slide.answer", wanted_answer_ids,
                                ["id", "text_value", "is_correct", "comment"]):
                answers_by_id[answer["id"]] = answer
        for q in questions:
            if q.get("answer_ids"):
                q["answers"] = [answers_by_id[aid] for aid in q["answer_ids"]
                                if aid in answers_by_id]
        slide["questions"] = sorted(questions, key=lambda q: q.get("sequence", 0))
    except Exception as e:
        # Keep the slide usable, but tell the caller the quiz did not load
        # rather than making it look like the slide has no questions.
        slide["questions"] = []
        slide["questions_error"] = str(e)
    return slide
|
|
|
|
|
|
@mcp.tool()
def create_slide(channel_id: int, name: str,
                 slide_type: str = "pdf",
                 description: str = "",
                 url: str = "",
                 html_content: str = "",
                 completion_time: float = 0.0,
                 sequence: int = 0,
                 is_published: bool = False) -> dict:
    """Create a new slide/content item in a course.

    slide_type options:
      'pdf'           — PDF document. Set url to a Google Drive share link.
                        Use Google Workspace MCP to get the link — do not ask
                        the user to construct it unless the connector is absent.
      'youtube_video' — YouTube video. Set url to the watch URL.
      'vimeo_video'   — Vimeo video. Set url to the Vimeo URL.
      'infographic'   — Image. Set url to a publicly accessible image URL.
      'webpage'       — Article. Set html_content to formatted HTML.
                        Claude can generate article HTML content directly.

    url             : Link to content. For Drive PDFs, use Google Workspace MCP
                      to get the shareable link — never ask the user to build it.
    html_content    : For 'webpage' articles only. Full HTML body.
    completion_time : Estimated time in hours (0.25 = 15 min, 0.5 = 30 min).
    sequence        : Position in course (lower = earlier). 0 = append at end.
    is_published    : Immediately visible to learners if True.

    Returns {"id", "name", "slide_type", "channel_id"} for the new slide.
    Trigger phrases: "add slide", "add lesson", "add content", "create slide",
                     "add video", "add article", "add PDF", "add quiz".
    """
    payload = {
        "channel_id": channel_id,
        "name": name,
        "slide_type": slide_type,
        # Both publication flags are driven by the single is_published argument.
        "is_published": is_published,
        "website_published": is_published,
    }
    optional = (
        ("description", description),
        ("url", url),
        ("html_content", html_content),
        ("completion_time", completion_time),
        ("sequence", sequence),
    )
    # Falsy optionals (empty string, 0, 0.0) are not sent at all.
    payload.update({field: value for field, value in optional if value})
    new_id = _create("slide.slide", payload)
    return {"id": new_id, "name": name,
            "slide_type": slide_type, "channel_id": channel_id}
|
|
|
|
|
|
@mcp.tool()
def update_slide(slide_id: int, name: str = "", description: str = "",
                 url: str = "", html_content: str = "",
                 completion_time: float = None,
                 sequence: int = None) -> dict:
    """Update a slide's metadata, URL, or article HTML content.
    Only provided fields are changed.

    For 'webpage' articles: pass html_content to replace the article body.
    For pdf/video slides: pass url to update the linked content.

    When updating a URL for a Drive document, use Google Workspace MCP to get
    the shareable link — do not ask the user to provide it unless the connector
    is absent.

    Returns {"success": True, "slide_id", "fields_updated"} after a write, or
    {"success": False, "reason"} when no field was supplied.
    Trigger phrases: "update slide", "edit lesson", "update article",
                     "update slide content", "change video URL".
    """
    text_fields = (
        ("name", name),
        ("description", description),
        ("url", url),
        ("html_content", html_content),
    )
    changes = {field: value for field, value in text_fields if value}
    # Numeric fields use None as "not provided" so that 0 remains a valid value.
    for field, value in (("completion_time", completion_time), ("sequence", sequence)):
        if value is not None:
            changes[field] = value

    if changes:
        _write("slide.slide", slide_id, changes)
        return {"success": True, "slide_id": slide_id,
                "fields_updated": list(changes.keys())}
    return {"success": False, "reason": "No fields provided to update"}
|
|
|
|
|
|
@mcp.tool()
def publish_slide(slide_id: int, published: bool = True) -> dict:
    """Publish or unpublish an individual slide.

    published=True  → slide is visible to enrolled learners
    published=False → slide is hidden (draft)

    Writes the same value to both 'is_published' and 'website_published'.
    Returns {"success", "slide_id", "status"} where status is
    "published" or "unpublished".
    Trigger phrases: "publish slide", "unpublish slide", "hide lesson",
                     "make lesson visible", "publish lesson".
    """
    flags = dict.fromkeys(("is_published", "website_published"), published)
    _write("slide.slide", slide_id, flags)
    if published:
        status = "published"
    else:
        status = "unpublished"
    return {"success": True, "slide_id": slide_id, "status": status}
|
|
|
|
|
|
@mcp.tool()
def reorder_slides(channel_id: int, slide_sequences: dict) -> dict:
    """Reorder slides in a course by setting sequence numbers.
    slide_sequences maps slide_id → sequence number (lower = earlier).

    Example: reorder_slides(8, {"38": 1, "39": 2, "40": 3, "41": 4})

    Each slide is written individually, so one failure does not stop the
    rest. channel_id is echoed back in the result; it is not used to filter
    the slides being written.

    Returns {"success", "channel_id", "updated_count", "updated_ids", "errors"};
    'success' is True only when every slide was written.
    Trigger phrases: "reorder slides", "reorder lessons", "change slide order",
                     "move lesson", "rearrange course".
    """
    if not slide_sequences:
        return {"success": False, "reason": "No sequences provided"}

    done = []
    failures = []
    for raw_id, raw_position in slide_sequences.items():
        try:
            # Keys and values may arrive as strings (JSON object keys always do).
            numeric_id = int(raw_id)
            _write("slide.slide", numeric_id, {"sequence": int(raw_position)})
        except Exception as e:
            failures.append({"slide_id": raw_id, "error": str(e)})
        else:
            done.append(numeric_id)

    return {
        "success": not failures,
        "channel_id": channel_id,
        "updated_count": len(done),
        "updated_ids": done,
        "errors": failures,
    }
|
|
|
|
|
|
# ── Quiz ──────────────────────────────────────────────────────────────────────
|
|
|
|
@mcp.tool()
def get_quiz_questions(slide_id: int) -> list:
    """Get all quiz questions and answers for a slide.

    Returns the questions sorted by sequence; each question that has answers
    carries them under 'answers' (id, text_value, is_correct, comment).
    Returns [] when the slide is not found or has no questions.
    Raises Exception when reading the questions or answers fails.
    Trigger phrases: "quiz questions", "get questions", "show quiz", "what questions".
    """
    slide = _read("slide.slide", [slide_id], ["id", "name", "question_ids"])
    if not slide or not slide[0].get("question_ids"):
        return []
    try:
        questions = _read("slide.question", slide[0]["question_ids"],
                          ["id", "question", "answer_ids", "sequence"])
        # Fetch every answer on the slide in ONE read instead of one RPC per
        # question, then hand them back out to their questions by ID.
        wanted_answer_ids = [aid for q in questions for aid in (q.get("answer_ids") or [])]
        answers_by_id = {}
        if wanted_answer_ids:
            for answer in _read("slide.answer", wanted_answer_ids,
                                ["id", "text_value", "is_correct", "comment"]):
                answers_by_id[answer["id"]] = answer
        for q in questions:
            if q.get("answer_ids"):
                q["answers"] = [answers_by_id[aid] for aid in q["answer_ids"]
                                if aid in answers_by_id]
        return sorted(questions, key=lambda q: q.get("sequence", 0))
    except Exception as e:
        raise Exception(f"[get_quiz_questions] Failed: {e}") from e
|
|
|
|
|
|
@mcp.tool()
def add_quiz_question(slide_id: int, question: str,
                      answers: list, sequence: int = 0) -> dict:
    """Add one quiz question to a slide.

    answers: list of dicts with:
      'text'       — answer option text (required, non-empty)
      'is_correct' — True/False; at least one answer must be correct
      'comment'    — optional feedback shown after answering

    Example:
      add_quiz_question(
        slide_id=38,
        question="What does RDMC stand for?",
        answers=[
          {"text": "Remote Display Management Console", "is_correct": True,
           "comment": "Correct — RDMC manages the full installed display base."},
          {"text": "Real-time Data Management Center",  "is_correct": False},
          {"text": "Remote Display Media Controller",   "is_correct": False},
        ]
      )

    sequence: ordering value for the question; 0 leaves it unset.

    Returns {"question_id", "slide_id", "question", "answer_ids", "answer_count"}.
    Raises ValueError, before anything is written to Odoo, when an answer is
    not a dict or lacks a non-empty 'text'.
    Trigger phrases: "add question", "add quiz question", "add quiz item".
    """
    # Validate and shape every answer BEFORE the first write, so a malformed
    # entry cannot leave a half-built question behind in Odoo.
    answer_payloads = []
    for position, ans in enumerate(answers or [], start=1):
        if not isinstance(ans, dict) or not ans.get("text"):
            raise ValueError(
                f"[add_quiz_question] Answer #{position} must be a dict "
                f"with a non-empty 'text'"
            )
        shaped = {
            "text_value": ans["text"],
            "is_correct": ans.get("is_correct", False),
        }
        if ans.get("comment"):
            shaped["comment"] = ans["comment"]
        answer_payloads.append(shaped)

    q_vals = {"slide_id": slide_id, "question": question}
    if sequence:
        q_vals["sequence"] = sequence
    question_id = _create("slide.question", q_vals)

    answer_ids = [
        _create("slide.answer", {"question_id": question_id, **shaped})
        for shaped in answer_payloads
    ]
    return {
        "question_id": question_id,
        "slide_id": slide_id,
        "question": question,
        "answer_ids": answer_ids,
        "answer_count": len(answer_ids),
    }
|
|
|
|
|
|
@mcp.tool()
def generate_quiz(slide_id: int, questions: list) -> dict:
    """Write a batch of pre-generated quiz questions to a slide.

    Intended workflow — Claude does the generation, this tool does the writing:
      1. Call get_slide or list_course_slides to read the lesson content
      2. Generate question/answer sets based on that content
      3. Call this tool to write all questions in one operation

    questions: list of dicts with:
      'question' — question text (required)
      'answers'  — list of answer dicts (same format as add_quiz_question)
      'sequence' — optional ordering (auto-increments from 1 if omitted)

    Replaces any existing questions on the slide before writing. The whole
    batch is validated first, so malformed input never deletes the current quiz.

    Returns {"success", "slide_id", "questions_created", "question_ids"}, plus
    a "warnings" list when the existing questions could not be removed.
    Raises ValueError, before anything is changed, when a question or answer
    entry is malformed.
    Trigger phrases: "generate quiz", "create quiz from content",
                     "auto-generate questions", "build quiz".
    """
    # Validate everything up front: deleting the old quiz and then failing on
    # entry N of the new one would lose data.
    prepared = []
    for index, q_data in enumerate(questions or []):
        label = f"Question #{index + 1}"
        if not isinstance(q_data, dict) or not q_data.get("question"):
            raise ValueError(
                f"[generate_quiz] {label} must be a dict with non-empty 'question' text"
            )
        answer_list = q_data.get("answers")
        if not isinstance(answer_list, (list, tuple)):
            raise ValueError(f"[generate_quiz] {label} needs an 'answers' list")
        for ans in answer_list:
            if not isinstance(ans, dict) or not ans.get("text"):
                raise ValueError(
                    f"[generate_quiz] {label} has an answer without non-empty 'text'"
                )
        prepared.append(
            (q_data["question"], list(answer_list), q_data.get("sequence", index + 1))
        )

    # Clear existing questions
    warnings = []
    existing = _read("slide.slide", [slide_id], ["question_ids"])
    if existing and existing[0].get("question_ids"):
        try:
            _call("slide.question", "unlink", [existing[0]["question_ids"]])
        except Exception as e:
            # Still non-fatal, but reported: the old questions remain next to
            # the new ones and the caller needs to know.
            warnings.append(f"Existing questions could not be removed: {e}")

    created = [
        add_quiz_question(
            slide_id=slide_id,
            question=text,
            answers=answer_list,
            sequence=position,
        )
        for text, answer_list, position in prepared
    ]
    result = {
        "success": True,
        "slide_id": slide_id,
        "questions_created": len(created),
        "question_ids": [c["question_id"] for c in created],
    }
    if warnings:
        result["warnings"] = warnings
    return result
|
|
|
|
|
|
# ── Enrollment ────────────────────────────────────────────────────────────────
|
|
|
|
@mcp.tool()
def get_course_enrollment(channel_id: int) -> list:
    """List the enrollment records of a course, highest completion first.

    Each record carries partner_id, completion, completed and
    last_seen_slide_id. Capped at 500 records.
    Trigger phrases: "who's enrolled", "enrollment", "completion rate",
                     "course progress", "learner progress".
    """
    for_this_course = [["channel_id", "=", channel_id]]
    enrollment_fields = ["partner_id", "completion", "completed", "last_seen_slide_id"]
    return _search_read("slide.channel.partner", for_this_course, enrollment_fields,
                        limit=500, order="completion desc")
|
|
|
|
|
|
@mcp.tool()
def enroll_in_course(channel_id: int, partner_ids: list = None,
                     emails: list = None,
                     send_invitation: bool = False,
                     enroll_group_names: list = None) -> dict:
    """Enroll one or more people in an eLearning course.

    partner_ids       : list of Odoo partner IDs to enroll directly
    emails            : list of email addresses — resolved to partner IDs automatically
    send_invitation   : True to send invitation email after enrolling
    enroll_group_names: add Odoo group(s) to the course auto-enroll list
                        (e.g. ['Internal Users'] for all employees)

    This tool is the standard onboarding hook — the project management plugin
    or any other skill can call it to enroll new customers in courses automatically.

    Returns {"channel_id", "newly_enrolled", "already_enrolled",
    "not_found_emails"}, plus:
      "failed"      — [{"partner_id", "error"}] for enrollments Odoo rejected
      "invitations" — outcome of the invitation attempt, when one was made
    Trigger phrases: "enroll in course", "add to course", "give course access",
                     "onboard to course", "invite to training".
    """
    requested = list(partner_ids or [])
    not_found_emails = []
    if emails:
        found, not_found_emails = _resolve_partner_ids(emails)
        requested.extend(found)

    # The same partner can arrive twice (listed twice, or once by ID and once
    # by email). De-duplicate, keeping first-seen order, so each partner gets
    # at most one create call.
    unique_ids = list(dict.fromkeys(requested))

    # Check for existing enrollments to avoid duplicates
    enrolled_before = set()
    if unique_ids:
        existing = _search_read("slide.channel.partner",
                                [["channel_id", "=", channel_id],
                                 ["partner_id", "in", unique_ids]],
                                ["partner_id"], limit=1000)
        for record in existing:
            ref = record["partner_id"]
            # many2one values come back as [id, display_name]
            enrolled_before.add(ref[0] if isinstance(ref, (list, tuple)) else ref)

    # Create enrollment records
    created = []
    failed = []
    for pid in unique_ids:
        if pid in enrolled_before:
            continue
        try:
            _create("slide.channel.partner", {
                "channel_id": channel_id,
                "partner_id": pid,
            })
        except Exception as e:
            # One rejection must not stop the rest, but it is reported instead
            # of being dropped silently (it may be a race, an access error, ...).
            failed.append({"partner_id": pid, "error": str(e)})
        else:
            created.append(pid)

    # Add auto-enroll groups if requested
    if enroll_group_names:
        gids = _resolve_group_ids(enroll_group_names)
        if gids:
            # (4, id) links an extra group without touching the existing ones.
            _write("slide.channel", channel_id,
                   {"enroll_group_ids": [(4, gid) for gid in gids]})

    # Send invitations
    invitation_result = None
    if send_invitation and created:
        # Try each candidate method name in turn; stop at the first that works.
        for method_name in ("action_send_share_mail", "_send_share_email"):
            try:
                _call("slide.channel", method_name,
                      [[channel_id]], {"partner_ids": created})
                invitation_result = {"sent": True, "partner_ids": created}
                break
            except Exception as e:
                invitation_result = {"sent": False, "error": str(e)}

    result = {
        "channel_id": channel_id,
        "newly_enrolled": created,
        # Listed in request order so the output is deterministic.
        "already_enrolled": [pid for pid in unique_ids if pid in enrolled_before],
        "not_found_emails": not_found_emails,
    }
    if failed:
        result["failed"] = failed
    if invitation_result:
        result["invitations"] = invitation_result
    return result
|
|
|
|
|
|
@mcp.tool()
def bulk_enroll(enrollments: list) -> dict:
    """Enroll multiple people across one or more courses in a single call.
    Designed for CSV-style onboarding imports and automated PM → eLearning workflows.

    enrollments: list of dicts, each with:
        'email' or 'partner_id' — who to enroll (required)
        'channel_ids'           — list of course IDs (required)
        'send_invitation'       — optional bool, default False

    Example:
        bulk_enroll([
            {"email": "ops@transitco.gov", "channel_ids": [8, 14], "send_invitation": True},
            {"email": "admin@city.gov", "channel_ids": [8], "send_invitation": True},
            {"partner_id": 4271, "channel_ids": [15]},
        ])

    Returns a per-record summary with enrolled / already_enrolled / not_found flags.
    Malformed entries (not a dict, no channel_ids, or neither email nor
    partner_id) are reported with an 'error' field and skipped; they never
    abort the rest of the batch.
    Trigger phrases: "bulk enroll", "import enrollments", "enroll list",
    "onboard customers", "add multiple people to course"."""
    results = []
    # `or []` lets a missing/None payload yield an empty summary instead of a TypeError.
    for entry in enrollments or []:
        if not isinstance(entry, dict):
            # One bad row must not take down the whole import.
            results.append({"entry": entry, "error": "Entry must be a dict"})
            continue

        email = entry.get("email", "")
        pid = entry.get("partner_id")
        channel_ids = entry.get("channel_ids", [])
        send_inv = entry.get("send_invitation", False)

        if not channel_ids:
            results.append({"entry": entry, "error": "No channel_ids provided"})
            continue
        if not email and not pid:
            # Without an identity enroll_in_course would run with nobody to
            # enroll and the record would show up as identity "None".
            results.append({"entry": entry,
                            "error": "No email or partner_id provided"})
            continue
        # Tolerate a single course ID given as a bare int (bool is excluded
        # because it is an int subclass and never a valid ID).
        if isinstance(channel_ids, int) and not isinstance(channel_ids, bool):
            channel_ids = [channel_ids]

        identity = email or str(pid)
        for cid in channel_ids:
            try:
                r = enroll_in_course(
                    channel_id=cid,
                    partner_ids=[pid] if pid else None,
                    emails=[email] if email else None,
                    send_invitation=send_inv,
                )
            except Exception as e:
                # Per-course failures are recorded so the remaining
                # courses/entries are still processed.
                results.append({
                    "identity": identity,
                    "channel_id": cid,
                    "error": str(e),
                })
                continue
            results.append({
                "identity": identity,
                "channel_id": cid,
                "enrolled": bool(r.get("newly_enrolled", [])),
                "already_enrolled": bool(r.get("already_enrolled", [])),
                "not_found": bool(r.get("not_found_emails", [])),
            })

    # A record counts as a success when the person ends up enrolled,
    # whether by this call or a previous one.
    succeeded = sum(1 for r in results
                    if r.get("enrolled") or r.get("already_enrolled"))
    return {
        "total": len(results),
        "succeeded": succeeded,
        "failed": len(results) - succeeded,
        "results": results,
    }
|
|
|
|
|
|
@mcp.tool()
def send_course_invitation(channel_id: int, partner_ids: list) -> dict:
    """Send or resend course invitation emails to specific enrolled partners.
    Trigger phrases: "send invitation", "resend invite",
    "email course invite", "resend course email"."""
    # Candidate method names are tried in order and the first one that
    # succeeds wins (presumably to cover differing Odoo versions — TODO confirm).
    last_error = None
    for method_name in ("action_send_share_mail", "_send_share_email"):
        try:
            # Only the remote call is guarded, so a problem building the
            # result below can never trigger a second send attempt.
            _call("slide.channel", method_name,
                  [[channel_id]], {"partner_ids": partner_ids})
        except Exception as e:
            last_error = e
        else:
            return {"success": True, "channel_id": channel_id,
                    "invited_partner_ids": partner_ids, "count": len(partner_ids)}
    # Chain the last failure so its traceback survives; this raise happens
    # outside any handler, so nothing would be attached implicitly.
    raise Exception(
        f"[send_course_invitation] All invitation methods failed: {last_error}"
    ) from last_error
|
|
|
|
|
|
# ── Media & Resources ─────────────────────────────────────────────────────────
|
|
|
|
@mcp.tool()
def set_course_cover(channel_id: int, image_url: str) -> dict:
    """Set or replace the cover image for an eLearning course.

    image_url: A direct download URL for the image.
        Use Google Workspace MCP to get a download URL from Google Drive
        when the connector is available — do not ask the user to provide
        or construct the URL unless the connector is absent.

    The image is fetched, base64-encoded, and written to Odoo.
    Trigger phrases: "set course cover", "update cover image", "course thumbnail",
    "change course image"."""
    try:
        # Download + encode first, then store it on the course record.
        encoded_image = _fetch_image_base64(image_url)
        cover_values = {"image_1920": encoded_image}
        _write("slide.channel", channel_id, cover_values)
    except Exception as e:
        # Prefix the failure with the tool name so callers can tell which tool failed.
        raise Exception(f"[set_course_cover] {e}")
    return {"success": True, "channel_id": channel_id}
|
|
|
|
|
|
@mcp.tool()
def set_slide_cover(slide_id: int, image_url: str) -> dict:
    """Set or replace the thumbnail/cover image for an individual slide.

    image_url: A direct download URL for the image.
        Use Google Workspace MCP for Drive images when available.
        Do not ask the user to provide the URL unless the connector is absent.

    Trigger phrases: "slide thumbnail", "slide cover image", "lesson image",
    "set slide image"."""
    outcome = {"success": True, "slide_id": slide_id}
    try:
        # Fetch/encode the image and write it to the slide in one guarded step.
        thumbnail_b64 = _fetch_image_base64(image_url)
        _write("slide.slide", slide_id, {"image_1920": thumbnail_b64})
    except Exception as e:
        # Re-raise with the tool name prepended to the underlying message.
        raise Exception(f"[set_slide_cover] {e}")
    return outcome
|
|
|
|
|
|
@mcp.tool()
def link_drive_document(slide_id: int, drive_url: str) -> dict:
    """Set the URL for a document, video, or infographic slide.

    Use the Google Workspace MCP connector to get the shareable link for
    a Drive file when available. Do not ask the user to construct or copy
    the URL unless the connector is not connected.

    drive_url: Share link or direct viewer URL for the content.
        For PDFs: use the Drive file share link.
        For videos: use the YouTube or Vimeo watch URL.

    Trigger phrases: "link document", "set PDF URL", "attach Google Drive",
    "link Drive file", "update video URL", "set slide URL"."""
    # Only the slide's `url` field is touched; errors from the write propagate as-is.
    slide_values = {"url": drive_url}
    _write("slide.slide", slide_id, slide_values)
    return dict(success=True, slide_id=slide_id, url=drive_url)
|
|
|
|
|
|
@mcp.tool()
def add_course_resource(channel_id: int, name: str,
                        resource_url: str) -> dict:
    """Add a downloadable resource or external link to a course's resource section.
    Resources appear as attachments/links on the course landing page.

    resource_url: URL of the resource.
        Use Google Workspace MCP to get Drive share links when available.
        Do not ask the user to provide the URL unless the connector is absent.

    Trigger phrases: "add resource", "attach file to course", "course download",
    "add course attachment", "add course link"."""
    # Preferred path: a dedicated resource record attached to the course.
    resource_values = {
        "channel_id": channel_id,
        "name": name,
        "resource_type": "url",
        "link": resource_url,
    }
    try:
        resource_id = _create("slide.channel.resource", resource_values)
    except Exception as e:
        # Fallback: add as a published slide if resource model unavailable
        slide_values = {
            "channel_id": channel_id,
            "name": name,
            "slide_type": "pdf",
            "url": resource_url,
            "is_published": True,
            "website_published": True,
        }
        try:
            sid = _create("slide.slide", slide_values)
        except Exception as e2:
            # Both attempts failed: report both causes in a single error.
            raise Exception(
                f"[add_course_resource] Failed: primary={e}, fallback={e2}"
            )
        # The "fallback" marker tells the caller a slide was created instead.
        return {"success": True, "fallback": "created_as_slide",
                "slide_id": sid, "channel_id": channel_id,
                "name": name, "url": resource_url}
    return {"success": True, "resource_id": resource_id,
            "channel_id": channel_id, "name": name, "url": resource_url}
|
|
|
|
|
|
# Script entry point: start the MCP server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    mcp.run()
|