#!/usr/bin/env python3 """ Odoo MCP Server for MPM — v0.3.0 Connects to mpmedia.odoo.com via XML-RPC and exposes tools for Products, Knowledge, Contacts, Sales, CRM, Project, Helpdesk, Purchase, Inventory, Employees, Knowledge Templates, and eLearning. """ import os import re import base64 import xmlrpc.client import urllib.request import urllib.error from typing import Optional from mcp.server.fastmcp import FastMCP # ── Configuration ──────────────────────────────────────────────────────────── ODOO_URL = os.environ.get("ODOO_URL", "https://mpmedia.odoo.com") ODOO_DB = os.environ.get("ODOO_DB", "mpmedia-odoo-sh-main-13285275") ODOO_USERNAME = os.environ.get("ODOO_USERNAME", "") # per-user: set via Keychain KEYCHAIN_SERVICE = "odoo-mpm" # credential store service name (all platforms) ODOO_API_KEY = os.environ.get("ODOO_API_KEY", "") # ── Proxy-aware XML-RPC transport ───────────────────────────────────────────── class ProxyAwareTransport(xmlrpc.client.SafeTransport): """Routes xmlrpc through the system HTTPS proxy (respects HTTPS_PROXY env var).""" def request(self, host, handler, request_body, verbose=False): url = f"https://{host}{handler}" headers = { "Content-Type": "text/xml", "Accept-Encoding": "identity", "User-Agent": "xmlrpc-odoo-mpm/1.0", } req = urllib.request.Request(url, request_body, headers) opener = urllib.request.build_opener(urllib.request.ProxyHandler()) try: with opener.open(req, timeout=30) as resp: self.verbose = verbose return self.parse_response(resp) except urllib.error.HTTPError as e: raise xmlrpc.client.ProtocolError(url, e.code, e.msg, dict(e.headers)) except urllib.error.URLError as e: raise xmlrpc.client.ProtocolError(url, 0, str(e.reason), {}) _proxy_transport = ProxyAwareTransport() # ── Odoo client ─────────────────────────────────────────────────────────────── _uid: Optional[int] = None _models = None _resolved_api_key: Optional[str] = None def _keychain_get(account: str) -> str: try: import keyring value = keyring.get_password(KEYCHAIN_SERVICE, account) return value or "" except Exception: return "" def _keychain_set(account: str, value: str) -> None: import keyring keyring.set_password(KEYCHAIN_SERVICE, account, value) def _keychain_delete(account: str) -> bool: try: import keyring existing = keyring.get_password(KEYCHAIN_SERVICE, account) if existing is not None: keyring.delete_password(KEYCHAIN_SERVICE, account) return True return False except Exception: return False def _get_credentials() -> tuple[str, str]: username = ODOO_USERNAME or _keychain_get("odoo_username") api_key = ODOO_API_KEY or _keychain_get("odoo_api_key") return username, api_key def _connect(): global _uid, _models, _resolved_api_key if _uid is not None: return username, api_key = _get_credentials() if not username or not api_key: raise RuntimeError( "Odoo credentials not configured. " "Run the setup_odoo_credentials tool with your Odoo login email and API key. " "See the Odoo skill instructions for how to generate your personal API key." ) common = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/common", transport=_proxy_transport) _uid = common.authenticate(ODOO_DB, username, api_key, {}) if not _uid: raise RuntimeError( "Odoo authentication failed. " "Verify your username and API key, then run setup_odoo_credentials again." ) _resolved_api_key = api_key _models = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/object", transport=_proxy_transport) def _call(model: str, method: str, args=None, kwargs=None): _connect() return _models.execute_kw( ODOO_DB, _uid, _resolved_api_key, model, method, args or [[]], kwargs or {} ) def _search_read(model, domain=None, fields=None, limit=20, offset=0, order=None): kw = {"limit": limit, "offset": offset} if fields: kw["fields"] = fields if order: kw["order"] = order return _call(model, "search_read", [domain or []], kw) def _read(model, ids, fields=None): kw = {} if fields: kw["fields"] = fields return _call(model, "read", [ids], kw) def _create(model, vals): return _call(model, "create", [vals]) def _write(model, ids, vals): return _call(model, "write", [[ids] if isinstance(ids, int) else ids, vals]) def _fetch_image_base64(url: str) -> str: """Fetch an image from a URL and return a base64-encoded string.""" try: req = urllib.request.Request(url, headers={"User-Agent": "odoo-mpm/1.0"}) with urllib.request.urlopen(req, timeout=15) as resp: return base64.b64encode(resp.read()).decode("utf-8") except Exception as e: raise Exception(f"[image fetch] Could not fetch image from {url}: {e}") def _resolve_partner_ids(emails: list) -> tuple: """Resolve a list of email addresses to partner IDs. Returns (found_ids, not_found_emails).""" if not emails: return [], [] lower_emails = [e.lower() for e in emails] results = _search_read("res.partner", [["email", "in", lower_emails]], ["id", "email"], limit=len(emails) + 10) found_map = {r["email"].lower(): r["id"] for r in results if r.get("email")} found_ids = [found_map[e] for e in lower_emails if e in found_map] not_found = [e for e in lower_emails if e not in found_map] return found_ids, not_found def _resolve_group_ids(group_names: list) -> list: """Resolve Odoo group names to IDs (case-insensitive exact match).""" if not group_names: return [] group_ids = [] for gname in group_names: groups = _search_read("res.groups", [["name", "ilike", gname]], ["id", "name"], limit=10) exact = [g for g in groups if g["name"].strip().lower() == gname.strip().lower()] if exact: group_ids.append(exact[0]["id"]) elif groups: group_ids.append(groups[0]["id"]) return group_ids # ── FastMCP App ─────────────────────────────────────────────────────────────── mcp = FastMCP("Odoo MPM") # ════════════════════════════════════════════════════════════════════════════ # CREDENTIALS SETUP # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def setup_odoo_credentials(username: str, api_key: str) -> str: """Store your personal Odoo credentials securely in the system keystore. This only needs to be done once per machine (macOS Keychain, Windows Credential Manager, or Linux Secret Service). username : your Odoo login email (e.g. you@mpmedia.tv) api_key : your personal Odoo API key — generate it in Odoo at: My Profile → Account Security → API Keys → New API Key Set the expiration to "No Limit" (indefinite). Never share this key or use a colleague's key. Credentials are stored in the OS keystore under the service 'odoo-mpm' and are never written to any file on disk.""" _keychain_set("odoo_username", username.strip()) _keychain_set("odoo_api_key", api_key.strip()) global _uid, _models, _resolved_api_key _uid = None _models = None _resolved_api_key = None try: _connect() return ( f"Credentials saved and verified. " f"Connected to {ODOO_URL} as UID {_uid}. " f"You're all set — Odoo tools are ready to use." ) except Exception as e: return f"Credentials saved to Keychain but authentication failed: {e}" @mcp.tool() def clear_odoo_credentials() -> str: """Remove your stored Odoo credentials from the system keystore. Use this if you are offboarding, rotating your API key, or troubleshooting an authentication problem. You will need to run setup_odoo_credentials again before using any Odoo tools.""" removed = [] for key in ("odoo_username", "odoo_api_key"): if _keychain_delete(key): removed.append(key) global _uid, _models, _resolved_api_key _uid = None _models = None _resolved_api_key = None if removed: return f"Removed {', '.join(removed)} from system keystore. Run setup_odoo_credentials to reconfigure." return "No stored credentials found in system keystore." # ════════════════════════════════════════════════════════════════════════════ # PRODUCTS # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_products(query: str = "", limit: int = 20, product_type: str = "") -> list: """Search products/product templates by name or internal reference. product_type can be: 'consu' (consumable), 'service', 'product' (storable). Returns id, name, default_code, type, list_price, categ_id.""" domain = [] if query: domain.append(["name", "ilike", query]) if product_type: domain.append(["type", "=", product_type]) return _search_read("product.template", domain, ["id", "name", "default_code", "type", "list_price", "categ_id", "active"], limit=limit, order="name asc") @mcp.tool() def get_product(product_id: int) -> dict: """Get full details of a product template by ID, including description, variants, pricing, category, supplier info, and stock info.""" r = _read("product.template", [product_id], ["id", "name", "default_code", "type", "list_price", "standard_price", "categ_id", "description", "description_sale", "description_purchase", "uom_id", "uom_po_id", "active", "sale_ok", "purchase_ok", "product_variant_count", "barcode"]) return r[0] if r else {} @mcp.tool() def get_product_stock(product_id: int) -> list: """Get current stock quantities for a product (by product.template ID) across all internal locations.""" variants = _search_read("product.product", [["product_tmpl_id", "=", product_id]], ["id", "display_name"]) if not variants: return [] variant_ids = [v["id"] for v in variants] return _search_read("stock.quant", [["product_id", "in", variant_ids], ["location_id.usage", "=", "internal"]], ["product_id", "location_id", "quantity", "reserved_quantity"], limit=50) # ════════════════════════════════════════════════════════════════════════════ # KNOWLEDGE # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_knowledge_articles(query: str = "", limit: int = 20) -> list: """Search Knowledge base articles by title. Returns id, name, parent article.""" domain = [["name", "ilike", query]] if query else [] return _search_read("knowledge.article", domain, ["id", "name", "parent_id", "is_published", "write_date"], limit=limit, order="write_date desc") @mcp.tool() def get_knowledge_article(article_id: int) -> dict: """Get the full content of a Knowledge article by ID, including its icon emoji. Note: Inline base64 images are replaced with [embedded image] placeholders to keep the response size manageable.""" r = _read("knowledge.article", [article_id], ["id", "name", "icon", "body", "parent_id", "child_ids", "is_published", "write_date", "write_uid"]) if not r: return {} article = r[0] if article.get("body"): article["body"] = re.sub( r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"', 'src="[embedded image]"', article["body"] ) return article @mcp.tool() def create_knowledge_article(name: str, body: str, parent_id: int = None, icon: str = "") -> int: """Create a new Knowledge article. body is HTML. Returns new article ID. icon: optional Unicode emoji to display as the article icon in the KB tree (e.g. '⚙️', '🔌', '🖥️'). Leave empty to use Odoo's default. Browse available emojis at: https://emojipedia.org""" vals = {"name": name, "body": body} if parent_id: vals["parent_id"] = parent_id if icon: vals["icon"] = icon return _create("knowledge.article", vals) @mcp.tool() def update_knowledge_article(article_id: int, name: str = "", body: str = "", icon: str = "") -> bool: """Update a Knowledge article's title, body (HTML), and/or icon emoji. icon: Unicode emoji to set as the article icon in the KB tree (e.g. '⚙️', '🔌', '🖥️'). Empty string = no change (preserves existing icon). Browse available emojis at: https://emojipedia.org""" vals = {} if name: vals["name"] = name if body: vals["body"] = body if icon: vals["icon"] = icon if not vals: return False return _write("knowledge.article", article_id, vals) @mcp.tool() def search_knowledge_templates(query: str = "", category: str = "", limit: int = 50) -> list: """Search Knowledge Base article templates. Optionally filter by template name or category name (e.g. 'Productivity', 'Sales', 'Marketing', 'Company Organization', 'Product Management'). Returns id, template_name, template_description, category, and sequence.""" domain = [["is_template", "=", True]] if query: domain.append(["template_name", "ilike", query]) if category: domain.append(["template_category_id.name", "ilike", category]) return _search_read("knowledge.article", domain, ["id", "name", "template_name", "template_description", "template_category_id", "template_sequence", "is_published"], limit=limit, order="template_category_sequence asc, template_sequence asc") @mcp.tool() def get_knowledge_template(template_id: int) -> dict: """Get full details of a Knowledge Base article template by ID, including the template body content. Inline base64 images are replaced with [embedded image] placeholders.""" r = _read("knowledge.article", [template_id], ["id", "name", "template_name", "template_description", "template_body", "template_preview", "template_category_id", "template_sequence", "is_published"]) if not r: return {} template = r[0] if template.get("template_body"): template["template_body"] = re.sub( r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"', 'src="[embedded image]"', template["template_body"] ) if template.get("template_preview"): template["template_preview"] = re.sub( r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"', 'src="[embedded image]"', template["template_preview"] ) return template @mcp.tool() def list_knowledge_template_categories() -> list: """List all Knowledge Base article template categories with their IDs and names.""" return _search_read("knowledge.article.template.category", [], ["id", "name", "sequence"], limit=50, order="sequence asc") # ════════════════════════════════════════════════════════════════════════════ # CONTACTS # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_contacts(query: str = "", is_company: bool = None, limit: int = 20) -> list: """Search contacts/partners by name, email, or phone. Set is_company=True for companies only, False for individuals.""" domain = [] if query: domain.append("|") domain.append(["name", "ilike", query]) domain.append("|") domain.append(["email", "ilike", query]) domain.append(["phone", "ilike", query]) if is_company is not None: domain.append(["is_company", "=", is_company]) return _search_read("res.partner", domain, ["id", "name", "email", "phone", "is_company", "street", "city", "state_id", "country_id", "website"], limit=limit, order="name asc") @mcp.tool() def get_contact(contact_id: int) -> dict: """Get full details of a contact/partner by ID.""" r = _read("res.partner", [contact_id], ["id", "name", "email", "phone", "mobile", "is_company", "parent_id", "street", "street2", "city", "state_id", "zip", "country_id", "website", "comment", "category_id", "child_ids", "user_id"]) return r[0] if r else {} @mcp.tool() def create_contact(name: str, email: str = "", phone: str = "", is_company: bool = False, parent_id: int = None, street: str = "", city: str = "") -> int: """Create a new contact. Returns the new contact ID.""" vals = {"name": name, "is_company": is_company} if email: vals["email"] = email if phone: vals["phone"] = phone if parent_id: vals["parent_id"] = parent_id if street: vals["street"] = street if city: vals["city"] = city return _create("res.partner", vals) # ════════════════════════════════════════════════════════════════════════════ # SALES # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_sales_orders(query: str = "", state: str = "", limit: int = 20) -> list: """Search sales orders by name or customer. State options: draft, sent, sale, done, cancel.""" domain = [] if query: domain.append("|") domain.append(["name", "ilike", query]) domain.append(["partner_id.name", "ilike", query]) if state: domain.append(["state", "=", state]) return _search_read("sale.order", domain, ["id", "name", "partner_id", "state", "amount_total", "date_order", "user_id", "validity_date"], limit=limit, order="date_order desc") @mcp.tool() def get_sales_order(order_id: int) -> dict: """Get full details of a sales order including line items.""" r = _read("sale.order", [order_id], ["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax", "amount_total", "date_order", "user_id", "note", "order_line", "validity_date", "payment_term_id", "commitment_date"]) if not r: return {} order = r[0] if order.get("order_line"): lines = _read("sale.order.line", order["order_line"], ["id", "product_id", "name", "product_uom_qty", "price_unit", "price_subtotal", "qty_delivered", "qty_invoiced"]) order["lines"] = lines return order @mcp.tool() def create_sales_order(partner_id: int, lines: list) -> int: """Create a sales order. lines is a list of dicts with keys: product_id (int), product_uom_qty (float), price_unit (float). Returns new order ID.""" order_lines = [] for l in lines: order_lines.append((0, 0, { "product_id": l["product_id"], "product_uom_qty": l.get("product_uom_qty", 1), "price_unit": l.get("price_unit", 0), })) return _create("sale.order", {"partner_id": partner_id, "order_line": order_lines}) @mcp.tool() def cancel_sale_orders(order_ids: list) -> dict: """Cancel one or more quotations. ONLY operates on orders in Quotation (draft) or Quotation Sent (sent) state — will raise an error if any provided IDs are in any other state (Sale Order, Locked, Cancelled, etc.). This tool intentionally cannot cancel confirmed sale orders. Trigger phrases: "cancel quotation", "cancel quote".""" if not order_ids: return {"success": True, "cancelled_ids": [], "count": 0} # Pre-validate: only draft/sent allowed — refuse everything else bad_orders = _search_read( "sale.order", [["id", "in", order_ids], ["state", "not in", ["draft", "sent"]]], ["id", "name", "state"] ) if bad_orders: bad_desc = [f"{o['name']} (id={o['id']}, state={o['state']})" for o in bad_orders] raise Exception( f"[cancel_sale_orders] Refused — only Quotation (draft) or Quotation Sent (sent) " f"orders may be cancelled via this tool. The following IDs are in other states: " f"{', '.join(bad_desc)}" ) try: _call("sale.order", "action_cancel", [order_ids]) return {"success": True, "cancelled_ids": order_ids, "count": len(order_ids)} except Exception as e: raise Exception(f"[cancel_sale_orders] Failed: {e}") @mcp.tool() def cancel_and_archive_quotations(order_ids: list) -> dict: """Cancel then immediately archive sale order quotations in one call. All IDs must be in draft or sent state. Cancel is validated before archive is attempted — if cancel fails, archive is not executed. Trigger phrases: "cancel and archive", "remove quotation", "clean up quotes".""" if not order_ids: return {"success": True, "order_ids": [], "count": 0, "steps_completed": []} # Pre-validate: check for any orders NOT in draft/sent bad_orders = _search_read( "sale.order", [["id", "in", order_ids], ["state", "not in", ["draft", "sent"]]], ["id", "name", "state"] ) if bad_orders: bad_desc = [f"{o['name']} (id={o['id']}, state={o['state']})" for o in bad_orders] raise Exception( f"[cancel_and_archive_quotations] Cannot proceed — orders not in draft/sent state: " f"{', '.join(bad_desc)}" ) try: _call("sale.order", "action_cancel", [order_ids]) except Exception as e: raise Exception(f"[cancel_and_archive_quotations] Cancel step failed: {e}") try: _write("sale.order", order_ids, {"active": False}) except Exception as e: raise Exception( f"[cancel_and_archive_quotations] Archive step failed (orders were cancelled): {e}" ) return { "success": True, "order_ids": order_ids, "count": len(order_ids), "steps_completed": ["cancelled", "archived"] } # ════════════════════════════════════════════════════════════════════════════ # CRM # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_crm_leads(query: str = "", stage: str = "", assigned_to_me: bool = False, limit: int = 20) -> list: """Search CRM leads/opportunities. Filter by name, stage name, or assigned to current user.""" domain = [["type", "=", "opportunity"]] if query: domain.append("|") domain.append(["name", "ilike", query]) domain.append(["partner_id.name", "ilike", query]) if stage: domain.append(["stage_id.name", "ilike", stage]) return _search_read("crm.lead", domain, ["id", "name", "partner_id", "stage_id", "expected_revenue", "probability", "user_id", "date_deadline", "priority"], limit=limit, order="date_deadline asc") @mcp.tool() def get_crm_lead(lead_id: int) -> dict: """Get full details of a CRM lead/opportunity by ID.""" r = _read("crm.lead", [lead_id], ["id", "name", "type", "partner_id", "stage_id", "user_id", "expected_revenue", "probability", "date_deadline", "priority", "description", "phone", "email_from", "tag_ids", "activity_ids", "date_conversion"]) return r[0] if r else {} @mcp.tool() def create_crm_lead(name: str, partner_id: int = None, expected_revenue: float = 0, description: str = "", email: str = "", phone: str = "") -> int: """Create a new CRM opportunity. Returns new lead ID.""" vals = {"name": name, "type": "opportunity", "expected_revenue": expected_revenue} if partner_id: vals["partner_id"] = partner_id if description: vals["description"] = description if email: vals["email_from"] = email if phone: vals["phone"] = phone return _create("crm.lead", vals) @mcp.tool() def update_crm_lead(lead_id: int, stage_id: int = None, probability: float = None, expected_revenue: float = None, note: str = "") -> bool: """Update a CRM lead's stage, probability, revenue, or notes.""" vals = {} if stage_id is not None: vals["stage_id"] = stage_id if probability is not None: vals["probability"] = probability if expected_revenue is not None: vals["expected_revenue"] = expected_revenue if note: vals["description"] = note return _write("crm.lead", lead_id, vals) if vals else False @mcp.tool() def list_crm_stages() -> list: """List all CRM pipeline stages with their IDs and names.""" return _search_read("crm.stage", [], ["id", "name", "sequence"], limit=50, order="sequence asc") @mcp.tool() def mark_crm_lead_lost(lead_id: int, lost_reason_id: int = None) -> dict: """Mark a CRM lead/opportunity as Lost. Sets probability=0, deactivates the record, and records the loss reason. Use list_crm_lost_reasons() to get valid reason IDs. Trigger phrases: "mark lost", "set as lost", "lose the opportunity".""" vals: dict = {"active": False, "probability": 0} if lost_reason_id is not None: vals["lost_reason_id"] = lost_reason_id try: _write("crm.lead", [lead_id], vals) return { "success": True, "lead_id": lead_id, "lost_reason_id": lost_reason_id, "action": "marked_lost" } except Exception as e: raise Exception(f"[mark_crm_lead_lost] Failed for lead {lead_id}: {e}") @mcp.tool() def list_crm_lost_reasons() -> list: """List all available CRM lost reasons with IDs and names. Call this before mark_crm_lead_lost so the user can pick a reason by name. Trigger phrases: "lost reasons", "why lost", "reason for losing".""" try: return _search_read("crm.lost.reason", [], ["id", "name"], limit=100, order="name asc") except Exception as e: raise Exception(f"[list_crm_lost_reasons] Failed: {e}") # ════════════════════════════════════════════════════════════════════════════ # PROJECT # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def list_projects(limit: int = 50) -> list: """List all active projects with id, name, description, and task count.""" return _search_read("project.project", [["active", "=", True]], ["id", "name", "description", "task_count", "user_id", "date_start", "date"], limit=limit, order="name asc") @mcp.tool() def get_project(project_id: int) -> dict: """Get full details of a project by ID.""" r = _read("project.project", [project_id], ["id", "name", "description", "user_id", "task_count", "date_start", "date", "tag_ids", "privacy_visibility"]) return r[0] if r else {} @mcp.tool() def search_tasks(query: str = "", project_id: int = None, stage: str = "", limit: int = 30) -> list: """Search project tasks by title, project, or stage name.""" domain = [] if query: domain.append(["name", "ilike", query]) if project_id: domain.append(["project_id", "=", project_id]) if stage: domain.append(["stage_id.name", "ilike", stage]) return _search_read("project.task", domain, ["id", "name", "project_id", "stage_id", "user_ids", "date_deadline", "priority", "kanban_state", "tag_ids"], limit=limit, order="date_deadline asc") @mcp.tool() def get_task(task_id: int) -> dict: """Get full details of a project task by ID including description and subtasks.""" r = _read("project.task", [task_id], ["id", "name", "project_id", "stage_id", "user_ids", "description", "date_deadline", "priority", "kanban_state", "tag_ids", "child_ids", "depend_on_ids", "planned_hours", "effective_hours"]) return r[0] if r else {} @mcp.tool() def create_task(name: str, project_id: int, description: str = "", date_deadline: str = "", user_ids: list = None) -> int: """Create a project task. date_deadline format: YYYY-MM-DD. Returns task ID.""" vals = {"name": name, "project_id": project_id} if description: vals["description"] = description if date_deadline: vals["date_deadline"] = date_deadline if user_ids: vals["user_ids"] = [(6, 0, user_ids)] return _create("project.task", vals) @mcp.tool() def update_task(task_id: int, name: str = "", stage_id: int = None, description: str = "", date_deadline: str = "", kanban_state: str = "") -> bool: """Update a task. kanban_state: normal, done, blocked.""" vals = {} if name: vals["name"] = name if stage_id: vals["stage_id"] = stage_id if description: vals["description"] = description if date_deadline: vals["date_deadline"] = date_deadline if kanban_state: vals["kanban_state"] = kanban_state return _write("project.task", task_id, vals) if vals else False @mcp.tool() def list_task_stages(project_id: int = None) -> list: """List task stages. Optionally filter by project.""" domain = [] if project_id: domain.append(["project_ids", "in", [project_id]]) return _search_read("project.task.type", domain, ["id", "name", "sequence"], limit=50, order="sequence asc") # ── Project CRUD ────────────────────────────────────────────────────────────── @mcp.tool() def create_project(name: str, description: str = "", user_id: int = None, date_start: str = "", date: str = "", privacy_visibility: str = "employees", parent_id: int = None) -> int: """Create a new project. Returns the new project ID. parent_id: ID of the parent project.project — use to create sub-projects within a programme. privacy_visibility: 'employees' (all employees), 'portal' (employees + invited portal users), 'followers' (invited internal users only). Trigger phrases: "create project", "new project", "add a project", "start a project", "create sub-project", "create child project".""" vals: dict = {"name": name} if description: vals["description"] = description if user_id: vals["user_id"] = user_id if date_start: vals["date_start"] = date_start if date: vals["date"] = date if parent_id: vals["parent_id"] = parent_id vals["privacy_visibility"] = privacy_visibility or "employees" return _create("project.project", vals) @mcp.tool() def update_project(project_id: int, name: str = "", description: str = "", user_id: int = None, date_start: str = "", date: str = "", privacy_visibility: str = "") -> bool: """Update an existing project's metadata. privacy_visibility: 'employees', 'portal', 'followers'. Trigger phrases: "update project", "rename project", "change project owner", "set project deadline", "edit project".""" vals: dict = {} if name: vals["name"] = name if description: vals["description"] = description if user_id: vals["user_id"] = user_id if date_start: vals["date_start"] = date_start if date: vals["date"] = date if privacy_visibility: vals["privacy_visibility"] = privacy_visibility return _write("project.project", project_id, vals) if vals else False @mcp.tool() def archive_project(project_id: int, archive: bool = True) -> bool: """Archive or unarchive a project (soft delete). archive=True to archive, False to restore. Trigger phrases: "archive project", "close project", "deactivate project", "restore project", "reopen project".""" return _write("project.project", project_id, {"active": not archive}) # ── Milestones ──────────────────────────────────────────────────────────────── @mcp.tool() def list_milestones(project_id: int, include_reached: bool = False) -> list: """List milestones for a project. Set include_reached=True to also include already-completed milestones. Trigger phrases: "list milestones", "project milestones", "show milestones", "what milestones exist".""" domain = [["project_id", "=", project_id]] if not include_reached: domain.append(["is_reached", "=", False]) return _search_read("project.milestone", domain, ["id", "name", "project_id", "deadline", "is_reached", "reached_date"], limit=100, order="deadline asc") @mcp.tool() def create_milestone(project_id: int, name: str, deadline: str = "") -> int: """Create a milestone for a project. deadline format: YYYY-MM-DD. Returns milestone ID. Trigger phrases: "create milestone", "add milestone", "new milestone".""" vals: dict = {"project_id": project_id, "name": name} if deadline: vals["deadline"] = deadline return _create("project.milestone", vals) @mcp.tool() def update_milestone(milestone_id: int, name: str = "", deadline: str = "", is_reached: bool = None) -> bool: """Update a milestone's name, deadline, or completion status. Set is_reached=True to mark it complete, False to reopen it. Trigger phrases: "update milestone", "mark milestone complete", "change milestone deadline", "close milestone".""" vals: dict = {} if name: vals["name"] = name if deadline: vals["deadline"] = deadline if is_reached is not None: vals["is_reached"] = is_reached return _write("project.milestone", milestone_id, vals) if vals else False # ── Task chatter / notes ────────────────────────────────────────────────────── @mcp.tool() def post_task_message(task_id: int, body: str, message_type: str = "comment") -> int: """Post a message or internal note to a task's chatter thread. message_type: 'comment' (visible to all followers) or 'internal' (internal note only). Returns the new mail.message ID. Trigger phrases: "post a note", "log a message", "add a comment to the task", "internal note on task", "update task thread", "leave a comment".""" subtype_xmlid = "mail.mt_note" if message_type == "internal" else "mail.mt_comment" result = _call("project.task", "message_post", [[task_id]], { "body": body, "message_type": "comment", "subtype_xmlid": subtype_xmlid, }) if isinstance(result, int): return result if isinstance(result, dict): return result.get("id", 0) return 0 @mcp.tool() def get_task_messages(task_id: int, limit: int = 20) -> list: """Get the chatter message history for a task (most recent first). Trigger phrases: "task messages", "task history", "chatter", "task notes", "task comments", "what was said on this task".""" return _search_read("mail.message", [["model", "=", "project.task"], ["res_id", "=", task_id], ["message_type", "in", ["comment", "email"]]], ["id", "author_id", "date", "body", "subtype_id", "message_type"], limit=limit, order="date desc") # ── Subtasks ────────────────────────────────────────────────────────────────── @mcp.tool() def get_task_subtasks(task_id: int) -> list: """Get all direct subtasks (child tasks) of a task. Trigger phrases: "subtasks", "child tasks", "get subtasks", "list subtasks", "what are the subtasks".""" return _search_read("project.task", [["parent_id", "=", task_id]], ["id", "name", "stage_id", "user_ids", "date_deadline", "priority", "kanban_state", "active"], limit=100, order="sequence asc") @mcp.tool() def create_subtask(name: str, parent_task_id: int, project_id: int = None, description: str = "", date_deadline: str = "", user_ids: list = None) -> int: """Create a subtask linked to a parent task. Inherits the parent's project if project_id is omitted. Returns the new subtask ID. Trigger phrases: "create subtask", "add subtask", "new subtask", "child task".""" vals: dict = {"name": name, "parent_id": parent_task_id} if project_id: vals["project_id"] = project_id if description: vals["description"] = description if date_deadline: vals["date_deadline"] = date_deadline if user_ids: vals["user_ids"] = [(6, 0, user_ids)] return _create("project.task", vals) # ── Archive task ────────────────────────────────────────────────────────────── @mcp.tool() def archive_task(task_id: int, archive: bool = True) -> bool: """Archive or unarchive a task (soft delete). archive=True to archive, False to restore. Trigger phrases: "archive task", "close task", "remove task", "restore task", "deactivate task", "mark task done and archive".""" return _write("project.task", task_id, {"active": not archive}) # ── Followers ───────────────────────────────────────────────────────────────── @mcp.tool() def add_task_follower(task_id: int, partner_ids: list = None, user_ids: list = None) -> bool: """Subscribe followers to a task so they receive update notifications. Pass partner_ids (res.partner record IDs) or user_ids (res.users record IDs). Trigger phrases: "add follower", "subscribe to task", "watch task", "notify user on task", "follow this task".""" if user_ids and not partner_ids: users = _search_read("res.users", [["id", "in", user_ids]], ["partner_id"], limit=len(user_ids) + 5) partner_ids = [u["partner_id"][0] for u in users if u.get("partner_id")] if not partner_ids: return False _call("project.task", "message_subscribe", [[task_id]], {"partner_ids": partner_ids}) return True @mcp.tool() def remove_task_follower(task_id: int, partner_ids: list = None, user_ids: list = None) -> bool: """Unsubscribe followers from a task. Pass partner_ids (res.partner record IDs) or user_ids (res.users record IDs). Trigger phrases: "remove follower", "unsubscribe from task", "stop following task", "mute task notifications".""" if user_ids and not partner_ids: users = _search_read("res.users", [["id", "in", user_ids]], ["partner_id"], limit=len(user_ids) + 5) partner_ids = [u["partner_id"][0] for u in users if u.get("partner_id")] if not partner_ids: return False _call("project.task", "message_unsubscribe", [[task_id]], {"partner_ids": partner_ids}) return True # ── Stage management ────────────────────────────────────────────────────────── @mcp.tool() def create_task_stage(name: str, project_ids: list, sequence: int = 10, description: str = "") -> int: """Create a new task stage and link it to one or more projects. project_ids: list of project.project IDs to attach this stage to. Returns the new stage ID. Trigger phrases: "create stage", "add stage", "new task stage", "add column", "add kanban column".""" vals: dict = { "name": name, "project_ids": [(6, 0, project_ids)], "sequence": sequence, } if description: vals["description"] = description return _create("project.task.type", vals) @mcp.tool() def update_task_stage(stage_id: int, name: str = "", sequence: int = None, add_project_ids: list = None, remove_project_ids: list = None) -> bool: """Update a task stage's name, sequence order, or project associations. add_project_ids: project IDs to link this stage to (appends, does not replace). remove_project_ids: project IDs to detach this stage from. Trigger phrases: "rename stage", "reorder stage", "update stage", "move stage", "change stage order".""" vals: dict = {} if name: vals["name"] = name if sequence is not None: vals["sequence"] = sequence commands = [] if add_project_ids: for pid in add_project_ids: commands.append((4, pid)) # link without replacing others if remove_project_ids: for pid in remove_project_ids: commands.append((3, pid)) # unlink without deleting if commands: vals["project_ids"] = commands return _write("project.task.type", stage_id, vals) if vals else False # ── Link Drive document to project ─────────────────────────────────────────── @mcp.tool() def link_drive_document_to_project(project_id: int, drive_url: str, name: str = "Google Drive Folder") -> int: """Attach a Google Drive folder or document URL to a project.project record as an ir.attachment of type 'url'. The link appears in the project's Attachments panel in Odoo. Returns the new attachment ID. Trigger phrases: "link drive folder to project", "attach drive to project", "add drive link to project", "connect drive folder".""" return _create("ir.attachment", { "name": name, "type": "url", "url": drive_url, "res_model": "project.project", "res_id": project_id, }) # ── Task Checklists (projects_task_checklists — Cybrosys) ───────────────────── # Models: task.checklist (templates), checklist.item (template items), # checklist.item.line (per-task instances). State: todo/in_progress/done/cancel. # project.task extended with: checklist_id (Many2one), checklists_ids (One2many via projects_id). @mcp.tool() def list_checklist_templates(query: str = "", limit: int = 30) -> list: """List available task.checklist templates with their items. Trigger phrases: "checklist templates", "list checklists", "available checklists", "show checklist templates".""" domain = [["name", "ilike", query]] if query else [] templates = _search_read("task.checklist", domain, ["name", "description", "checklist_ids"], limit=limit) for t in templates: if t.get("checklist_ids"): t["items"] = _search_read( "checklist.item", [["id", "in", t["checklist_ids"]]], ["name", "sequence", "description"], limit=200, ) return templates @mcp.tool() def get_task_checklists(task_id: int) -> list: """Get all checklist item lines (checklist.item.line) attached to a task, with state and item name. State values: 'todo', 'in_progress', 'done', 'cancel'. Task progress % is recomputed automatically from these states in Odoo. Trigger phrases: "task checklist", "checklist items", "checklist status", "acceptance criteria", "phase checklist", "what's checked off".""" return _search_read( "checklist.item.line", [["projects_id", "=", task_id]], ["check_list_item_id", "description", "checklist_id", "state"], limit=200, ) @mcp.tool() def add_checklist_to_task(task_id: int, checklist_id: int) -> int: """Attach a task.checklist template to a task by creating checklist.item.line records. Idempotent — items already added for this checklist+task combination are skipped. Also sets checklist_id on the task to the given template. Returns count of new lines created. Use list_checklist_templates to find valid checklist_id values. NOTE: Replicates the UI onchange since that only fires client-side. Trigger phrases: "add checklist to task", "attach checklist", "apply checklist", "add acceptance criteria", "apply phase checklist".""" # Items already present for this checklist on this task existing = _search_read( "checklist.item.line", [["projects_id", "=", task_id], ["checklist_id", "=", checklist_id]], ["check_list_item_id"], limit=200, ) existing_item_ids = { r["check_list_item_id"][0] for r in existing if isinstance(r.get("check_list_item_id"), list) } # Template items not yet added items = _search_read( "checklist.item", [["checklist_id", "=", checklist_id]], ["id"], limit=200, ) new_items = [it for it in items if it["id"] not in existing_item_ids] if not new_items: return 0 commands = [ (0, 0, {"check_list_item_id": it["id"], "checklist_id": checklist_id, "state": "todo"}) for it in new_items ] _write("project.task", task_id, { "checklist_id": checklist_id, "checklists_ids": commands, }) return len(new_items) @mcp.tool() def update_checklist_item(line_id: int, state: str) -> bool: """Update the state of a checklist.item.line. state: 'todo' | 'in_progress' | 'done' | 'cancel'. Uses Odoo action methods so task progress % and chatter are updated automatically. Use get_task_checklists to find valid line_id values. Trigger phrases: "mark checklist item done", "check off item", "mark in progress", "cancel checklist item", "complete acceptance criteria", "mark phase complete".""" if state == "in_progress": _call("checklist.item.line", "action_approve_and_next", [[line_id]], {}) elif state == "done": _call("checklist.item.line", "action_mark_completed", [[line_id]], {}) elif state == "cancel": _call("checklist.item.line", "action_mark_canceled", [[line_id]], {}) elif state == "todo": _write("checklist.item.line", line_id, {"state": "todo"}) else: raise ValueError( f"Invalid state '{state}'. Valid values: 'todo', 'in_progress', 'done', 'cancel'." ) return True # ════════════════════════════════════════════════════════════════════════════ # HELPDESK # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_helpdesk_tickets(query: str = "", stage: str = "", team: str = "", created_after: str = "", created_before: str = "", limit: int = 20) -> list: """Search helpdesk tickets by name, stage, or team. created_after and created_before accept datetime strings in 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS' format to filter by creation date.""" domain = [] if query: domain.append(["name", "ilike", query]) if stage: domain.append(["stage_id.name", "ilike", stage]) if team: domain.append(["team_id.name", "ilike", team]) if created_after: domain.append(["create_date", ">=", created_after]) if created_before: domain.append(["create_date", "<=", created_before]) return _search_read("helpdesk.ticket", domain, ["id", "name", "partner_id", "stage_id", "team_id", "user_id", "priority", "create_date"], limit=limit, order="create_date desc") @mcp.tool() def get_helpdesk_ticket(ticket_id: int) -> dict: """Get full details of a helpdesk ticket by ID.""" r = _read("helpdesk.ticket", [ticket_id], ["id", "name", "partner_id", "stage_id", "team_id", "user_id", "priority", "description", "create_date", "date_last_stage_update", "kanban_state", "tag_ids"]) return r[0] if r else {} @mcp.tool() def create_helpdesk_ticket(name: str, description: str = "", partner_id: int = None, team_id: int = None) -> int: """Create a helpdesk ticket. Returns new ticket ID.""" vals = {"name": name} if description: vals["description"] = description if partner_id: vals["partner_id"] = partner_id if team_id: vals["team_id"] = team_id return _create("helpdesk.ticket", vals) @mcp.tool() def update_helpdesk_ticket(ticket_id: int, stage_id: int = None, user_id: int = None, note: str = "") -> bool: """Update a helpdesk ticket's stage, assignee, or add a note.""" vals = {} if stage_id: vals["stage_id"] = stage_id if user_id: vals["user_id"] = user_id if note: vals["description"] = note return _write("helpdesk.ticket", ticket_id, vals) if vals else False @mcp.tool() def list_helpdesk_teams() -> list: """List all helpdesk teams.""" return _search_read("helpdesk.team", [], ["id", "name", "description"], limit=20) # ════════════════════════════════════════════════════════════════════════════ # PURCHASE # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_purchase_orders(query: str = "", state: str = "", limit: int = 20) -> list: """Search purchase orders by name or vendor. State: draft, sent, to approve, purchase, done, cancel.""" domain = [] if query: domain.append("|") domain.append(["name", "ilike", query]) domain.append(["partner_id.name", "ilike", query]) if state: domain.append(["state", "=", state]) return _search_read("purchase.order", domain, ["id", "name", "partner_id", "state", "amount_total", "date_order", "user_id", "date_planned"], limit=limit, order="date_order desc") @mcp.tool() def get_purchase_order(order_id: int) -> dict: """Get full details of a purchase order including line items.""" r = _read("purchase.order", [order_id], ["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax", "amount_total", "date_order", "user_id", "notes", "order_line", "date_planned", "payment_term_id"]) if not r: return {} order = r[0] if order.get("order_line"): lines = _read("purchase.order.line", order["order_line"], ["id", "product_id", "name", "product_qty", "price_unit", "price_subtotal", "qty_received", "qty_invoiced", "date_planned"]) order["lines"] = lines return order # ════════════════════════════════════════════════════════════════════════════ # INVENTORY # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_inventory(query: str = "", location: str = "internal", limit: int = 30) -> list: """Search current stock levels by product name. location: 'internal' (default), 'customer', 'supplier', 'transit'.""" domain = [["location_id.usage", "=", location], ["quantity", ">", 0]] if query: domain.append(["product_id.name", "ilike", query]) return _search_read("stock.quant", domain, ["product_id", "location_id", "quantity", "reserved_quantity", "lot_id", "package_id"], limit=limit, order="product_id asc") @mcp.tool() def get_stock_moves(product_id: int, limit: int = 20) -> list: """Get recent stock moves for a product (product.product ID).""" return _search_read("stock.move", [ ["product_id", "=", product_id], ["state", "=", "done"] ], ["id", "name", "product_id", "product_uom_qty", "location_id", "location_dest_id", "date", "picking_id", "origin"], limit=limit, order="date desc") @mcp.tool() def list_internal_locations() -> list: """List all internal warehouse/stock locations.""" return _search_read("stock.location", [["usage", "=", "internal"], ["active", "=", True]], ["id", "name", "complete_name", "location_id"], limit=100, order="complete_name asc") # ════════════════════════════════════════════════════════════════════════════ # EMPLOYEES # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def search_employees(query: str = "", department: str = "", job_title: str = "", limit: int = 30) -> list: """Search employees by name, department, or job title.""" domain = [["active", "=", True]] if query: domain.append(["name", "ilike", query]) if department: domain.append(["department_id.name", "ilike", department]) if job_title: domain.append(["job_title", "ilike", job_title]) return _search_read("hr.employee", domain, ["id", "name", "job_title", "department_id", "work_email", "work_phone", "parent_id", "coach_id"], limit=limit, order="name asc") @mcp.tool() def get_employee(employee_id: int) -> dict: """Get full details of an employee by ID.""" r = _read("hr.employee", [employee_id], ["id", "name", "job_title", "job_id", "department_id", "work_email", "work_phone", "mobile_phone", "parent_id", "coach_id", "address_id", "resource_calendar_id", "tz", "birthday", "marital", "country_id"]) return r[0] if r else {} @mcp.tool() def list_departments() -> list: """List all HR departments.""" return _search_read("hr.department", [], ["id", "name", "parent_id", "manager_id"], limit=50, order="name asc") # ════════════════════════════════════════════════════════════════════════════ # UTILITY — GENERIC # ════════════════════════════════════════════════════════════════════════════ @mcp.tool() def odoo_search(model: str, field: str, query: str, limit: int = 10) -> list: """Generic search on any Odoo model. Searches field 'field' using ilike. Useful for looking up stage IDs, category IDs, etc. Example: odoo_search('project.task.type', 'name', 'In Progress')""" return _search_read(model, [[field, "ilike", query]], limit=limit) @mcp.tool() def odoo_get_record(model: str, record_id: int) -> dict: """Fetch a single record from any Odoo model by ID. Returns all default fields.""" r = _call(model, "read", [[record_id]], {}) return r[0] if r else {} @mcp.tool() def odoo_search_read(model: str, domain: list, fields: list, limit: int = 100, offset: int = 0, order: str = "") -> list: """Search any Odoo model with a full domain filter and return only specified fields. Primary tool for efficient data retrieval — avoids token bloat from full-record responses. Trigger phrases: "search with filters", "get records where", "filter by", "find all X where Y", "search_read". Domain syntax (Odoo prefix notation): Comparison: ['field', '=', val] ['field', '!=', val] ['field', '>', val] ['field', '>=', val] ['field', '<', val] ['field', 'in', [v1,v2]] ['field', 'not in', [v1]] ['field', 'ilike', 'text'] ['field', '!=', False] (field is set) ['field', '=', False] (field is empty) Logical: '|' AND '!' (prefix operators) AND example: [['state', 'in', ['draft','sent']], ['amount_total', '>', 0]] OR example: ['|', ['user_id', '=', 57], ['user_id', '=', 75]] 3-way OR: ['|', ['a','=',1], '|', ['b','=',2], ['c','=',3]] Never pass fields=[] — that returns all fields and defeats the purpose.""" if not fields: raise Exception("[odoo_search_read] fields cannot be empty — specify the fields you need.") kw: dict = {"fields": fields, "limit": limit, "offset": offset} if order: kw["order"] = order try: return _call(model, "search_read", [domain], kw) except Exception as e: raise Exception(f"[odoo_search_read] Failed on model '{model}': {e}") @mcp.tool() def get_record_count(model: str, domain: list) -> dict: """Count records matching a domain without fetching data. Use to check scope before bulk operations or build quick reports. Trigger phrases: "how many", "count of", "number of records".""" try: count = _call(model, "search_count", [domain]) # Build a human-readable summary from the domain parts = [] for clause in domain: if isinstance(clause, (list, tuple)) and len(clause) == 3: parts.append(f"{clause[0]} {clause[1]} {clause[2]}") summary = ", ".join(parts) if parts else "all records" return {"model": model, "domain_summary": summary, "count": count} except Exception as e: raise Exception(f"[get_record_count] Failed on model '{model}': {e}") @mcp.tool() def bulk_update_records(model: str, record_ids: list, values: dict) -> dict: """Update one or more fields on a list of records across any Odoo model in one API call. All records in record_ids receive the same field values. Covers stage changes, custom Studio fields (x_studio_*), status flags, etc. Trigger phrases: "update field", "set value on", "bulk update", "change stage".""" if not record_ids: return {"success": True, "model": model, "updated_ids": [], "count": 0, "values_set": values} try: _write(model, record_ids, values) return { "success": True, "model": model, "updated_ids": record_ids, "count": len(record_ids), "values_set": values } except Exception as e: raise Exception(f"[bulk_update_records] Failed on model '{model}': {e}") @mcp.tool() def archive_records(model: str, record_ids: list) -> dict: """Archive (soft-delete) records by setting active=False. Records are hidden from standard views but not deleted. Works on any model with an active field. For sale.order: cancel first with cancel_sale_orders. For crm.lead: archive directly. Trigger phrases: "archive", "hide record", "soft delete", "deactivate".""" if not record_ids: return {"success": True, "archived_model": model, "archived_ids": [], "count": 0} try: _write(model, record_ids, {"active": False}) return { "success": True, "archived_model": model, "archived_ids": record_ids, "count": len(record_ids) } except Exception as e: raise Exception(f"[archive_records] Failed on model '{model}': {e}") @mcp.tool() def get_stage_ids(model: str, stage_names: list) -> dict: """Resolve stage names to IDs for CRM, Project Task, or Helpdesk stages. Matching is case-insensitive and exact (not partial). model values: 'crm.stage' — CRM opportunity stages 'project.task.type' — Project task stages 'helpdesk.stage' — Helpdesk ticket stages Returns resolved dict (name → id) and unresolved list for any names not found. Trigger phrases: "stage id for", "resolve stage", "what is the id of stage".""" if not stage_names: return {"model": model, "resolved": {}, "unresolved": []} # Build case-insensitive OR domain (interleaved prefix notation) # ['|', cond1, cond2] for 2, ['|', cond1, '|', cond2, cond3] for 3, etc. conditions = [["name", "ilike", name] for name in stage_names] if len(conditions) == 1: domain = conditions else: domain = [] for i, cond in enumerate(conditions): if i < len(conditions) - 1: domain.append("|") domain.append(cond) try: results = _search_read(model, domain, ["id", "name"], limit=200) except Exception as e: raise Exception(f"[get_stage_ids] Failed on model '{model}': {e}") # Post-filter: exact case-insensitive match (ilike is contains, we want exact) resolved = {} for req_name in stage_names: for r in results: if r["name"].strip().lower() == req_name.strip().lower(): resolved[req_name] = r["id"] break unresolved = [n for n in stage_names if n not in resolved] return {"model": model, "resolved": resolved, "unresolved": unresolved} @mcp.tool() def call_odoo_method(model: str, method: str, record_ids: list, kwargs: dict = None) -> object: """Direct pass-through to any Odoo model method. Escape hatch for operations not covered by other tools — workflow transitions, report actions, custom methods. Returns raw Odoo response. Use sparingly and document usage in session notes. Trigger phrases: "call method", "run action", "execute button".""" try: return _call(model, method, [record_ids], kwargs or {}) except Exception as e: raise Exception(f"[call_odoo_method] Failed calling '{model}.{method}': {e}") # ════════════════════════════════════════════════════════════════════════════ # ELEARNING — COURSES & CONTENT # ════════════════════════════════════════════════════════════════════════════ # # Models: # slide.channel — Courses # slide.slide — Slides / content items # slide.question — Quiz questions (linked to slides) # slide.answer — Quiz answer options # slide.channel.partner — Enrollment & completion tracking # slide.channel.resource — Course resource attachments # # Google Drive / image operations: # Always use the Google Workspace MCP connector when it is available in the # session. Only ask the user to provide a URL or upload content manually if # the connector is NOT connected. Never construct Drive URLs manually. # # slide_type values: # 'pdf' — PDF document (url = Drive share link or direct PDF URL) # 'youtube_video' — YouTube video (url = YouTube watch URL) # 'vimeo_video' — Vimeo video (url = Vimeo video URL) # 'infographic' — Image / infographic # 'webpage' — Article with native HTML content (html_content field) # # visibility values: 'public' | 'connected' | 'members' | 'website' # enroll values: 'public' (Open) | 'invite' (On Invitation) # channel_type values: 'training' | 'documentation' # ── Courses ─────────────────────────────────────────────────────────────────── @mcp.tool() def search_courses(query: str = "", channel_type: str = "", visibility: str = "", enroll: str = "", published: bool = None, limit: int = 20) -> list: """Search eLearning courses. channel_type : 'training' or 'documentation' visibility : 'public' (Everyone), 'connected' (Signed In), 'members' (Course Attendees), 'website' (Anyone with link) enroll : 'public' (Open) or 'invite' (On Invitation) published : True = live courses only, False = drafts only, None = all Returns id, name, channel_type, visibility, enroll policy, publish status, slide counts by type, member count, and website URL. Trigger phrases: "list courses", "find course", "search elearning", "training courses", "what courses do we have".""" domain = [] if query: domain.append(["name", "ilike", query]) if channel_type: domain.append(["channel_type", "=", channel_type]) if visibility: domain.append(["visibility", "=", visibility]) if enroll: domain.append(["enroll", "=", enroll]) if published is not None: domain.append(["website_published", "=", published]) return _search_read("slide.channel", domain, ["id", "name", "channel_type", "visibility", "enroll", "website_published", "total_slides", "nbr_document", "nbr_video", "nbr_infographic", "members_count", "website_url", "tag_ids"], limit=limit, order="name asc") @mcp.tool() def get_course(channel_id: int) -> dict: """Get full details of an eLearning course: settings, description, slide list, enrollment count, upload/auto-enroll groups, and cover image presence. Trigger phrases: "get course", "course details", "show course".""" r = _read("slide.channel", [channel_id], ["id", "name", "description", "channel_type", "visibility", "enroll", "website_published", "total_slides", "nbr_document", "nbr_video", "nbr_infographic", "members_count", "website_url", "tag_ids", "slide_ids", "upload_group_ids", "enroll_group_ids"]) if not r: return {} course = r[0] if course.get("description"): course["description"] = re.sub( r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"', 'src="[embedded image]"', course["description"] ) return course @mcp.tool() def create_course(name: str, description: str = "", channel_type: str = "training", visibility: str = "public", enroll: str = "public", tag_ids: list = None, cover_image_url: str = "", enroll_group_names: list = None) -> dict: """Create a new eLearning course. channel_type : 'training' (default) or 'documentation' visibility : 'public', 'connected', 'members', 'website' enroll : 'public' (Open) or 'invite' (On Invitation) tag_ids : list of existing tag IDs to assign cover_image_url : image URL — use Google Workspace MCP to get a Drive download URL when available; only ask the user if the connector is absent enroll_group_names: Odoo group names for auto-enrollment e.g. ['Internal Users'] to restrict to employees Returns dict with new course id and name. Trigger phrases: "create course", "new course", "new training", "add elearning course".""" vals = { "name": name, "channel_type": channel_type, "visibility": visibility, "enroll": enroll, } if description: vals["description"] = description if tag_ids: vals["tag_ids"] = [(6, 0, tag_ids)] if enroll_group_names: gids = _resolve_group_ids(enroll_group_names) if gids: vals["enroll_group_ids"] = [(6, 0, gids)] channel_id = _create("slide.channel", vals) if cover_image_url: try: _write("slide.channel", channel_id, {"image_1920": _fetch_image_base64(cover_image_url)}) except Exception as e: return {"id": channel_id, "name": name, "warning": f"Course created but cover image failed: {e}"} return {"id": channel_id, "name": name} @mcp.tool() def update_course(channel_id: int, name: str = "", description: str = "", channel_type: str = "", visibility: str = "", enroll: str = "", tag_ids: list = None, cover_image_url: str = "", enroll_group_names: list = None) -> dict: """Update an existing eLearning course. Only provided fields are changed. channel_type : 'training' or 'documentation' visibility : 'public', 'connected', 'members', 'website' enroll : 'public' or 'invite' tag_ids : replaces current tags with this list cover_image_url : new cover image — use Google Workspace MCP connector when available to get the download URL; do not ask the user to construct or upload unless the connector is absent enroll_group_names: replaces auto-enroll groups; pass [] to clear Trigger phrases: "update course", "edit course", "change course settings".""" vals = {} if name: vals["name"] = name if description: vals["description"] = description if channel_type: vals["channel_type"] = channel_type if visibility: vals["visibility"] = visibility if enroll: vals["enroll"] = enroll if tag_ids is not None: vals["tag_ids"] = [(6, 0, tag_ids)] if enroll_group_names is not None: gids = _resolve_group_ids(enroll_group_names) vals["enroll_group_ids"] = [(6, 0, gids)] warnings = [] if vals: _write("slide.channel", channel_id, vals) if cover_image_url: try: _write("slide.channel", channel_id, {"image_1920": _fetch_image_base64(cover_image_url)}) vals["image_1920"] = "(updated)" except Exception as e: warnings.append(f"Cover image update failed: {e}") result = {"success": True, "channel_id": channel_id, "fields_updated": list(vals.keys())} if warnings: result["warnings"] = warnings return result @mcp.tool() def publish_course(channel_id: int, published: bool = True) -> dict: """Publish or unpublish an eLearning course. published=True → course is live and visible on the website published=False → course is hidden (draft) Trigger phrases: "publish course", "unpublish course", "make course live", "hide course", "take course offline".""" _write("slide.channel", channel_id, {"website_published": published}) return {"success": True, "channel_id": channel_id, "status": "published" if published else "unpublished"} # ── Slides / Content ────────────────────────────────────────────────────────── @mcp.tool() def list_course_slides(channel_id: int) -> list: """List all slides/content items in a course ordered by sequence. Returns id, name, slide_type, published status, completion time, sequence, and URL. Trigger phrases: "list slides", "course content", "course lessons", "show modules".""" return _search_read("slide.slide", [["channel_id", "=", channel_id]], ["id", "name", "slide_type", "is_published", "website_published", "completion_time", "sequence", "url", "tag_ids"], limit=500, order="sequence asc") @mcp.tool() def get_slide(slide_id: int) -> dict: """Get full details of a slide including HTML body (articles), URL (pdf/video), and quiz questions if any are attached. Trigger phrases: "get slide", "slide details", "show lesson", "get article content".""" r = _read("slide.slide", [slide_id], ["id", "name", "channel_id", "slide_type", "is_published", "website_published", "description", "html_content", "url", "completion_time", "sequence", "tag_ids", "question_ids", "likes", "dislikes"]) if not r: return {} slide = r[0] if slide.get("question_ids"): try: questions = _read("slide.question", slide["question_ids"], ["id", "question", "answer_ids", "sequence"]) for q in questions: if q.get("answer_ids"): q["answers"] = _read("slide.answer", q["answer_ids"], ["id", "text_value", "is_correct", "comment"]) slide["questions"] = sorted(questions, key=lambda q: q.get("sequence", 0)) except Exception: slide["questions"] = [] return slide @mcp.tool() def create_slide(channel_id: int, name: str, slide_type: str = "pdf", description: str = "", url: str = "", html_content: str = "", completion_time: float = 0.0, sequence: int = 0, is_published: bool = False) -> dict: """Create a new slide/content item in a course. slide_type options: 'pdf' — PDF document. Set url to a Google Drive share link. Use Google Workspace MCP to get the link — do not ask the user to construct it unless the connector is absent. 'youtube_video' — YouTube video. Set url to the watch URL. 'vimeo_video' — Vimeo video. Set url to the Vimeo URL. 'infographic' — Image. Set url to a publicly accessible image URL. 'webpage' — Article. Set html_content to formatted HTML. Claude can generate article HTML content directly. url : Link to content. For Drive PDFs, use Google Workspace MCP to get the shareable link — never ask the user to build it. html_content : For 'webpage' articles only. Full HTML body. completion_time : Estimated time in hours (0.25 = 15 min, 0.5 = 30 min). sequence : Position in course (lower = earlier). 0 = append at end. is_published : Immediately visible to learners if True. Trigger phrases: "add slide", "add lesson", "add content", "create slide", "add video", "add article", "add PDF", "add quiz".""" vals = { "channel_id": channel_id, "name": name, "slide_type": slide_type, "is_published": is_published, "website_published": is_published, } if description: vals["description"] = description if url: vals["url"] = url if html_content: vals["html_content"] = html_content if completion_time: vals["completion_time"] = completion_time if sequence: vals["sequence"] = sequence slide_id = _create("slide.slide", vals) return {"id": slide_id, "name": name, "slide_type": slide_type, "channel_id": channel_id} @mcp.tool() def update_slide(slide_id: int, name: str = "", description: str = "", url: str = "", html_content: str = "", completion_time: float = None, sequence: int = None) -> dict: """Update a slide's metadata, URL, or article HTML content. Only provided fields are changed. For 'webpage' articles: pass html_content to replace the article body. For pdf/video slides: pass url to update the linked content. When updating a URL for a Drive document, use Google Workspace MCP to get the shareable link — do not ask the user to provide it unless the connector is absent. Trigger phrases: "update slide", "edit lesson", "update article", "update slide content", "change video URL".""" vals = {} if name: vals["name"] = name if description: vals["description"] = description if url: vals["url"] = url if html_content: vals["html_content"] = html_content if completion_time is not None: vals["completion_time"] = completion_time if sequence is not None: vals["sequence"] = sequence if not vals: return {"success": False, "reason": "No fields provided to update"} _write("slide.slide", slide_id, vals) return {"success": True, "slide_id": slide_id, "fields_updated": list(vals.keys())} @mcp.tool() def publish_slide(slide_id: int, published: bool = True) -> dict: """Publish or unpublish an individual slide. published=True → slide is visible to enrolled learners published=False → slide is hidden (draft) Trigger phrases: "publish slide", "unpublish slide", "hide lesson", "make lesson visible", "publish lesson".""" _write("slide.slide", slide_id, { "is_published": published, "website_published": published, }) return {"success": True, "slide_id": slide_id, "status": "published" if published else "unpublished"} @mcp.tool() def reorder_slides(channel_id: int, slide_sequences: dict) -> dict: """Reorder slides in a course by setting sequence numbers. slide_sequences maps slide_id → sequence number (lower = earlier). Example: reorder_slides(8, {"38": 1, "39": 2, "40": 3, "41": 4}) Trigger phrases: "reorder slides", "reorder lessons", "change slide order", "move lesson", "rearrange course".""" if not slide_sequences: return {"success": False, "reason": "No sequences provided"} updated, errors = [], [] for sid, seq in slide_sequences.items(): try: _write("slide.slide", int(sid), {"sequence": int(seq)}) updated.append(int(sid)) except Exception as e: errors.append({"slide_id": sid, "error": str(e)}) return { "success": len(errors) == 0, "channel_id": channel_id, "updated_count": len(updated), "updated_ids": updated, "errors": errors, } # ── Quiz ────────────────────────────────────────────────────────────────────── @mcp.tool() def get_quiz_questions(slide_id: int) -> list: """Get all quiz questions and answers for a slide. Returns each question with its answer options and correct answer flags. Trigger phrases: "quiz questions", "get questions", "show quiz", "what questions".""" slide = _read("slide.slide", [slide_id], ["id", "name", "question_ids"]) if not slide or not slide[0].get("question_ids"): return [] try: questions = _read("slide.question", slide[0]["question_ids"], ["id", "question", "answer_ids", "sequence"]) for q in questions: if q.get("answer_ids"): q["answers"] = _read("slide.answer", q["answer_ids"], ["id", "text_value", "is_correct", "comment"]) return sorted(questions, key=lambda q: q.get("sequence", 0)) except Exception as e: raise Exception(f"[get_quiz_questions] Failed: {e}") @mcp.tool() def add_quiz_question(slide_id: int, question: str, answers: list, sequence: int = 0) -> dict: """Add one quiz question to a slide. answers: list of dicts with: 'text' — answer option text (required) 'is_correct' — True/False; at least one answer must be correct 'comment' — optional feedback shown after answering Example: add_quiz_question( slide_id=38, question="What does RDMC stand for?", answers=[ {"text": "Remote Display Management Console", "is_correct": True, "comment": "Correct — RDMC manages the full installed display base."}, {"text": "Real-time Data Management Center", "is_correct": False}, {"text": "Remote Display Media Controller", "is_correct": False}, ] ) Trigger phrases: "add question", "add quiz question", "add quiz item".""" q_vals = {"slide_id": slide_id, "question": question} if sequence: q_vals["sequence"] = sequence question_id = _create("slide.question", q_vals) answer_ids = [] for ans in answers: a_vals = { "question_id": question_id, "text_value": ans["text"], "is_correct": ans.get("is_correct", False), } if ans.get("comment"): a_vals["comment"] = ans["comment"] answer_ids.append(_create("slide.answer", a_vals)) return { "question_id": question_id, "slide_id": slide_id, "question": question, "answer_ids": answer_ids, "answer_count": len(answer_ids), } @mcp.tool() def generate_quiz(slide_id: int, questions: list) -> dict: """Write a batch of pre-generated quiz questions to a slide. Intended workflow — Claude does the generation, this tool does the writing: 1. Call get_slide or list_course_slides to read the lesson content 2. Generate question/answer sets based on that content 3. Call this tool to write all questions in one operation questions: list of dicts with: 'question' — question text (required) 'answers' — list of answer dicts (same format as add_quiz_question) 'sequence' — optional ordering (auto-increments from 1 if omitted) Replaces any existing questions on the slide before writing. Trigger phrases: "generate quiz", "create quiz from content", "auto-generate questions", "build quiz".""" # Clear existing questions existing = _read("slide.slide", [slide_id], ["question_ids"]) if existing and existing[0].get("question_ids"): try: _call("slide.question", "unlink", [existing[0]["question_ids"]]) except Exception: pass # non-fatal created = [] for i, q_data in enumerate(questions): result = add_quiz_question( slide_id=slide_id, question=q_data["question"], answers=q_data["answers"], sequence=q_data.get("sequence", i + 1), ) created.append(result) return { "success": True, "slide_id": slide_id, "questions_created": len(created), "question_ids": [c["question_id"] for c in created], } # ── Enrollment ──────────────────────────────────────────────────────────────── @mcp.tool() def get_course_enrollment(channel_id: int) -> list: """Get all enrollment records for a course — enrolled users, completion percentage, and whether they have fully completed it. Trigger phrases: "who's enrolled", "enrollment", "completion rate", "course progress", "learner progress".""" return _search_read("slide.channel.partner", [["channel_id", "=", channel_id]], ["partner_id", "completion", "completed", "last_seen_slide_id"], limit=500, order="completion desc") @mcp.tool() def enroll_in_course(channel_id: int, partner_ids: list = None, emails: list = None, send_invitation: bool = False, enroll_group_names: list = None) -> dict: """Enroll one or more people in an eLearning course. partner_ids : list of Odoo partner IDs to enroll directly emails : list of email addresses — resolved to partner IDs automatically send_invitation : True to send invitation email after enrolling enroll_group_names: add Odoo group(s) to the course auto-enroll list (e.g. ['Internal Users'] for all employees) This tool is the standard onboarding hook — the project management plugin or any other skill can call it to enroll new customers in courses automatically. Trigger phrases: "enroll in course", "add to course", "give course access", "onboard to course", "invite to training".""" all_partner_ids = list(partner_ids or []) not_found_emails = [] if emails: found, not_found = _resolve_partner_ids(emails) all_partner_ids.extend(found) not_found_emails = not_found # Check for existing enrollments to avoid duplicates existing = _search_read("slide.channel.partner", [["channel_id", "=", channel_id], ["partner_id", "in", all_partner_ids]], ["partner_id"], limit=1000) if all_partner_ids else [] already_enrolled = set() for r in existing: pid = r["partner_id"] already_enrolled.add(pid[0] if isinstance(pid, (list, tuple)) else pid) new_ids = [pid for pid in all_partner_ids if pid not in already_enrolled] # Create enrollment records created = [] for pid in new_ids: try: _create("slide.channel.partner", { "channel_id": channel_id, "partner_id": pid, }) created.append(pid) except Exception: pass # may already exist due to race condition # Add auto-enroll groups if requested if enroll_group_names: gids = _resolve_group_ids(enroll_group_names) if gids: _write("slide.channel", channel_id, {"enroll_group_ids": [(4, gid) for gid in gids]}) # Send invitations invitation_result = None if send_invitation and created: for method_name in ("action_send_share_mail", "_send_share_email"): try: _call("slide.channel", method_name, [[channel_id]], {"partner_ids": created}) invitation_result = {"sent": True, "partner_ids": created} break except Exception as e: invitation_result = {"sent": False, "error": str(e)} result = { "channel_id": channel_id, "newly_enrolled": created, "already_enrolled": list(already_enrolled), "not_found_emails": not_found_emails, } if invitation_result: result["invitations"] = invitation_result return result @mcp.tool() def bulk_enroll(enrollments: list) -> dict: """Enroll multiple people across one or more courses in a single call. Designed for CSV-style onboarding imports and automated PM → eLearning workflows. enrollments: list of dicts, each with: 'email' or 'partner_id' — who to enroll (required) 'channel_ids' — list of course IDs (required) 'send_invitation' — optional bool, default False Example: bulk_enroll([ {"email": "ops@transitco.gov", "channel_ids": [8, 14], "send_invitation": True}, {"email": "admin@city.gov", "channel_ids": [8], "send_invitation": True}, {"partner_id": 4271, "channel_ids": [15]}, ]) Returns a per-record summary with enrolled / already_enrolled / not_found flags. Trigger phrases: "bulk enroll", "import enrollments", "enroll list", "onboard customers", "add multiple people to course".""" results = [] for entry in enrollments: email = entry.get("email", "") pid = entry.get("partner_id") channel_ids = entry.get("channel_ids", []) send_inv = entry.get("send_invitation", False) if not channel_ids: results.append({"entry": entry, "error": "No channel_ids provided"}) continue for cid in channel_ids: try: r = enroll_in_course( channel_id=cid, partner_ids=[pid] if pid else None, emails=[email] if email else None, send_invitation=send_inv, ) results.append({ "identity": email or str(pid), "channel_id": cid, "enrolled": len(r.get("newly_enrolled", [])) > 0, "already_enrolled": len(r.get("already_enrolled", [])) > 0, "not_found": len(r.get("not_found_emails", [])) > 0, }) except Exception as e: results.append({ "identity": email or str(pid), "channel_id": cid, "error": str(e), }) succeeded = sum(1 for r in results if r.get("enrolled") or r.get("already_enrolled")) return { "total": len(results), "succeeded": succeeded, "failed": len(results) - succeeded, "results": results, } @mcp.tool() def send_course_invitation(channel_id: int, partner_ids: list) -> dict: """Send or resend course invitation emails to specific enrolled partners. Trigger phrases: "send invitation", "resend invite", "email course invite", "resend course email".""" for method_name in ("action_send_share_mail", "_send_share_email"): try: _call("slide.channel", method_name, [[channel_id]], {"partner_ids": partner_ids}) return {"success": True, "channel_id": channel_id, "invited_partner_ids": partner_ids, "count": len(partner_ids)} except Exception as e: last_error = e raise Exception(f"[send_course_invitation] All invitation methods failed: {last_error}") # ── Media & Resources ───────────────────────────────────────────────────────── @mcp.tool() def set_course_cover(channel_id: int, image_url: str) -> dict: """Set or replace the cover image for an eLearning course. image_url: A direct download URL for the image. Use Google Workspace MCP to get a download URL from Google Drive when the connector is available — do not ask the user to provide or construct the URL unless the connector is absent. The image is fetched, base64-encoded, and written to Odoo. Trigger phrases: "set course cover", "update cover image", "course thumbnail", "change course image".""" try: _write("slide.channel", channel_id, {"image_1920": _fetch_image_base64(image_url)}) return {"success": True, "channel_id": channel_id} except Exception as e: raise Exception(f"[set_course_cover] {e}") @mcp.tool() def set_slide_cover(slide_id: int, image_url: str) -> dict: """Set or replace the thumbnail/cover image for an individual slide. image_url: A direct download URL for the image. Use Google Workspace MCP for Drive images when available. Do not ask the user to provide the URL unless the connector is absent. Trigger phrases: "slide thumbnail", "slide cover image", "lesson image", "set slide image".""" try: _write("slide.slide", slide_id, {"image_1920": _fetch_image_base64(image_url)}) return {"success": True, "slide_id": slide_id} except Exception as e: raise Exception(f"[set_slide_cover] {e}") @mcp.tool() def link_drive_document(slide_id: int, drive_url: str) -> dict: """Set the URL for a document, video, or infographic slide. Use the Google Workspace MCP connector to get the shareable link for a Drive file when available. Do not ask the user to construct or copy the URL unless the connector is not connected. drive_url: Share link or direct viewer URL for the content. For PDFs: use the Drive file share link. For videos: use the YouTube or Vimeo watch URL. Trigger phrases: "link document", "set PDF URL", "attach Google Drive", "link Drive file", "update video URL", "set slide URL".""" _write("slide.slide", slide_id, {"url": drive_url}) return {"success": True, "slide_id": slide_id, "url": drive_url} @mcp.tool() def add_course_resource(channel_id: int, name: str, resource_url: str) -> dict: """Add a downloadable resource or external link to a course's resource section. Resources appear as attachments/links on the course landing page. resource_url: URL of the resource. Use Google Workspace MCP to get Drive share links when available. Do not ask the user to provide the URL unless the connector is absent. Trigger phrases: "add resource", "attach file to course", "course download", "add course attachment", "add course link".""" try: resource_id = _create("slide.channel.resource", { "channel_id": channel_id, "name": name, "resource_type": "url", "link": resource_url, }) return {"success": True, "resource_id": resource_id, "channel_id": channel_id, "name": name, "url": resource_url} except Exception as e: # Fallback: add as a published slide if resource model unavailable try: sid = _create("slide.slide", { "channel_id": channel_id, "name": name, "slide_type": "pdf", "url": resource_url, "is_published": True, "website_published": True, }) return {"success": True, "fallback": "created_as_slide", "slide_id": sid, "channel_id": channel_id, "name": name, "url": resource_url} except Exception as e2: raise Exception( f"[add_course_resource] Failed: primary={e}, fallback={e2}" ) if __name__ == "__main__": mcp.run()