Compare commits

...

6 Commits

Author SHA1 Message Date
Hubert Van De Walle 9484dc5a4e Fix invalid domain 2026-04-29 09:42:18 +02:00
Hubert Van De Walle 7298a4abfa Cleaner code 2026-04-24 13:56:58 +02:00
Hubert Van De Walle 26c0f1da8e Don't overwrite dispatch 2026-04-21 23:43:04 +02:00
Hubert Van De Walle 1d46d78c35 chiffon 2026-03-04 16:02:33 +01:00
Hubert Van De Walle 920c746444 add member 2025-11-10 11:05:57 +01:00
Hubert Van De Walle f4df1935a9 Add team member 2025-09-15 10:33:52 +02:00
5 changed files with 181 additions and 107 deletions
+12 -2
View File
@@ -1,4 +1,5 @@
import random import random
from datetime import datetime
vfyd_tags = [ vfyd_tags = [
"vfyd accounting", "vfyd accounting",
@@ -20,8 +21,9 @@ vfyd_tags = [
"vfyd iot", "vfyd iot",
] ]
team = [ type TeamMember = str
"nle",
team: list[TeamMember] = [
"abz", "abz",
"hael", "hael",
"roen", "roen",
@@ -29,6 +31,14 @@ team = [
"wasa", "wasa",
"huvw", "huvw",
"artn", "artn",
"gvar",
"sben",
] ]
is_friday = datetime.now().weekday() == 4
if not is_friday:
team.append("nle")
random.shuffle(team) random.shuffle(team)
+39
View File
@@ -0,0 +1,39 @@
import odoo_client
from datetime import datetime
from leave import Leave
import re
class LeaveService:
def __init__(self, client: odoo_client.OdooClient):
self.client = client
def fetch_available_employees(self, team: list[str]) -> list:
today = datetime.now().strftime("%Y-%m-%d")
domain = [
("start_datetime", "<=", today),
("stop_datetime", ">=", today),
]
domain.extend(["|"] * (len(team) - 1))
domain.extend(
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
)
leaves = self.client.web_search_read(Leave, domain)
team_availability = {employee: (True, True) for employee in team}
for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
is_morning = leave.start_datetime.hour < 11
is_afternoon = leave.stop_datetime.hour >= 14
if is_morning:
team_availability[employee] = (False, team_availability[employee][1])
if is_afternoon:
team_availability[employee] = (team_availability[employee][0], False)
available_employees = [
employee
for employee, (morning, afternoon) in team_availability.items()
if morning or afternoon
]
return available_employees
+36 -83
View File
@@ -1,16 +1,19 @@
import click import click
import random import random
import odoo_client import odoo_client
import re
from rich import print, console from rich import print, console
from datetime import datetime
from collections import defaultdict from collections import defaultdict
from config import vfyd_tags, team from config import vfyd_tags, team, TeamMember
from leave_service import LeaveService
from task_repository import TaskRepository
from storage import JsonDispatchStorage, DispatchStorage
from tasks import Task from tasks import Task
from leave import Leave
from storage import DispatchStorage
console = console.Console() console = console.Console()
storage: DispatchStorage = JsonDispatchStorage()
client = odoo_client.OdooClient()
leave_service = LeaveService(client)
task_repository = TaskRepository(client, vfyd_tags)
@click.group() @click.group()
@@ -22,82 +25,48 @@ def cli() -> None:
def dispatch(): def dispatch():
"""Randomly distribute unassigned tasks among team members.""" """Randomly distribute unassigned tasks among team members."""
client = odoo_client.OdooClient() if storage.load():
today = datetime.now().strftime("%Y-%m-%d") print("[red]Tickets were already dispatched.[/red]")
domain = [ return
("start_datetime", "<=", today),
("stop_datetime", ">=", today),
]
domain.extend(["|"] * (len(team) - 1))
domain.extend(
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
)
leaves = client.web_search_read(Leave, domain)
team_availability = {employee: (True, True) for employee in team} available_employees = leave_service.fetch_available_employees(team)
for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
is_morning = leave.start_datetime.hour < 11
is_afternoon = leave.stop_datetime.hour >= 14
if is_morning:
team_availability[employee] = (False, team_availability[employee][1])
if is_afternoon:
team_availability[employee] = (team_availability[employee][0], False)
available_employees = [
employee
for employee, (morning, afternoon) in team_availability.items()
if morning or afternoon
]
if not available_employees: if not available_employees:
print("[red]No available employees for dispatch.[/red]") print("[red]No available employees for dispatch.[/red]")
return return
previous_dispatch = DispatchStorage.load_week_to_date() previous_dispatch = storage.load_week_to_date()
previous_dispatch_ids = [ previous_dispatch_ids = [
task_id for tasks in previous_dispatch.values() for task_id in tasks task_id for tasks in previous_dispatch.values() for task_id in tasks
] ]
domain = [ # Find unsorted tasks that have not been dispatched on previous days,
("stage_id", "=", 194), # Assign them to team members and save the result
("user_ids", "=", False), new_unsorted_tasks = task_repository.get_unsorted_excluding(previous_dispatch_ids)
("project_id", "=", 49), random.shuffle(new_unsorted_tasks)
("tag_ids", "not in", vfyd_tags), new_dispatch: dict[TeamMember, list[int]] = defaultdict(list)
("id", "not in", previous_dispatch_ids), out: dict[TeamMember, list[Task]] = defaultdict(list)
] for idx, task in enumerate(new_unsorted_tasks):
records = client.web_search_read(Task, domain) member = available_employees[idx % len(available_employees)]
new_dispatch[member].append(task.id)
out[member].append(task)
storage.save(new_dispatch)
random.shuffle(records) # Find unsorted tasks that have been dispatched on previous days.
dispatch = defaultdict(list) # Add them back to the previously assign team members
out = defaultdict(list) old_unsorted_tasks = task_repository.get_unsorted_within(previous_dispatch_ids)
for idx, task in enumerate(records): combined_tasks_to_sort: dict[TeamMember, list[Task]] = defaultdict(list)
dispatch[available_employees[idx % len(available_employees)]].append(task.id)
out[available_employees[idx % len(available_employees)]].append(task)
DispatchStorage.save(dispatch)
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
("id", "in", previous_dispatch_ids),
]
records = client.web_search_read(Task, domain)
merged = defaultdict(list)
for member, task_ids in previous_dispatch.items(): for member, task_ids in previous_dispatch.items():
for task_id in task_ids: for task_id in task_ids:
task = next((t for t in records if t.id == task_id), None) task = next((t for t in old_unsorted_tasks if t.id == task_id), None)
if task: if task:
merged[member].append(task) combined_tasks_to_sort[member].append(task)
# Merge the new unsorted tasks with the old ones
for member, tasks in out.items(): for member, tasks in out.items():
merged[member].extend(tasks) combined_tasks_to_sort[member].extend(tasks)
for key, tasks in merged.items(): for key, tasks in combined_tasks_to_sort.items():
print(f"**{key}:**") print(f"**{key}:**")
for task in tasks: for task in tasks:
print(task.url) print(task.url)
@@ -106,15 +75,7 @@ def dispatch():
@cli.command() @cli.command()
def status(): def status():
"""Show all unassigned tasks in the backlog.""" """Show all unassigned tasks in the backlog."""
tasks = task_repository.get_unassigned_tasks()
client = odoo_client.OdooClient()
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
tasks = client.web_search_read(Task, domain)
for task in tasks: for task in tasks:
print(task) print(task)
@@ -122,17 +83,9 @@ def status():
@cli.command() @cli.command()
def check(): def check():
"""Check which previously dispatched tasks are still not completed.""" """Check which previously dispatched tasks are still not completed."""
dispatch = storage.load()
dispatch = DispatchStorage.load()
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs} reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
client = odoo_client.OdooClient() tasks = task_repository.get_tasks_with_ids(reverse_dispatch.keys())
domain = [
("id", "in", list(reverse_dispatch.keys())),
("tag_ids", "not in", vfyd_tags),
("stage_id", "=", 194),
("project_id", "=", 49),
]
tasks = client.web_search_read(Task, domain)
not_done = defaultdict(list) not_done = defaultdict(list)
for task in tasks: for task in tasks:
+30 -22
View File
@@ -2,32 +2,17 @@ from typing import Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
import json import json
from config import TeamMember
class DispatchStorage: class DispatchStorage:
@staticmethod def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]: ...
def path(date: Optional[str] = None) -> Path:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return Path("out", f"{date}_dispatch.json")
@staticmethod def save(
def load(date: Optional[str] = None) -> dict[str, list[int]]: self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
try: ) -> None: ...
with open(DispatchStorage.path(date), "r") as f:
return json.load(f)
except FileNotFoundError:
return {}
@staticmethod def load_week_to_date(self) -> dict[TeamMember, list[int]]:
def save(dispatch: dict[str, list[int]], date: Optional[str] = None) -> None:
path = DispatchStorage.path(date)
path.parent.mkdir(exist_ok=True)
with open(path, "w") as f:
json.dump(dispatch, f)
@staticmethod
def load_week_to_date() -> dict[str, list[int]]:
""" """
Loads and combines dispatch data from the start of the current week up to today. Loads and combines dispatch data from the start of the current week up to today.
""" """
@@ -38,7 +23,7 @@ class DispatchStorage:
combined_dispatch: dict[str, list[int]] = {} combined_dispatch: dict[str, list[int]] = {}
while current_date.date() < today.date(): while current_date.date() < today.date():
date_str = current_date.strftime("%Y-%m-%d") date_str = current_date.strftime("%Y-%m-%d")
daily_dispatch = DispatchStorage.load(date_str) daily_dispatch = self.load(date_str)
for key, values in daily_dispatch.items(): for key, values in daily_dispatch.items():
if key in combined_dispatch: if key in combined_dispatch:
combined_dispatch[key].extend(values) combined_dispatch[key].extend(values)
@@ -47,3 +32,26 @@ class DispatchStorage:
current_date += timedelta(days=1) current_date += timedelta(days=1)
return combined_dispatch return combined_dispatch
class JsonDispatchStorage(DispatchStorage):
@staticmethod
def path(date: Optional[str] = None) -> Path:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return Path("out", f"{date}_dispatch.json")
def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]:
try:
with open(self.path(date), "r") as f:
return json.load(f)
except FileNotFoundError:
return {}
def save(
self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
) -> None:
path = self.path(date)
path.parent.mkdir(exist_ok=True)
with open(path, "w") as f:
json.dump(dispatch, f)
+64
View File
@@ -0,0 +1,64 @@
from typing import Iterable
from tasks import Task
STAGE_BACKLOG = 194
PROJECT_ID = 49
class TaskRepository:
def __init__(self, client, vfyd_tags: Iterable[int]):
self.client = client
self._base_domain = [
("stage_id", "=", STAGE_BACKLOG),
("project_id", "=", PROJECT_ID),
("tag_ids", "not in", list(vfyd_tags)),
]
def _search(self, domain: list) -> list[Task]:
return self.client.web_search_read(Task, domain)
def get_unassigned_tasks(self) -> list[Task]:
return self._search(
[
*self._base_domain,
("user_ids", "=", False),
]
)
def get_tasks_with_ids(self, ids: Iterable[int]) -> list[Task]:
ids = list(ids)
if not ids:
return []
domain = [
*self._base_domain,
("id", "in", ids),
]
return self._search(domain)
def get_unsorted_excluding(self, exclude_ids: Iterable[int]) -> list[Task]:
exclude_ids = list(exclude_ids)
domain = [
*self._base_domain,
("user_ids", "=", False),
]
if exclude_ids:
domain.append(("id", "not in", exclude_ids))
return self._search(domain)
def get_unsorted_within(self, ids: Iterable[int]) -> list[Task]:
included_ids = list(ids)
if not included_ids:
return []
domain = [
*self._base_domain,
("user_ids", "=", False),
("id", "in", included_ids),
]
return self._search(domain)