feat: v0.2.0 — add 10 new tools, cancel_sale_orders draft/sent guard

This commit is contained in:
2026-05-28 12:40:27 -05:00
parent ac58bdc8d0
commit c14f20577c
+235 -13
View File
@@ -35,7 +35,7 @@ class ProxyAwareTransport(xmlrpc.client.SafeTransport):
opener = urllib.request.build_opener(urllib.request.ProxyHandler()) opener = urllib.request.build_opener(urllib.request.ProxyHandler())
try: try:
with opener.open(req, timeout=30) as resp: with opener.open(req, timeout=30) as resp:
self.verbose = verbose # required by Transport.parse_response self.verbose = verbose
return self.parse_response(resp) return self.parse_response(resp)
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
raise xmlrpc.client.ProtocolError(url, e.code, e.msg, dict(e.headers)) raise xmlrpc.client.ProtocolError(url, e.code, e.msg, dict(e.headers))
@@ -47,11 +47,9 @@ _proxy_transport = ProxyAwareTransport()
# ── Odoo client ─────────────────────────────────────────────────────────────── # ── Odoo client ───────────────────────────────────────────────────────────────
_uid: Optional[int] = None _uid: Optional[int] = None
_models = None _models = None
_resolved_api_key: Optional[str] = None # resolved at connect time from env or Keychain _resolved_api_key: Optional[str] = None
def _keychain_get(account: str) -> str: def _keychain_get(account: str) -> str:
"""Read a credential from the system keystore (keyring).
Works on macOS (Keychain), Windows (Credential Manager), and Linux (Secret Service)."""
try: try:
import keyring import keyring
value = keyring.get_password(KEYCHAIN_SERVICE, account) value = keyring.get_password(KEYCHAIN_SERVICE, account)
@@ -60,12 +58,10 @@ def _keychain_get(account: str) -> str:
return "" return ""
def _keychain_set(account: str, value: str) -> None: def _keychain_set(account: str, value: str) -> None:
"""Store a credential in the system keystore (keyring)."""
import keyring import keyring
keyring.set_password(KEYCHAIN_SERVICE, account, value) keyring.set_password(KEYCHAIN_SERVICE, account, value)
def _keychain_delete(account: str) -> bool: def _keychain_delete(account: str) -> bool:
"""Delete a credential from the system keystore. Returns True if it existed."""
try: try:
import keyring import keyring
existing = keyring.get_password(KEYCHAIN_SERVICE, account) existing = keyring.get_password(KEYCHAIN_SERVICE, account)
@@ -77,7 +73,6 @@ def _keychain_delete(account: str) -> bool:
return False return False
def _get_credentials() -> tuple[str, str]: def _get_credentials() -> tuple[str, str]:
"""Resolve username and API key: env vars take priority, then macOS Keychain."""
username = ODOO_USERNAME or _keychain_get("odoo_username") username = ODOO_USERNAME or _keychain_get("odoo_username")
api_key = ODOO_API_KEY or _keychain_get("odoo_api_key") api_key = ODOO_API_KEY or _keychain_get("odoo_api_key")
return username, api_key return username, api_key
@@ -155,12 +150,10 @@ def setup_odoo_credentials(username: str, api_key: str) -> str:
and are never written to any file on disk.""" and are never written to any file on disk."""
_keychain_set("odoo_username", username.strip()) _keychain_set("odoo_username", username.strip())
_keychain_set("odoo_api_key", api_key.strip()) _keychain_set("odoo_api_key", api_key.strip())
# Reset any cached connection so next call re-authenticates with new credentials
global _uid, _models, _resolved_api_key global _uid, _models, _resolved_api_key
_uid = None _uid = None
_models = None _models = None
_resolved_api_key = None _resolved_api_key = None
# Verify immediately
try: try:
_connect() _connect()
return ( return (
@@ -223,7 +216,6 @@ def get_product(product_id: int) -> dict:
def get_product_stock(product_id: int) -> list: def get_product_stock(product_id: int) -> list:
"""Get current stock quantities for a product (by product.template ID) """Get current stock quantities for a product (by product.template ID)
across all internal locations.""" across all internal locations."""
# Get all product.product IDs under this template
variants = _search_read("product.product", variants = _search_read("product.product",
[["product_tmpl_id", "=", product_id]], [["product_tmpl_id", "=", product_id]],
["id", "display_name"]) ["id", "display_name"])
@@ -447,6 +439,71 @@ def create_sales_order(partner_id: int, lines: list) -> int:
})) }))
return _create("sale.order", {"partner_id": partner_id, "order_line": order_lines}) return _create("sale.order", {"partner_id": partner_id, "order_line": order_lines})
@mcp.tool()
def cancel_sale_orders(order_ids: list) -> dict:
"""Cancel one or more quotations. ONLY operates on orders in Quotation (draft) or
Quotation Sent (sent) state — will raise an error if any provided IDs are in any
other state (Sale Order, Locked, Cancelled, etc.). This tool intentionally cannot
cancel confirmed sale orders.
Trigger phrases: "cancel quotation", "cancel quote"."""
if not order_ids:
return {"success": True, "cancelled_ids": [], "count": 0}
# Pre-validate: only draft/sent allowed — refuse everything else
bad_orders = _search_read(
"sale.order",
[["id", "in", order_ids], ["state", "not in", ["draft", "sent"]]],
["id", "name", "state"]
)
if bad_orders:
bad_desc = [f"{o['name']} (id={o['id']}, state={o['state']})" for o in bad_orders]
raise Exception(
f"[cancel_sale_orders] Refused — only Quotation (draft) or Quotation Sent (sent) "
f"orders may be cancelled via this tool. The following IDs are in other states: "
f"{', '.join(bad_desc)}"
)
try:
_call("sale.order", "action_cancel", [order_ids])
return {"success": True, "cancelled_ids": order_ids, "count": len(order_ids)}
except Exception as e:
raise Exception(f"[cancel_sale_orders] Failed: {e}")
@mcp.tool()
def cancel_and_archive_quotations(order_ids: list) -> dict:
"""Cancel then immediately archive sale order quotations in one call.
All IDs must be in draft or sent state. Cancel is validated before archive is attempted —
if cancel fails, archive is not executed.
Trigger phrases: "cancel and archive", "remove quotation", "clean up quotes"."""
if not order_ids:
return {"success": True, "order_ids": [], "count": 0, "steps_completed": []}
# Pre-validate: check for any orders NOT in draft/sent
bad_orders = _search_read(
"sale.order",
[["id", "in", order_ids], ["state", "not in", ["draft", "sent"]]],
["id", "name", "state"]
)
if bad_orders:
bad_desc = [f"{o['name']} (id={o['id']}, state={o['state']})" for o in bad_orders]
raise Exception(
f"[cancel_and_archive_quotations] Cannot proceed — orders not in draft/sent state: "
f"{', '.join(bad_desc)}"
)
try:
_call("sale.order", "action_cancel", [order_ids])
except Exception as e:
raise Exception(f"[cancel_and_archive_quotations] Cancel step failed: {e}")
try:
_write("sale.order", order_ids, {"active": False})
except Exception as e:
raise Exception(
f"[cancel_and_archive_quotations] Archive step failed (orders were cancelled): {e}"
)
return {
"success": True,
"order_ids": order_ids,
"count": len(order_ids),
"steps_completed": ["cancelled", "archived"]
}
# ════════════════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════════════════
# CRM # CRM
@@ -506,6 +563,35 @@ def list_crm_stages() -> list:
return _search_read("crm.stage", [], ["id", "name", "sequence"], return _search_read("crm.stage", [], ["id", "name", "sequence"],
limit=50, order="sequence asc") limit=50, order="sequence asc")
@mcp.tool()
def mark_crm_lead_lost(lead_id: int, lost_reason_id: int = None) -> dict:
"""Mark a CRM lead/opportunity as Lost. Sets probability=0, deactivates the record,
and records the loss reason. Use list_crm_lost_reasons() to get valid reason IDs.
Trigger phrases: "mark lost", "set as lost", "lose the opportunity"."""
vals: dict = {"active": False, "probability": 0}
if lost_reason_id is not None:
vals["lost_reason_id"] = lost_reason_id
try:
_write("crm.lead", [lead_id], vals)
return {
"success": True,
"lead_id": lead_id,
"lost_reason_id": lost_reason_id,
"action": "marked_lost"
}
except Exception as e:
raise Exception(f"[mark_crm_lead_lost] Failed for lead {lead_id}: {e}")
@mcp.tool()
def list_crm_lost_reasons() -> list:
"""List all available CRM lost reasons with IDs and names.
Call this before mark_crm_lead_lost so the user can pick a reason by name.
Trigger phrases: "lost reasons", "why lost", "reason for losing"."""
try:
return _search_read("crm.lost.reason", [], ["id", "name"], limit=100, order="name asc")
except Exception as e:
raise Exception(f"[list_crm_lost_reasons] Failed: {e}")
# ════════════════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════════════════
# PROJECT # PROJECT
@@ -594,8 +680,7 @@ def search_helpdesk_tickets(query: str = "", stage: str = "", team: str = "",
limit: int = 20) -> list: limit: int = 20) -> list:
"""Search helpdesk tickets by name, stage, or team. """Search helpdesk tickets by name, stage, or team.
created_after and created_before accept datetime strings in 'YYYY-MM-DD' or created_after and created_before accept datetime strings in 'YYYY-MM-DD' or
'YYYY-MM-DD HH:MM:SS' format to filter by creation date. 'YYYY-MM-DD HH:MM:SS' format to filter by creation date."""
Example: created_after='2026-03-29' returns tickets created in the last 24 hours."""
domain = [] domain = []
if query: if query:
domain.append(["name", "ilike", query]) domain.append(["name", "ilike", query])
@@ -759,7 +844,7 @@ def list_departments() -> list:
# ════════════════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════════════════
# UTILITY # UTILITY — GENERIC
# ════════════════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════════════════
@mcp.tool() @mcp.tool()
@@ -775,6 +860,143 @@ def odoo_get_record(model: str, record_id: int) -> dict:
r = _call(model, "read", [[record_id]], {}) r = _call(model, "read", [[record_id]], {})
return r[0] if r else {} return r[0] if r else {}
@mcp.tool()
def odoo_search_read(model: str, domain: list, fields: list,
limit: int = 100, offset: int = 0, order: str = "") -> list:
"""Search any Odoo model with a full domain filter and return only specified fields.
Primary tool for efficient data retrieval — avoids token bloat from full-record responses.
Trigger phrases: "search with filters", "get records where", "filter by",
"find all X where Y", "search_read".
Domain syntax (Odoo prefix notation):
Comparison: ['field', '=', val] ['field', '!=', val] ['field', '>', val]
['field', '>=', val] ['field', '<', val] ['field', 'in', [v1,v2]]
['field', 'not in', [v1]] ['field', 'ilike', 'text']
['field', '!=', False] (field is set)
['field', '=', False] (field is empty)
Logical: '|' AND '!' (prefix operators)
AND example: [['state', 'in', ['draft','sent']], ['amount_total', '>', 0]]
OR example: ['|', ['user_id', '=', 57], ['user_id', '=', 75]]
3-way OR: ['|', ['a','=',1], '|', ['b','=',2], ['c','=',3]]
Never pass fields=[] — that returns all fields and defeats the purpose."""
if not fields:
raise Exception("[odoo_search_read] fields cannot be empty — specify the fields you need.")
kw: dict = {"fields": fields, "limit": limit, "offset": offset}
if order:
kw["order"] = order
try:
return _call(model, "search_read", [domain], kw)
except Exception as e:
raise Exception(f"[odoo_search_read] Failed on model '{model}': {e}")
@mcp.tool()
def get_record_count(model: str, domain: list) -> dict:
"""Count records matching a domain without fetching data.
Use to check scope before bulk operations or build quick reports.
Trigger phrases: "how many", "count of", "number of records"."""
try:
count = _call(model, "search_count", [domain])
# Build a human-readable summary from the domain
parts = []
for clause in domain:
if isinstance(clause, (list, tuple)) and len(clause) == 3:
parts.append(f"{clause[0]} {clause[1]} {clause[2]}")
summary = ", ".join(parts) if parts else "all records"
return {"model": model, "domain_summary": summary, "count": count}
except Exception as e:
raise Exception(f"[get_record_count] Failed on model '{model}': {e}")
@mcp.tool()
def bulk_update_records(model: str, record_ids: list, values: dict) -> dict:
"""Update one or more fields on a list of records across any Odoo model in one API call.
All records in record_ids receive the same field values.
Covers stage changes, custom Studio fields (x_studio_*), status flags, etc.
Trigger phrases: "update field", "set value on", "bulk update", "change stage"."""
if not record_ids:
return {"success": True, "model": model, "updated_ids": [], "count": 0, "values_set": values}
try:
_write(model, record_ids, values)
return {
"success": True,
"model": model,
"updated_ids": record_ids,
"count": len(record_ids),
"values_set": values
}
except Exception as e:
raise Exception(f"[bulk_update_records] Failed on model '{model}': {e}")
@mcp.tool()
def archive_records(model: str, record_ids: list) -> dict:
"""Archive (soft-delete) records by setting active=False. Records are hidden from
standard views but not deleted. Works on any model with an active field.
For sale.order: cancel first with cancel_sale_orders. For crm.lead: archive directly.
Trigger phrases: "archive", "hide record", "soft delete", "deactivate"."""
if not record_ids:
return {"success": True, "archived_model": model, "archived_ids": [], "count": 0}
try:
_write(model, record_ids, {"active": False})
return {
"success": True,
"archived_model": model,
"archived_ids": record_ids,
"count": len(record_ids)
}
except Exception as e:
raise Exception(f"[archive_records] Failed on model '{model}': {e}")
@mcp.tool()
def get_stage_ids(model: str, stage_names: list) -> dict:
"""Resolve stage names to IDs for CRM, Project Task, or Helpdesk stages.
Matching is case-insensitive and exact (not partial).
model values:
'crm.stage' — CRM opportunity stages
'project.task.type' — Project task stages
'helpdesk.stage' — Helpdesk ticket stages
Returns resolved dict (name → id) and unresolved list for any names not found.
Trigger phrases: "stage id for", "resolve stage", "what is the id of stage"."""
if not stage_names:
return {"model": model, "resolved": {}, "unresolved": []}
# Build case-insensitive OR domain (interleaved prefix notation)
# ['|', cond1, cond2] for 2, ['|', cond1, '|', cond2, cond3] for 3, etc.
conditions = [["name", "ilike", name] for name in stage_names]
if len(conditions) == 1:
domain = conditions
else:
domain = []
for i, cond in enumerate(conditions):
if i < len(conditions) - 1:
domain.append("|")
domain.append(cond)
try:
results = _search_read(model, domain, ["id", "name"], limit=200)
except Exception as e:
raise Exception(f"[get_stage_ids] Failed on model '{model}': {e}")
# Post-filter: exact case-insensitive match (ilike is contains, we want exact)
resolved = {}
for req_name in stage_names:
for r in results:
if r["name"].strip().lower() == req_name.strip().lower():
resolved[req_name] = r["id"]
break
unresolved = [n for n in stage_names if n not in resolved]
return {"model": model, "resolved": resolved, "unresolved": unresolved}
@mcp.tool()
def call_odoo_method(model: str, method: str, record_ids: list,
kwargs: dict = None) -> object:
"""Direct pass-through to any Odoo model method. Escape hatch for operations
not covered by other tools — workflow transitions, report actions, custom methods.
Returns raw Odoo response. Use sparingly and document usage in session notes.
Trigger phrases: "call method", "run action", "execute button"."""
try:
return _call(model, method, [record_ids], kwargs or {})
except Exception as e:
raise Exception(f"[call_odoo_method] Failed calling '{model}.{method}': {e}")
if __name__ == "__main__": if __name__ == "__main__":
mcp.run() mcp.run()