Compare commits

...

5 Commits

Author SHA1 Message Date
sena 3369b92b11 des trucs 2025-04-28 08:56:51 +02:00
Hubert Van De Walle 2bcb5eef20 Add voip tag 2025-03-18 10:30:39 +01:00
Hubert Van De Walle 54ea243c8c wip 2025-03-07 10:37:04 +01:00
Hubert Van De Walle d3abdb10c4 check availabilities before dispatching 2025-03-05 15:10:30 +01:00
Hubert Van De Walle 47a3eed98b Infer field specification and mapping from model definition 2025-03-05 15:09:50 +01:00
15 changed files with 511 additions and 253 deletions
+4 -1
View File
@@ -1,3 +1,6 @@
__pycache__/
out/
.env
.venv
.vscode/
config.toml
out/
+11
View File
@@ -0,0 +1,11 @@
[odoo]
session_id = "your-odoo-session-id-here"
[team]
members = [
"gram_1",
"gram_2",
"gram_3",
"gram_4",
"gram_5"
]
-31
View File
@@ -1,31 +0,0 @@
import random
vfyd_tags = [
"vfyd accounting",
"vfyd discuss",
"vfyd easy",
"vfyd editor",
"vfyd js",
"vfyd marketing",
"vfyd pos",
"vfyd sale",
"vfyd security",
"vfyd spreadsheet",
"vfyd stock",
"vfyd studio",
"vfyd vidange",
"vfyd website",
"vfyd industry",
]
team = [
"nle",
"abz",
"hael",
#"roen",
"sbel",
"wasa",
"huvw",
]
random.shuffle(team)
-109
View File
@@ -1,109 +0,0 @@
import click
import random
import odoo_client
import json
from tasks import fetch_tasks
from rich import print, console
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Optional
from config import vfyd_tags, team
console = console.Console()
def load_dispatch(date: Optional[str] = None) -> Dict[str, List[int]]:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
with open(f"out/{date}_dispatch.json", "r") as f:
return json.load(f)
def save_dispatch(dispatch: Dict[str, List[int]], date: Optional[str] = None) -> None:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
with open(f"out/{date}_dispatch.json", "w") as f:
json.dump(dispatch, f)
@click.group()
def cli() -> None:
pass
@cli.command()
def dispatch():
"""Randomly distribute unassigned tasks among team members."""
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
client = odoo_client.OdooClient()
records = fetch_tasks(client, domain)
random.shuffle(records)
dispatch = defaultdict(list)
out = defaultdict(list)
for idx, task in enumerate(records):
dispatch[team[idx % len(team)]].append(task.id)
out[team[idx % len(team)]].append(task)
save_dispatch(dispatch)
for key, tasks in out.items():
print(f"**{key}:**")
for task in tasks:
print(task.url)
@cli.command()
def status():
"""Show all unassigned tasks in the backlog."""
client = odoo_client.OdooClient()
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
tasks = fetch_tasks(client, domain)
for task in tasks:
print(task)
@cli.command()
def check():
"""Check which previously dispatched tasks are still not completed."""
dispatch = load_dispatch()
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
client = odoo_client.OdooClient()
domain = [
("id", "in", list(reverse_dispatch.keys())),
("tag_ids", "not in", vfyd_tags),
("stage_id", "=", 194),
("project_id", "=", 49),
]
tasks = fetch_tasks(client, domain)
not_done = defaultdict(list)
for task in tasks:
not_done[reverse_dispatch[task.id]].append(task)
for key, value in not_done.items():
if value:
console.rule(f"[red]{key}[/red]")
for task in value:
print(task)
if __name__ == "__main__":
cli()
-52
View File
@@ -1,52 +0,0 @@
import os
import requests
import random
from typing import Any, Dict, List
class OdooClient:
def __init__(self):
self.base_url = "https://www.odoo.com"
self.session = requests.Session()
if session_id := os.getenv("ODOO_SESSION_ID"):
self.session.cookies.set("session_id", session_id)
else:
raise Exception("ODOO_SESSION_ID is not set")
self.session.cookies.set("cids", "1")
self.session.cookies.set("frontend_lang", "en_US")
def rpc(
self, model: str, method: str, *args: Any, **kwargs: Any
) -> requests.Response:
body: Dict[str, Any] = {
"jsonrpc": "2.0",
"method": method,
"id": random.randint(1, 1000000),
"params": {
"model": model,
"method": method,
"args": args,
"kwargs": kwargs,
},
}
return self.session.post(
f"{self.base_url}/web/dataset/call_kw",
json=body,
)
def web_search_read(
self,
model: str,
domain: List[tuple],
spec: Dict[str, Any],
*args: Any,
**kwargs: Any,
) -> List[Dict[str, Any]]:
args = [domain, spec, *args]
res = self.rpc(model, "web_search_read", *args, **kwargs)
res.raise_for_status()
json = res.json()
if json.get("error"):
raise Exception(json.get("error"))
return json.get("result", {}).get("records", [])
+2
View File
@@ -6,6 +6,8 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"click>=8.1.8",
"pytest>=8.3.5",
"python-dotenv>=1.1.0",
"requests>=2.32.3",
"rich>=13.9.4",
]
+72
View File
@@ -0,0 +1,72 @@
import os
from dataclasses import dataclass, field
from pathlib import Path
import tomllib
from dotenv import load_dotenv
@dataclass
class Config:
config_path: Path = Path("config.toml")
vfyd_tags: list[str] = field(default_factory=lambda: [
"vfyd accounting",
"vfyd discuss",
"vfyd easy",
"vfyd editor",
"vfyd js",
"vfyd marketing",
"vfyd pos",
"vfyd sale",
"vfyd security",
"vfyd spreadsheet",
"vfyd stock",
"vfyd studio",
"vfyd vidange",
"vfyd website",
"vfyd industry",
"vfyd voip",
])
_config: dict = field(init=False, repr=False)
def __post_init__(self):
load_dotenv()
self._config = self._load_config()
self.session_id = self._get_session_id()
self.team = self._validate_team()
def _load_config(self) -> dict:
"""Load the configuration file."""
if not self.config_path.exists():
raise FileNotFoundError(f"Configuration file '{self.config_path}' not found.")
try:
with self.config_path.open("rb") as f:
return tomllib.load(f)
except tomllib.TOMLDecodeError as e:
raise ValueError(f"Error decoding TOML file '{self.config_path}': {e}")
def _get_session_id(self) -> str:
"""Prioritize configuration sources for session ID."""
session_id = self._get("odoo", "session_id") or os.getenv("ODOO_SESSION_ID")
if not session_id:
raise ValueError("No valid session ID found in config or environment")
return session_id
def _validate_team(self) -> list[str]:
"""Validate and sanitize team members."""
team = self._get("team", "members")
if not team:
raise ValueError("No team members defined in configuration.")
return list({member.lower().strip() for member in team})
def _get(self, section: str, key: str, default=None):
"""Retrieve a configuration value."""
return self._config.get(section, {}).get(key, default)
config = Config()
+13
View File
@@ -0,0 +1,13 @@
from dataclasses import dataclass
from datetime import datetime
from model import OdooModel
@dataclass
class Leave(OdooModel):
_name = "hr.leave.report.calendar"
start_datetime: datetime
stop_datetime: datetime
employee_id: str
+183
View File
@@ -0,0 +1,183 @@
import random
import re
from collections import defaultdict
from datetime import datetime
from itertools import cycle
import click
import odoo_client
from config import config
from leave import Leave
from rich import console, print
from storage import DispatchStorage
from tasks import Task
console = console.Console()
team = config.team
vfyd_tags = config.vfyd_tags
@click.group()
def cli() -> None:
pass
@cli.command()
def dispatch():
"""Randomly distribute unassigned tasks among team members."""
client = odoo_client.OdooClient()
today = datetime.now().strftime("%Y-%m-%d")
domain = [
("start_datetime", "<=", today),
("stop_datetime", ">=", today),
]
domain.extend(["|"] * (len(team) - 1))
domain.extend(
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
)
leaves = client.web_search_read(Leave, domain)
team_availability = {employee: (True, True) for employee in team}
for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
is_morning = leave.start_datetime.hour < 11
is_afternoon = leave.stop_datetime.hour >= 14
morning, afternoon = team_availability.get(employee, (True, True))
team_availability[employee] = (
morning and not is_morning,
afternoon and not is_afternoon,
)
available_employees = [
employee
for employee, (morning, afternoon) in team_availability.items()
if morning or afternoon
]
if not available_employees:
print("[red]No available employees for dispatch.[/red]")
return
previous_dispatch = DispatchStorage.load_week_to_date()
previous_dispatch_ids = [
task_id for tasks in previous_dispatch.values() for task_id in tasks
]
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
("id", "not in", previous_dispatch_ids),
]
records = client.web_search_read(Task, domain)
random.shuffle(records)
random.shuffle(available_employees)
employee_cycle = cycle(available_employees)
new_dispatch = defaultdict(list)
for task in records:
employee = next(employee_cycle)
new_dispatch[employee].append(task)
merged_dispatch = defaultdict(list)
if previous_dispatch_ids:
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
("id", "in", previous_dispatch_ids),
]
old_tasks = client.web_search_read(Task, domain)
old_tasks_by_id = {task.id: task for task in old_tasks}
else:
old_tasks_by_id = {}
for employee, task_ids in previous_dispatch.items():
merged_dispatch[employee].extend(
old_tasks_by_id[tid] for tid in task_ids if tid in old_tasks_by_id
)
for employee, tasks in new_dispatch.items():
merged_dispatch[employee].extend(tasks)
dispatch_to_save = {
employee: [task.id for task in tasks]
for employee, tasks in merged_dispatch.items()
}
DispatchStorage.save(dispatch_to_save)
for employee, tasks in sorted(merged_dispatch.items()):
if not tasks:
continue
print(f"**{employee}:**")
for task in tasks:
print(task.url)
print("\n")
@cli.command()
def status():
"""Show all unassigned tasks in the backlog."""
client = odoo_client.OdooClient()
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
tasks = client.web_search_read(Task, domain)
if not tasks:
print("[yellow]No unassigned tasks in the backlog.[/yellow]")
else:
for task in tasks:
print(task)
@cli.command()
def check():
"""Check which previously dispatched tasks are still not completed."""
dispatch = DispatchStorage.load_week_to_date()
task_id_to_employee = {
task_id: employee
for employee, task_ids in dispatch.items()
for task_id in task_ids
}
client = odoo_client.OdooClient()
domain = [
("id", "in", list(task_id_to_employee.keys())),
("tag_ids", "not in", vfyd_tags),
("stage_id", "=", 194),
("project_id", "=", 49),
]
tasks = client.web_search_read(Task, domain)
if not tasks:
print("[green]All dispatched tasks have been completed![/green]")
return
not_done = defaultdict(list)
for task in tasks:
not_done[task_id_to_employee[task.id]].append(task)
for employee, pending_tasks in not_done.items():
console.rule(f"[red]{employee}[/red]")
for task in pending_tasks:
print(task)
if __name__ == "__main__":
cli()
+55
View File
@@ -0,0 +1,55 @@
from dataclasses import Field, fields
from datetime import datetime
from typing import Any, Self, get_origin
class OdooModel:
@classmethod
def specification(cls) -> dict[str, Any]:
def field_spec(f: Field) -> dict[str, Any]:
field_type = f.type
if f.name.endswith("_id") or f.name.endswith("_ids"):
return {
f.name: {
"fields": {
"display_name": {},
},
},
}
elif field_type in {str, int, bool, datetime}:
return {f.name: {}}
else:
raise NotImplementedError(f"Unsupported field type: {field_type}")
spec = dict()
for f in fields(cls):
if f.init:
spec.update(field_spec(f))
return spec
@classmethod
def from_record(cls, record: dict) -> Self:
init_args = {}
for f in fields(cls):
if f.init and f.name in record:
value = record[f.name]
if f.type is datetime:
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
if f.name.endswith("_id"):
if f.type is str:
value = value["display_name"]
else:
raise NotImplementedError()
if f.name.endswith("_ids"):
if get_origin(f.type) is list and f.type.__args__[0] is str:
value = [v["display_name"] for v in value]
else:
raise NotImplementedError()
init_args[f.name] = value
return cls(**init_args)
+57
View File
@@ -0,0 +1,57 @@
import random
from typing import Any, TypeVar
import requests
from config import config
from model import OdooModel
T = TypeVar("T", bound=OdooModel)
class OdooClient:
def __init__(self):
self.base_url = "https://www.odoo.com"
self.session = requests.Session()
self.session.cookies.set("session_id", config.session_id)
self.session.cookies.set("cids", "1")
self.session.cookies.set("frontend_lang", "en_US")
def rpc(
self, model: str, method: str, *args: Any, **kwargs: Any
) -> requests.Response:
body: dict[str, Any] = {
"jsonrpc": "2.0",
"method": method,
"id": random.randint(1, 1_000_000),
"params": {
"model": model,
"method": method,
"args": args,
"kwargs": kwargs,
},
}
response = self.session.post(
f"{self.base_url}/web/dataset/call_kw",
json=body,
)
response.raise_for_status()
return response
def web_search_read(
self,
model: type[T],
domain: list[tuple[Any, ...]],
*args: Any,
**kwargs: Any,
) -> list[T]:
args = [domain, model.specification(), *args]
response = self.rpc(model._name, "web_search_read", *args, **kwargs)
json_data = response.json()
if error := json_data.get("error"):
raise RuntimeError(f"Odoo RPC Error: {error}")
records = json_data.get("result", {}).get("records", [])
return [model.from_record(record) for record in records]
+41
View File
@@ -0,0 +1,41 @@
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
class DispatchStorage:
@staticmethod
def path(date: Optional[str] = None) -> Path:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return Path(f"out/{date}_dispatch.json")
@staticmethod
def load(date: Optional[str] = None) -> dict[str, list[int]]:
path = DispatchStorage.path(date)
if not path.exists():
return {}
with path.open("r", encoding="utf-8") as f:
return json.load(f)
@staticmethod
def save(dispatch: dict[str, list[int]], date: Optional[str] = None) -> None:
path = DispatchStorage.path(date)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
json.dump(dispatch, f)
@staticmethod
def load_week_to_date() -> dict[str, list[int]]:
today = datetime.now()
start_of_week = today - timedelta(days=today.weekday()) # Previous Monday
combined_dispatch: dict[str, list[int]] = {}
for i in range((today - start_of_week).days + 1):
date_str = (start_of_week + timedelta(days=i)).strftime("%Y-%m-%d")
daily_dispatch = DispatchStorage.load(date_str)
for key, values in daily_dispatch.items():
combined_dispatch.setdefault(key, []).extend(values)
return combined_dispatch
+18
View File
@@ -0,0 +1,18 @@
from dataclasses import dataclass, field
from model import OdooModel
@dataclass
class Task(OdooModel):
_name = "project.task"
id: int
name: str
tag_ids: list[str]
stage_id: str
priority: bool
url: str = field(init=False)
def __post_init__(self) -> None:
self.url = f"https://www.odoo.com/odoo/49/tasks/{self.id}"
-60
View File
@@ -1,60 +0,0 @@
from dataclasses import dataclass, field
from typing import Dict, List
from odoo_client import OdooClient
from rich.table import Table
@dataclass
class Task:
id: int
name: str
tags: List[str]
stage: str
priority: bool
url: str = field(init=False)
def __post_init__(self) -> None:
self.url = f"https://www.odoo.com/odoo/49/tasks/{self.id}"
def __rich__(self):
table = Table(show_header=True, header_style="bold magenta", box=None)
table.add_column("Field", style="cyan", justify="right")
table.add_column("Value", style="bold green")
table.add_row("ID", str(self.id))
table.add_row("Name", self.name)
table.add_row("Tags", ", ".join(self.tags) if self.tags else "[dim]No tags[/dim]")
table.add_row("Stage", f"[yellow]{self.stage}[/yellow]")
table.add_row("Priority", "[bold red]High[/bold red]" if self.priority else "[dim]Low[/dim]")
table.add_row("URL", f"[blue underline]{self.url}[/blue underline]")
return table
def map_record_to_task(record: Dict) -> Task:
return Task(
id=record["id"],
name=record["name"],
tags=[tag["display_name"] for tag in record["tag_ids"]],
stage=record["stage_id"]["display_name"],
priority=record["priority"] == "1",
)
def fetch_tasks(client: OdooClient, domain: List[tuple[str, str, str]]) -> List[Task]:
records = client.web_search_read(
"project.task",
domain,
spec={
"name": {},
"priority": {},
"tag_ids": {
"fields": {
"display_name": {},
},
},
"stage_id": {
"fields": {
"display_name": {},
},
},
},
)
return [map_record_to_task(record) for record in records]
Generated
+55
View File
@@ -76,6 +76,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/76/c6/c88e154df9c4e1a2a66ccf0005a88dfb2650c1dffb6f5ce603dfbd452ce3/idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3", size = 70442 },
]
[[package]]
name = "iniconfig"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f2/97/ebf4da567aa6827c909642694d71c9fcf53e5b504f2d96afea02718862f3/iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7", size = 4793 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050 },
]
[[package]]
name = "markdown-it-py"
version = "3.0.0"
@@ -97,6 +106,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979 },
]
[[package]]
name = "packaging"
version = "24.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d0/63/68dbb6eb2de9cb10ee4c9c14a0148804425e13c4fb20d61cce69f53106da/packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f", size = 163950 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/88/ef/eb23f262cca3c0c4eb7ab1933c3b1f03d021f2c48f54763065b6f0e321be/packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759", size = 65451 },
]
[[package]]
name = "pluggy"
version = "1.5.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/96/2d/02d4312c973c6050a18b314a5ad0b3210edb65a906f868e31c111dede4a6/pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1", size = 67955 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/88/5f/e351af9a41f866ac3f1fac4ca0613908d9a41741cfcf2228f4ad853b697d/pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669", size = 20556 },
]
[[package]]
name = "pygments"
version = "2.19.1"
@@ -106,6 +133,30 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/8a/0b/9fcc47d19c48b59121088dd6da2488a49d5f72dacf8262e2790a1d2c7d15/pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c", size = 1225293 },
]
[[package]]
name = "pytest"
version = "8.3.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ae/3c/c9d525a414d506893f0cd8a8d0de7706446213181570cdbd766691164e40/pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845", size = 1450891 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634 },
]
[[package]]
name = "python-dotenv"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/88/2c/7bb1416c5620485aa793f2de31d3df393d3686aa8a8506d11e10e13c5baf/python_dotenv-1.1.0.tar.gz", hash = "sha256:41f90bc6f5f177fb41f53e87666db362025010eb28f60a01c9143bfa33a2b2d5", size = 39920 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/18/98a99ad95133c6a6e2005fe89faedf294a748bd5dc803008059409ac9b1e/python_dotenv-1.1.0-py3-none-any.whl", hash = "sha256:d7c01d9e2293916c18baf562d95698754b0dbbb5e74d457c45d4f6561fb9d55d", size = 20256 },
]
[[package]]
name = "requests"
version = "2.32.3"
@@ -140,6 +191,8 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "click" },
{ name = "pytest" },
{ name = "python-dotenv" },
{ name = "requests" },
{ name = "rich" },
]
@@ -147,6 +200,8 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1.8" },
{ name = "pytest", specifier = ">=8.3.5" },
{ name = "python-dotenv", specifier = ">=1.1.0" },
{ name = "requests", specifier = ">=2.32.3" },
{ name = "rich", specifier = ">=13.9.4" },
]