Compare commits

...

14 Commits

Author SHA1 Message Date
Hubert Van De Walle 9484dc5a4e Fix invalid domain 2026-04-29 09:42:18 +02:00
Hubert Van De Walle 7298a4abfa Cleaner code 2026-04-24 13:56:58 +02:00
Hubert Van De Walle 26c0f1da8e Don't overwrite dispatch 2026-04-21 23:43:04 +02:00
Hubert Van De Walle 1d46d78c35 chiffon 2026-03-04 16:02:33 +01:00
Hubert Van De Walle 920c746444 add member 2025-11-10 11:05:57 +01:00
Hubert Van De Walle f4df1935a9 Add team member 2025-09-15 10:33:52 +02:00
Hubert Van De Walle eebeed58a5 des trucs
Co-authored-by: sesn-odoo <sesn@odoo.com>
2025-07-22 11:47:33 +02:00
Hubert Van De Walle a49c5f012f Return an empty dict when loading non existant file 2025-07-22 11:23:07 +02:00
Hubert Van De Walle 7e7ffd6f49 Update tags and team 2025-07-22 11:20:50 +02:00
Hubert Van De Walle e7fa624b42 Ignore IDE files 2025-07-22 11:20:29 +02:00
Hubert Van De Walle 2bcb5eef20 Add voip tag 2025-03-18 10:30:39 +01:00
Hubert Van De Walle 54ea243c8c wip 2025-03-07 10:37:04 +01:00
Hubert Van De Walle d3abdb10c4 check availabilities before dispatching 2025-03-05 15:10:30 +01:00
Hubert Van De Walle 47a3eed98b Infer field specification and mapping from model definition 2025-03-05 15:09:50 +01:00
12 changed files with 386 additions and 186 deletions
+2
View File
@@ -1,3 +1,5 @@
__pycache__/ __pycache__/
out/ out/
.venv .venv
.idea/
.vscode/
-109
View File
@@ -1,109 +0,0 @@
import click
import random
import odoo_client
import json
from tasks import fetch_tasks
from rich import print, console
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Optional
from config import vfyd_tags, team
console = console.Console()
def load_dispatch(date: Optional[str] = None) -> Dict[str, List[int]]:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
with open(f"out/{date}_dispatch.json", "r") as f:
return json.load(f)
def save_dispatch(dispatch: Dict[str, List[int]], date: Optional[str] = None) -> None:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
with open(f"out/{date}_dispatch.json", "w") as f:
json.dump(dispatch, f)
@click.group()
def cli() -> None:
pass
@cli.command()
def dispatch():
"""Randomly distribute unassigned tasks among team members."""
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
client = odoo_client.OdooClient()
records = fetch_tasks(client, domain)
random.shuffle(records)
dispatch = defaultdict(list)
out = defaultdict(list)
for idx, task in enumerate(records):
dispatch[team[idx % len(team)]].append(task.id)
out[team[idx % len(team)]].append(task)
save_dispatch(dispatch)
for key, tasks in out.items():
print(f"**{key}:**")
for task in tasks:
print(task.url)
@cli.command()
def status():
"""Show all unassigned tasks in the backlog."""
client = odoo_client.OdooClient()
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
tasks = fetch_tasks(client, domain)
for task in tasks:
print(task)
@cli.command()
def check():
"""Check which previously dispatched tasks are still not completed."""
dispatch = load_dispatch()
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
client = odoo_client.OdooClient()
domain = [
("id", "in", list(reverse_dispatch.keys())),
("tag_ids", "not in", vfyd_tags),
("stage_id", "=", 194),
("project_id", "=", 49),
]
tasks = fetch_tasks(client, domain)
not_done = defaultdict(list)
for task in tasks:
not_done[reverse_dispatch[task.id]].append(task)
for key, value in not_done.items():
if value:
console.rule(f"[red]{key}[/red]")
for task in value:
print(task)
if __name__ == "__main__":
cli()
+17 -4
View File
@@ -1,9 +1,9 @@
import random import random
from datetime import datetime
vfyd_tags = [ vfyd_tags = [
"vfyd accounting", "vfyd accounting",
"vfyd discuss", "vfyd discuss",
"vfyd easy",
"vfyd editor", "vfyd editor",
"vfyd js", "vfyd js",
"vfyd marketing", "vfyd marketing",
@@ -16,16 +16,29 @@ vfyd_tags = [
"vfyd vidange", "vfyd vidange",
"vfyd website", "vfyd website",
"vfyd industry", "vfyd industry",
"vfyd voip",
"vfyd ai",
"vfyd iot",
] ]
team = [ type TeamMember = str
"nle",
team: list[TeamMember] = [
"abz", "abz",
"hael", "hael",
#"roen", "roen",
"sbel", "sbel",
"wasa", "wasa",
"huvw", "huvw",
"artn",
"gvar",
"sben",
] ]
is_friday = datetime.now().weekday() == 4
if not is_friday:
team.append("nle")
random.shuffle(team) random.shuffle(team)
+12
View File
@@ -0,0 +1,12 @@
from dataclasses import dataclass
from model import OdooModel
from datetime import datetime
@dataclass
class Leave(OdooModel):
_name = "hr.leave.report.calendar"
start_datetime: datetime
stop_datetime: datetime
employee_id: str
+39
View File
@@ -0,0 +1,39 @@
import odoo_client
from datetime import datetime
from leave import Leave
import re
class LeaveService:
def __init__(self, client: odoo_client.OdooClient):
self.client = client
def fetch_available_employees(self, team: list[str]) -> list:
today = datetime.now().strftime("%Y-%m-%d")
domain = [
("start_datetime", "<=", today),
("stop_datetime", ">=", today),
]
domain.extend(["|"] * (len(team) - 1))
domain.extend(
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
)
leaves = self.client.web_search_read(Leave, domain)
team_availability = {employee: (True, True) for employee in team}
for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
is_morning = leave.start_datetime.hour < 11
is_afternoon = leave.stop_datetime.hour >= 14
if is_morning:
team_availability[employee] = (False, team_availability[employee][1])
if is_afternoon:
team_availability[employee] = (team_availability[employee][0], False)
available_employees = [
employee
for employee, (morning, afternoon) in team_availability.items()
if morning or afternoon
]
return available_employees
+102
View File
@@ -0,0 +1,102 @@
import click
import random
import odoo_client
from rich import print, console
from collections import defaultdict
from config import vfyd_tags, team, TeamMember
from leave_service import LeaveService
from task_repository import TaskRepository
from storage import JsonDispatchStorage, DispatchStorage
from tasks import Task
console = console.Console()
storage: DispatchStorage = JsonDispatchStorage()
client = odoo_client.OdooClient()
leave_service = LeaveService(client)
task_repository = TaskRepository(client, vfyd_tags)
@click.group()
def cli() -> None:
pass
@cli.command()
def dispatch():
"""Randomly distribute unassigned tasks among team members."""
if storage.load():
print("[red]Tickets were already dispatched.[/red]")
return
available_employees = leave_service.fetch_available_employees(team)
if not available_employees:
print("[red]No available employees for dispatch.[/red]")
return
previous_dispatch = storage.load_week_to_date()
previous_dispatch_ids = [
task_id for tasks in previous_dispatch.values() for task_id in tasks
]
# Find unsorted tasks that have not been dispatched on previous days,
# Assign them to team members and save the result
new_unsorted_tasks = task_repository.get_unsorted_excluding(previous_dispatch_ids)
random.shuffle(new_unsorted_tasks)
new_dispatch: dict[TeamMember, list[int]] = defaultdict(list)
out: dict[TeamMember, list[Task]] = defaultdict(list)
for idx, task in enumerate(new_unsorted_tasks):
member = available_employees[idx % len(available_employees)]
new_dispatch[member].append(task.id)
out[member].append(task)
storage.save(new_dispatch)
# Find unsorted tasks that have been dispatched on previous days.
# Add them back to the previously assign team members
old_unsorted_tasks = task_repository.get_unsorted_within(previous_dispatch_ids)
combined_tasks_to_sort: dict[TeamMember, list[Task]] = defaultdict(list)
for member, task_ids in previous_dispatch.items():
for task_id in task_ids:
task = next((t for t in old_unsorted_tasks if t.id == task_id), None)
if task:
combined_tasks_to_sort[member].append(task)
# Merge the new unsorted tasks with the old ones
for member, tasks in out.items():
combined_tasks_to_sort[member].extend(tasks)
for key, tasks in combined_tasks_to_sort.items():
print(f"**{key}:**")
for task in tasks:
print(task.url)
@cli.command()
def status():
"""Show all unassigned tasks in the backlog."""
tasks = task_repository.get_unassigned_tasks()
for task in tasks:
print(task)
@cli.command()
def check():
"""Check which previously dispatched tasks are still not completed."""
dispatch = storage.load()
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
tasks = task_repository.get_tasks_with_ids(reverse_dispatch.keys())
not_done = defaultdict(list)
for task in tasks:
not_done[reverse_dispatch[task.id]].append(task)
for key, value in not_done.items():
if value:
console.rule(f"[red]{key}[/red]")
for task in value:
print(task)
if __name__ == "__main__":
cli()
+55
View File
@@ -0,0 +1,55 @@
from typing import Self, Any, get_origin
from dataclasses import fields, Field
from datetime import datetime
class OdooModel:
@classmethod
def specification(cls) -> dict[str, Any]:
def field_spec(f: Field) -> dict[str, Any]:
field_type = f.type
if f.name.endswith("_id") or f.name.endswith("_ids"):
return {
f.name: {
"fields": {
"display_name": {},
},
},
}
elif field_type in {str, int, bool, datetime}:
return {f.name: {}}
else:
raise NotImplementedError(f"Unsupported field type: {field_type}")
spec = dict()
for f in fields(cls):
if f.init:
spec.update(field_spec(f))
return spec
@classmethod
def from_record(cls, record: dict) -> Self:
init_args = {}
for f in fields(cls):
if f.init and f.name in record:
value = record[f.name]
if f.type is datetime:
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
if f.name.endswith("_id"):
if f.type is str:
value = value["display_name"]
else:
raise NotImplementedError()
if f.name.endswith("_ids"):
if get_origin(f.type) is list and f.type.__args__[0] is str:
value = [v["display_name"] for v in value]
else:
raise NotImplementedError()
init_args[f.name] = value
return cls(**init_args)
+21 -13
View File
@@ -1,7 +1,11 @@
import os import os
import requests import requests
import random import random
from typing import Any, Dict, List from typing import Any, TypeVar
from model import OdooModel
T = TypeVar("T", bound=OdooModel)
class OdooClient: class OdooClient:
@@ -18,10 +22,10 @@ class OdooClient:
def rpc( def rpc(
self, model: str, method: str, *args: Any, **kwargs: Any self, model: str, method: str, *args: Any, **kwargs: Any
) -> requests.Response: ) -> requests.Response:
body: Dict[str, Any] = { body: dict[str, Any] = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": method, "method": method,
"id": random.randint(1, 1000000), "id": random.randint(1, 1_000_000),
"params": { "params": {
"model": model, "model": model,
"method": method, "method": method,
@@ -30,23 +34,27 @@ class OdooClient:
}, },
} }
return self.session.post( response = self.session.post(
f"{self.base_url}/web/dataset/call_kw", f"{self.base_url}/web/dataset/call_kw",
json=body, json=body,
) )
response.raise_for_status()
return response
def web_search_read( def web_search_read(
self, self,
model: str, model: type[T],
domain: List[tuple], domain: list[tuple[Any, ...]],
spec: Dict[str, Any],
*args: Any, *args: Any,
**kwargs: Any, **kwargs: Any,
) -> List[Dict[str, Any]]: ) -> list[T]:
args = [domain, spec, *args] args = [domain, model.specification(), *args]
res = self.rpc(model, "web_search_read", *args, **kwargs) res = self.rpc(model._name, "web_search_read", *args, **kwargs)
res.raise_for_status() res.raise_for_status()
json = res.json() json = res.json()
if json.get("error"):
raise Exception(json.get("error")) if error := json.get("error"):
return json.get("result", {}).get("records", []) raise RuntimeError(f"Odoo RPC Error: {error}")
records = json.get("result", {}).get("records", [])
return [model.from_record(record) for record in records]
+57
View File
@@ -0,0 +1,57 @@
from typing import Optional
from datetime import datetime, timedelta
from pathlib import Path
import json
from config import TeamMember
class DispatchStorage:
def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]: ...
def save(
self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
) -> None: ...
def load_week_to_date(self) -> dict[TeamMember, list[int]]:
"""
Loads and combines dispatch data from the start of the current week up to today.
"""
today = datetime.now()
days_since_monday = today.weekday()
current_date = today - timedelta(days=days_since_monday)
combined_dispatch: dict[str, list[int]] = {}
while current_date.date() < today.date():
date_str = current_date.strftime("%Y-%m-%d")
daily_dispatch = self.load(date_str)
for key, values in daily_dispatch.items():
if key in combined_dispatch:
combined_dispatch[key].extend(values)
else:
combined_dispatch[key] = values.copy()
current_date += timedelta(days=1)
return combined_dispatch
class JsonDispatchStorage(DispatchStorage):
@staticmethod
def path(date: Optional[str] = None) -> Path:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return Path("out", f"{date}_dispatch.json")
def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]:
try:
with open(self.path(date), "r") as f:
return json.load(f)
except FileNotFoundError:
return {}
def save(
self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
) -> None:
path = self.path(date)
path.parent.mkdir(exist_ok=True)
with open(path, "w") as f:
json.dump(dispatch, f)
+64
View File
@@ -0,0 +1,64 @@
from typing import Iterable
from tasks import Task
STAGE_BACKLOG = 194
PROJECT_ID = 49
class TaskRepository:
def __init__(self, client, vfyd_tags: Iterable[int]):
self.client = client
self._base_domain = [
("stage_id", "=", STAGE_BACKLOG),
("project_id", "=", PROJECT_ID),
("tag_ids", "not in", list(vfyd_tags)),
]
def _search(self, domain: list) -> list[Task]:
return self.client.web_search_read(Task, domain)
def get_unassigned_tasks(self) -> list[Task]:
return self._search(
[
*self._base_domain,
("user_ids", "=", False),
]
)
def get_tasks_with_ids(self, ids: Iterable[int]) -> list[Task]:
ids = list(ids)
if not ids:
return []
domain = [
*self._base_domain,
("id", "in", ids),
]
return self._search(domain)
def get_unsorted_excluding(self, exclude_ids: Iterable[int]) -> list[Task]:
exclude_ids = list(exclude_ids)
domain = [
*self._base_domain,
("user_ids", "=", False),
]
if exclude_ids:
domain.append(("id", "not in", exclude_ids))
return self._search(domain)
def get_unsorted_within(self, ids: Iterable[int]) -> list[Task]:
included_ids = list(ids)
if not included_ids:
return []
domain = [
*self._base_domain,
("user_ids", "=", False),
("id", "in", included_ids),
]
return self._search(domain)
+17
View File
@@ -0,0 +1,17 @@
from dataclasses import dataclass, field
from model import OdooModel
@dataclass
class Task(OdooModel):
_name = "project.task"
id: int
name: str
tag_ids: list[str]
stage_id: str
priority: bool
url: str = field(init=False)
def __post_init__(self) -> None:
self.url = f"https://www.odoo.com/odoo/49/tasks/{self.id}"
-60
View File
@@ -1,60 +0,0 @@
from dataclasses import dataclass, field
from typing import Dict, List
from odoo_client import OdooClient
from rich.table import Table
@dataclass
class Task:
id: int
name: str
tags: List[str]
stage: str
priority: bool
url: str = field(init=False)
def __post_init__(self) -> None:
self.url = f"https://www.odoo.com/odoo/49/tasks/{self.id}"
def __rich__(self):
table = Table(show_header=True, header_style="bold magenta", box=None)
table.add_column("Field", style="cyan", justify="right")
table.add_column("Value", style="bold green")
table.add_row("ID", str(self.id))
table.add_row("Name", self.name)
table.add_row("Tags", ", ".join(self.tags) if self.tags else "[dim]No tags[/dim]")
table.add_row("Stage", f"[yellow]{self.stage}[/yellow]")
table.add_row("Priority", "[bold red]High[/bold red]" if self.priority else "[dim]Low[/dim]")
table.add_row("URL", f"[blue underline]{self.url}[/blue underline]")
return table
def map_record_to_task(record: Dict) -> Task:
return Task(
id=record["id"],
name=record["name"],
tags=[tag["display_name"] for tag in record["tag_ids"]],
stage=record["stage_id"]["display_name"],
priority=record["priority"] == "1",
)
def fetch_tasks(client: OdooClient, domain: List[tuple[str, str, str]]) -> List[Task]:
records = client.web_search_read(
"project.task",
domain,
spec={
"name": {},
"priority": {},
"tag_ids": {
"fields": {
"display_name": {},
},
},
"stage_id": {
"fields": {
"display_name": {},
},
},
},
)
return [map_record_to_task(record) for record in records]