tri/src/odoo_client.py
2025-04-28 08:56:51 +02:00

58 lines
1.6 KiB
Python

import random
from typing import Any, TypeVar
import requests
from config import config
from model import OdooModel
T = TypeVar("T", bound=OdooModel)
class OdooClient:
def __init__(self):
self.base_url = "https://www.odoo.com"
self.session = requests.Session()
self.session.cookies.set("session_id", config.session_id)
self.session.cookies.set("cids", "1")
self.session.cookies.set("frontend_lang", "en_US")
def rpc(
self, model: str, method: str, *args: Any, **kwargs: Any
) -> requests.Response:
body: dict[str, Any] = {
"jsonrpc": "2.0",
"method": method,
"id": random.randint(1, 1_000_000),
"params": {
"model": model,
"method": method,
"args": args,
"kwargs": kwargs,
},
}
response = self.session.post(
f"{self.base_url}/web/dataset/call_kw",
json=body,
)
response.raise_for_status()
return response
def web_search_read(
self,
model: type[T],
domain: list[tuple[Any, ...]],
*args: Any,
**kwargs: Any,
) -> list[T]:
args = [domain, model.specification(), *args]
response = self.rpc(model._name, "web_search_read", *args, **kwargs)
json_data = response.json()
if error := json_data.get("error"):
raise RuntimeError(f"Odoo RPC Error: {error}")
records = json_data.get("result", {}).get("records", [])
return [model.from_record(record) for record in records]