Infer field specification and mapping from model definition

This commit is contained in:
Hubert Van De Walle
2025-03-05 13:57:58 +01:00
parent 4b0622451b
commit 47a3eed98b
6 changed files with 87 additions and 79 deletions
+31
View File
@@ -0,0 +1,31 @@
import random
vfyd_tags = [
"vfyd accounting",
"vfyd discuss",
"vfyd easy",
"vfyd editor",
"vfyd js",
"vfyd marketing",
"vfyd pos",
"vfyd sale",
"vfyd security",
"vfyd spreadsheet",
"vfyd stock",
"vfyd studio",
"vfyd vidange",
"vfyd website",
"vfyd industry",
]
team = [
"nle",
"abz",
"hael",
"roen",
"sbel",
"wasa",
"huvw",
]
random.shuffle(team)
+104
View File
@@ -0,0 +1,104 @@
import click
import random
import odoo_client
import json
from rich import print, console
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Optional
from config import vfyd_tags, team
from tasks import Task
console = console.Console()
def load_dispatch(date: Optional[str] = None) -> Dict[str, List[int]]:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
with open(f"out/{date}_dispatch.json", "r") as f:
return json.load(f)
def save_dispatch(dispatch: Dict[str, List[int]], date: Optional[str] = None) -> None:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
with open(f"out/{date}_dispatch.json", "w") as f:
json.dump(dispatch, f)
@click.group()
def cli() -> None:
pass
@cli.command()
def dispatch():
"""Randomly distribute unassigned tasks among team members."""
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
client = odoo_client.OdooClient()
records = client.web_search_read(Task, domain)
random.shuffle(records)
dispatch = defaultdict(list)
out = defaultdict(list)
for idx, task in enumerate(records):
dispatch[team[idx % len(team)]].append(task.id)
out[team[idx % len(team)]].append(task)
save_dispatch(dispatch)
for key, tasks in out.items():
print(f"**{key}:**")
for task in tasks:
print(task.url)
@cli.command()
def status():
"""Show all unassigned tasks in the backlog."""
client = odoo_client.OdooClient()
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
tasks = client.web_search_read(Task, domain)
for task in tasks:
print(task)
@cli.command()
def check():
"""Check which previously dispatched tasks are still not completed."""
dispatch = load_dispatch()
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
client = odoo_client.OdooClient()
domain = [
("id", "in", list(reverse_dispatch.keys())),
("tag_ids", "not in", vfyd_tags),
("stage_id", "=", 194),
("project_id", "=", 49),
]
tasks = client.web_search_read(Task, domain)
not_done = defaultdict(list)
for task in tasks:
not_done[reverse_dispatch[task.id]].append(task)
for key, value in not_done.items():
if value:
console.rule(f"[red]{key}[/red]")
for task in value:
print(task)
if __name__ == "__main__":
cli()
+51
View File
@@ -0,0 +1,51 @@
from typing import Dict, Self, Any, get_origin
from dataclasses import fields, Field
class OdooModel:
@classmethod
def specification(cls) -> Dict[str, Any]:
def field_spec(f: Field) -> Dict[str, Any]:
field_type = f.type
if f.name.endswith("_id") or f.name.endswith("_ids"):
return {
f.name: {
"fields": {
"display_name": {},
},
},
}
elif field_type in {str, int, bool}:
return {f.name: {}}
else:
raise NotImplementedError(f"Unsupported field type: {field_type}")
spec = dict()
for f in fields(cls):
if f.init:
spec.update(field_spec(f))
return spec
@classmethod
def from_record(cls, record: Dict) -> Self:
init_args = {}
for f in fields(cls):
if f.init and f.name in record:
value = record[f.name]
if f.name.endswith("_id"):
if f.type is str:
value = value["display_name"]
else:
raise NotImplementedError()
if f.name.endswith("_ids"):
if get_origin(f.type) is list and f.type.__args__[0] is str:
value = [v["display_name"] for v in value]
else:
raise NotImplementedError()
init_args[f.name] = value
return cls(**init_args)
+56
View File
@@ -0,0 +1,56 @@
import os
import requests
import random
from typing import Any, Dict, List, TypeVar, Type
from model import OdooModel
T = TypeVar("T", bound=OdooModel)
class OdooClient:
def __init__(self):
self.base_url = "https://www.odoo.com"
self.session = requests.Session()
if session_id := os.getenv("ODOO_SESSION_ID"):
self.session.cookies.set("session_id", session_id)
else:
raise Exception("ODOO_SESSION_ID is not set")
self.session.cookies.set("cids", "1")
self.session.cookies.set("frontend_lang", "en_US")
def rpc(
self, model: str, method: str, *args: Any, **kwargs: Any
) -> requests.Response:
body: Dict[str, Any] = {
"jsonrpc": "2.0",
"method": method,
"id": random.randint(1, 1000000),
"params": {
"model": model,
"method": method,
"args": args,
"kwargs": kwargs,
},
}
return self.session.post(
f"{self.base_url}/web/dataset/call_kw",
json=body,
)
def web_search_read(
self,
model: Type[T],
domain: List[tuple],
*args: Any,
**kwargs: Any,
) -> List[T]:
args = [domain, model.specification(), *args]
res = self.rpc(model._name, "web_search_read", *args, **kwargs)
res.raise_for_status()
json = res.json()
if json.get("error"):
raise Exception(json.get("error"))
records = json.get("result", {}).get("records", [])
return [model.from_record(record) for record in records]
+18
View File
@@ -0,0 +1,18 @@
from dataclasses import dataclass, field
from typing import List
from model import OdooModel
@dataclass
class Task(OdooModel):
_name = "project.task"
id: int
name: str
tag_ids: List[str]
stage_id: str
priority: bool
url: str = field(init=False)
def __post_init__(self) -> None:
self.url = f"https://www.odoo.com/odoo/49/tasks/{self.id}"