Compare commits

..

10 Commits

Author SHA1 Message Date
Hubert Van De Walle 9484dc5a4e Fix invalid domain 2026-04-29 09:42:18 +02:00
Hubert Van De Walle 7298a4abfa Cleaner code 2026-04-24 13:56:58 +02:00
Hubert Van De Walle 26c0f1da8e Don't overwrite dispatch 2026-04-21 23:43:04 +02:00
Hubert Van De Walle 1d46d78c35 chiffon 2026-03-04 16:02:33 +01:00
Hubert Van De Walle 920c746444 add member 2025-11-10 11:05:57 +01:00
Hubert Van De Walle f4df1935a9 Add team member 2025-09-15 10:33:52 +02:00
Hubert Van De Walle eebeed58a5 des trucs
Co-authored-by: sesn-odoo <sesn@odoo.com>
2025-07-22 11:47:33 +02:00
Hubert Van De Walle a49c5f012f Return an empty dict when loading non existant file 2025-07-22 11:23:07 +02:00
Hubert Van De Walle 7e7ffd6f49 Update tags and team 2025-07-22 11:20:50 +02:00
Hubert Van De Walle e7fa624b42 Ignore IDE files 2025-07-22 11:20:29 +02:00
9 changed files with 210 additions and 121 deletions
+2
View File
@@ -1,3 +1,5 @@
__pycache__/ __pycache__/
out/ out/
.venv .venv
.idea/
.vscode/
+15 -3
View File
@@ -1,9 +1,9 @@
import random import random
from datetime import datetime
vfyd_tags = [ vfyd_tags = [
"vfyd accounting", "vfyd accounting",
"vfyd discuss", "vfyd discuss",
"vfyd easy",
"vfyd editor", "vfyd editor",
"vfyd js", "vfyd js",
"vfyd marketing", "vfyd marketing",
@@ -17,16 +17,28 @@ vfyd_tags = [
"vfyd website", "vfyd website",
"vfyd industry", "vfyd industry",
"vfyd voip", "vfyd voip",
"vfyd ai",
"vfyd iot",
] ]
team = [ type TeamMember = str
"nle",
team: list[TeamMember] = [
"abz", "abz",
"hael", "hael",
"roen", "roen",
"sbel", "sbel",
"wasa", "wasa",
"huvw", "huvw",
"artn",
"gvar",
"sben",
] ]
is_friday = datetime.now().weekday() == 4
if not is_friday:
team.append("nle")
random.shuffle(team) random.shuffle(team)
+39
View File
@@ -0,0 +1,39 @@
import odoo_client
from datetime import datetime
from leave import Leave
import re
class LeaveService:
def __init__(self, client: odoo_client.OdooClient):
self.client = client
def fetch_available_employees(self, team: list[str]) -> list:
today = datetime.now().strftime("%Y-%m-%d")
domain = [
("start_datetime", "<=", today),
("stop_datetime", ">=", today),
]
domain.extend(["|"] * (len(team) - 1))
domain.extend(
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
)
leaves = self.client.web_search_read(Leave, domain)
team_availability = {employee: (True, True) for employee in team}
for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
is_morning = leave.start_datetime.hour < 11
is_afternoon = leave.stop_datetime.hour >= 14
if is_morning:
team_availability[employee] = (False, team_availability[employee][1])
if is_afternoon:
team_availability[employee] = (team_availability[employee][0], False)
available_employees = [
employee
for employee, (morning, afternoon) in team_availability.items()
if morning or afternoon
]
return available_employees
+39 -84
View File
@@ -1,16 +1,19 @@
import click import click
import random import random
import odoo_client import odoo_client
import re
from rich import print, console from rich import print, console
from datetime import datetime
from collections import defaultdict from collections import defaultdict
from config import vfyd_tags, team from config import vfyd_tags, team, TeamMember
from leave_service import LeaveService
from task_repository import TaskRepository
from storage import JsonDispatchStorage, DispatchStorage
from tasks import Task from tasks import Task
from leave import Leave
from storage import DispatchStorage
console = console.Console() console = console.Console()
storage: DispatchStorage = JsonDispatchStorage()
client = odoo_client.OdooClient()
leave_service = LeaveService(client)
task_repository = TaskRepository(client, vfyd_tags)
@click.group() @click.group()
@@ -22,80 +25,48 @@ def cli() -> None:
def dispatch(): def dispatch():
"""Randomly distribute unassigned tasks among team members.""" """Randomly distribute unassigned tasks among team members."""
client = odoo_client.OdooClient() if storage.load():
today = datetime.now().strftime("%Y-%m-%d") print("[red]Tickets were already dispatched.[/red]")
domain = [ return
("start_datetime", "<=", today),
("stop_datetime", ">=", today),
]
domain.extend(["|"] * (len(team) - 1))
domain.extend(
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
)
leaves = client.web_search_read(Leave, domain)
team_availability = dict() available_employees = leave_service.fetch_available_employees(team)
for employee in team:
team_availability[employee] = (True, True)
for leave in leaves: if not available_employees:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower() print("[red]No available employees for dispatch.[/red]")
is_morning = leave.start_datetime.hour < 11 return
is_afternoon = leave.stop_datetime.hour >= 14
if is_morning:
team_availability[employee] = (False, team_availability[employee][1])
if is_afternoon:
team_availability[employee] = (team_availability[employee][0], False)
available_employees = [ previous_dispatch = storage.load_week_to_date()
employee
for employee, availability in team_availability.items()
if availability[0] or availability[1]
]
previous_dispatch = DispatchStorage.load_week_to_date()
previous_dispatch_ids = [ previous_dispatch_ids = [
task_id for tasks in previous_dispatch.values() for task_id in tasks task_id for tasks in previous_dispatch.values() for task_id in tasks
] ]
domain = [ # Find unsorted tasks that have not been dispatched on previous days,
("stage_id", "=", 194), # Assign them to team members and save the result
("user_ids", "=", False), new_unsorted_tasks = task_repository.get_unsorted_excluding(previous_dispatch_ids)
("project_id", "=", 49), random.shuffle(new_unsorted_tasks)
("tag_ids", "not in", vfyd_tags), new_dispatch: dict[TeamMember, list[int]] = defaultdict(list)
("id", "not in", previous_dispatch_ids), out: dict[TeamMember, list[Task]] = defaultdict(list)
] for idx, task in enumerate(new_unsorted_tasks):
records = client.web_search_read(Task, domain) member = available_employees[idx % len(available_employees)]
new_dispatch[member].append(task.id)
out[member].append(task)
storage.save(new_dispatch)
random.shuffle(records) # Find unsorted tasks that have been dispatched on previous days.
dispatch = defaultdict(list) # Add them back to the previously assign team members
out = defaultdict(list) old_unsorted_tasks = task_repository.get_unsorted_within(previous_dispatch_ids)
for idx, task in enumerate(records): combined_tasks_to_sort: dict[TeamMember, list[Task]] = defaultdict(list)
dispatch[available_employees[idx % len(available_employees)]].append(task.id)
out[available_employees[idx % len(available_employees)]].append(task)
DispatchStorage.save(dispatch)
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
("id", "in", previous_dispatch_ids),
]
records = client.web_search_read(Task, domain)
merged = defaultdict(list)
for member, task_ids in previous_dispatch.items(): for member, task_ids in previous_dispatch.items():
for task_id in task_ids: for task_id in task_ids:
task = next((t for t in records if t.id == task_id), None) task = next((t for t in old_unsorted_tasks if t.id == task_id), None)
if task: if task:
merged[member].append(task) combined_tasks_to_sort[member].append(task)
# Merge the new unsorted tasks with the old ones
for member, tasks in out.items(): for member, tasks in out.items():
merged[member].extend(tasks) combined_tasks_to_sort[member].extend(tasks)
for key, tasks in merged.items(): for key, tasks in combined_tasks_to_sort.items():
print(f"**{key}:**") print(f"**{key}:**")
for task in tasks: for task in tasks:
print(task.url) print(task.url)
@@ -104,15 +75,7 @@ def dispatch():
@cli.command() @cli.command()
def status(): def status():
"""Show all unassigned tasks in the backlog.""" """Show all unassigned tasks in the backlog."""
tasks = task_repository.get_unassigned_tasks()
client = odoo_client.OdooClient()
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
tasks = client.web_search_read(Task, domain)
for task in tasks: for task in tasks:
print(task) print(task)
@@ -120,17 +83,9 @@ def status():
@cli.command() @cli.command()
def check(): def check():
"""Check which previously dispatched tasks are still not completed.""" """Check which previously dispatched tasks are still not completed."""
dispatch = storage.load()
dispatch = DispatchStorage.load()
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs} reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
client = odoo_client.OdooClient() tasks = task_repository.get_tasks_with_ids(reverse_dispatch.keys())
domain = [
("id", "in", list(reverse_dispatch.keys())),
("tag_ids", "not in", vfyd_tags),
("stage_id", "=", 194),
("project_id", "=", 49),
]
tasks = client.web_search_read(Task, domain)
not_done = defaultdict(list) not_done = defaultdict(list)
for task in tasks: for task in tasks:
+4 -4
View File
@@ -1,12 +1,12 @@
from typing import Dict, Self, Any, get_origin from typing import Self, Any, get_origin
from dataclasses import fields, Field from dataclasses import fields, Field
from datetime import datetime from datetime import datetime
class OdooModel: class OdooModel:
@classmethod @classmethod
def specification(cls) -> Dict[str, Any]: def specification(cls) -> dict[str, Any]:
def field_spec(f: Field) -> Dict[str, Any]: def field_spec(f: Field) -> dict[str, Any]:
field_type = f.type field_type = f.type
if f.name.endswith("_id") or f.name.endswith("_ids"): if f.name.endswith("_id") or f.name.endswith("_ids"):
return { return {
@@ -29,7 +29,7 @@ class OdooModel:
return spec return spec
@classmethod @classmethod
def from_record(cls, record: Dict) -> Self: def from_record(cls, record: dict) -> Self:
init_args = {} init_args = {}
for f in fields(cls): for f in fields(cls):
if f.init and f.name in record: if f.init and f.name in record:
+13 -9
View File
@@ -1,7 +1,7 @@
import os import os
import requests import requests
import random import random
from typing import Any, Dict, List, TypeVar, Type from typing import Any, TypeVar
from model import OdooModel from model import OdooModel
@@ -22,10 +22,10 @@ class OdooClient:
def rpc( def rpc(
self, model: str, method: str, *args: Any, **kwargs: Any self, model: str, method: str, *args: Any, **kwargs: Any
) -> requests.Response: ) -> requests.Response:
body: Dict[str, Any] = { body: dict[str, Any] = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": method, "method": method,
"id": random.randint(1, 1000000), "id": random.randint(1, 1_000_000),
"params": { "params": {
"model": model, "model": model,
"method": method, "method": method,
@@ -34,23 +34,27 @@ class OdooClient:
}, },
} }
return self.session.post( response = self.session.post(
f"{self.base_url}/web/dataset/call_kw", f"{self.base_url}/web/dataset/call_kw",
json=body, json=body,
) )
response.raise_for_status()
return response
def web_search_read( def web_search_read(
self, self,
model: Type[T], model: type[T],
domain: List[tuple], domain: list[tuple[Any, ...]],
*args: Any, *args: Any,
**kwargs: Any, **kwargs: Any,
) -> List[T]: ) -> list[T]:
args = [domain, model.specification(), *args] args = [domain, model.specification(), *args]
res = self.rpc(model._name, "web_search_read", *args, **kwargs) res = self.rpc(model._name, "web_search_read", *args, **kwargs)
res.raise_for_status() res.raise_for_status()
json = res.json() json = res.json()
if json.get("error"):
raise Exception(json.get("error")) if error := json.get("error"):
raise RuntimeError(f"Odoo RPC Error: {error}")
records = json.get("result", {}).get("records", []) records = json.get("result", {}).get("records", [])
return [model.from_record(record) for record in records] return [model.from_record(record) for record in records]
+33 -19
View File
@@ -1,27 +1,18 @@
from typing import Dict, List, Optional from typing import Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path
import json import json
from config import TeamMember
class DispatchStorage: class DispatchStorage:
@classmethod def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]: ...
def path(cls, date: Optional[str] = None) -> str:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return f"out/{date}_dispatch.json"
@classmethod def save(
def load(cls, date: Optional[str] = None) -> Dict[str, List[int]]: self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
with open(cls.path(date), "r") as f: ) -> None: ...
return json.load(f)
@classmethod def load_week_to_date(self) -> dict[TeamMember, list[int]]:
def save(cls, dispatch: Dict[str, List[int]], date: Optional[str] = None) -> None:
with open(cls.path(date), "w") as f:
json.dump(dispatch, f)
@classmethod
def load_week_to_date(cls) -> Dict[str, List[int]]:
""" """
Loads and combines dispatch data from the start of the current week up to today. Loads and combines dispatch data from the start of the current week up to today.
""" """
@@ -29,10 +20,10 @@ class DispatchStorage:
days_since_monday = today.weekday() days_since_monday = today.weekday()
current_date = today - timedelta(days=days_since_monday) current_date = today - timedelta(days=days_since_monday)
combined_dispatch: Dict[str, List[int]] = {} combined_dispatch: dict[str, list[int]] = {}
while current_date.date() < today.date(): while current_date.date() < today.date():
date_str = current_date.strftime("%Y-%m-%d") date_str = current_date.strftime("%Y-%m-%d")
daily_dispatch = cls.load(date_str) daily_dispatch = self.load(date_str)
for key, values in daily_dispatch.items(): for key, values in daily_dispatch.items():
if key in combined_dispatch: if key in combined_dispatch:
combined_dispatch[key].extend(values) combined_dispatch[key].extend(values)
@@ -41,3 +32,26 @@ class DispatchStorage:
current_date += timedelta(days=1) current_date += timedelta(days=1)
return combined_dispatch return combined_dispatch
class JsonDispatchStorage(DispatchStorage):
@staticmethod
def path(date: Optional[str] = None) -> Path:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return Path("out", f"{date}_dispatch.json")
def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]:
try:
with open(self.path(date), "r") as f:
return json.load(f)
except FileNotFoundError:
return {}
def save(
self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
) -> None:
path = self.path(date)
path.parent.mkdir(exist_ok=True)
with open(path, "w") as f:
json.dump(dispatch, f)
+64
View File
@@ -0,0 +1,64 @@
from typing import Iterable
from tasks import Task
STAGE_BACKLOG = 194
PROJECT_ID = 49
class TaskRepository:
def __init__(self, client, vfyd_tags: Iterable[int]):
self.client = client
self._base_domain = [
("stage_id", "=", STAGE_BACKLOG),
("project_id", "=", PROJECT_ID),
("tag_ids", "not in", list(vfyd_tags)),
]
def _search(self, domain: list) -> list[Task]:
return self.client.web_search_read(Task, domain)
def get_unassigned_tasks(self) -> list[Task]:
return self._search(
[
*self._base_domain,
("user_ids", "=", False),
]
)
def get_tasks_with_ids(self, ids: Iterable[int]) -> list[Task]:
ids = list(ids)
if not ids:
return []
domain = [
*self._base_domain,
("id", "in", ids),
]
return self._search(domain)
def get_unsorted_excluding(self, exclude_ids: Iterable[int]) -> list[Task]:
exclude_ids = list(exclude_ids)
domain = [
*self._base_domain,
("user_ids", "=", False),
]
if exclude_ids:
domain.append(("id", "not in", exclude_ids))
return self._search(domain)
def get_unsorted_within(self, ids: Iterable[int]) -> list[Task]:
included_ids = list(ids)
if not included_ids:
return []
domain = [
*self._base_domain,
("user_ids", "=", False),
("id", "in", included_ids),
]
return self._search(domain)
+1 -2
View File
@@ -1,5 +1,4 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List
from model import OdooModel from model import OdooModel
@@ -9,7 +8,7 @@ class Task(OdooModel):
id: int id: int
name: str name: str
tag_ids: List[str] tag_ids: list[str]
stage_id: str stage_id: str
priority: bool priority: bool
url: str = field(init=False) url: str = field(init=False)