Compare commits

1 Commits

Author SHA1 Message Date
sena 3369b92b11 des trucs 2025-04-28 08:56:51 +02:00
13 changed files with 316 additions and 258 deletions
+3 -2
View File
@@ -1,5 +1,6 @@
__pycache__/
out/
.env
.venv
.idea/
.vscode/
config.toml
out/
+11
View File
@@ -0,0 +1,11 @@
[odoo]
session_id = "your-odoo-session-id-here"
[team]
members = [
"gram_1",
"gram_2",
"gram_3",
"gram_4",
"gram_5"
]
+2
View File
@@ -6,6 +6,8 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"click>=8.1.8",
"pytest>=8.3.5",
"python-dotenv>=1.1.0",
"requests>=2.32.3",
"rich>=13.9.4",
]
+69 -41
View File
@@ -1,44 +1,72 @@
import random
from datetime import datetime
import os
from dataclasses import dataclass, field
from pathlib import Path
vfyd_tags = [
"vfyd accounting",
"vfyd discuss",
"vfyd editor",
"vfyd js",
"vfyd marketing",
"vfyd pos",
"vfyd sale",
"vfyd security",
"vfyd spreadsheet",
"vfyd stock",
"vfyd studio",
"vfyd vidange",
"vfyd website",
"vfyd industry",
"vfyd voip",
"vfyd ai",
"vfyd iot",
]
type TeamMember = str
team: list[TeamMember] = [
"abz",
"hael",
"roen",
"sbel",
"wasa",
"huvw",
"artn",
"gvar",
"sben",
]
is_friday = datetime.now().weekday() == 4
if not is_friday:
team.append("nle")
import tomllib
from dotenv import load_dotenv
random.shuffle(team)
@dataclass
class Config:
config_path: Path = Path("config.toml")
vfyd_tags: list[str] = field(default_factory=lambda: [
"vfyd accounting",
"vfyd discuss",
"vfyd easy",
"vfyd editor",
"vfyd js",
"vfyd marketing",
"vfyd pos",
"vfyd sale",
"vfyd security",
"vfyd spreadsheet",
"vfyd stock",
"vfyd studio",
"vfyd vidange",
"vfyd website",
"vfyd industry",
"vfyd voip",
])
_config: dict = field(init=False, repr=False)
def __post_init__(self):
load_dotenv()
self._config = self._load_config()
self.session_id = self._get_session_id()
self.team = self._validate_team()
def _load_config(self) -> dict:
"""Load the configuration file."""
if not self.config_path.exists():
raise FileNotFoundError(f"Configuration file '{self.config_path}' not found.")
try:
with self.config_path.open("rb") as f:
return tomllib.load(f)
except tomllib.TOMLDecodeError as e:
raise ValueError(f"Error decoding TOML file '{self.config_path}': {e}")
def _get_session_id(self) -> str:
"""Prioritize configuration sources for session ID."""
session_id = self._get("odoo", "session_id") or os.getenv("ODOO_SESSION_ID")
if not session_id:
raise ValueError("No valid session ID found in config or environment")
return session_id
def _validate_team(self) -> list[str]:
"""Validate and sanitize team members."""
team = self._get("team", "members")
if not team:
raise ValueError("No team members defined in configuration.")
return list({member.lower().strip() for member in team})
def _get(self, section: str, key: str, default=None):
"""Retrieve a configuration value."""
return self._config.get(section, {}).get(key, default)
config = Config()
+2 -1
View File
@@ -1,7 +1,8 @@
from dataclasses import dataclass
from model import OdooModel
from datetime import datetime
from model import OdooModel
@dataclass
class Leave(OdooModel):
-39
View File
@@ -1,39 +0,0 @@
import odoo_client
from datetime import datetime
from leave import Leave
import re
class LeaveService:
def __init__(self, client: odoo_client.OdooClient):
self.client = client
def fetch_available_employees(self, team: list[str]) -> list:
today = datetime.now().strftime("%Y-%m-%d")
domain = [
("start_datetime", "<=", today),
("stop_datetime", ">=", today),
]
domain.extend(["|"] * (len(team) - 1))
domain.extend(
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
)
leaves = self.client.web_search_read(Leave, domain)
team_availability = {employee: (True, True) for employee in team}
for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
is_morning = leave.start_datetime.hour < 11
is_afternoon = leave.stop_datetime.hour >= 14
if is_morning:
team_availability[employee] = (False, team_availability[employee][1])
if is_afternoon:
team_availability[employee] = (team_availability[employee][0], False)
available_employees = [
employee
for employee, (morning, afternoon) in team_availability.items()
if morning or afternoon
]
return available_employees
+134 -53
View File
@@ -1,19 +1,21 @@
import click
import random
import odoo_client
from rich import print, console
import re
from collections import defaultdict
from config import vfyd_tags, team, TeamMember
from leave_service import LeaveService
from task_repository import TaskRepository
from storage import JsonDispatchStorage, DispatchStorage
from datetime import datetime
from itertools import cycle
import click
import odoo_client
from config import config
from leave import Leave
from rich import console, print
from storage import DispatchStorage
from tasks import Task
console = console.Console()
storage: DispatchStorage = JsonDispatchStorage()
client = odoo_client.OdooClient()
leave_service = LeaveService(client)
task_repository = TaskRepository(client, vfyd_tags)
team = config.team
vfyd_tags = config.vfyd_tags
@click.group()
@@ -25,77 +27,156 @@ def cli() -> None:
def dispatch():
"""Randomly distribute unassigned tasks among team members."""
if storage.load():
print("[red]Tickets were already dispatched.[/red]")
return
client = odoo_client.OdooClient()
today = datetime.now().strftime("%Y-%m-%d")
available_employees = leave_service.fetch_available_employees(team)
domain = [
("start_datetime", "<=", today),
("stop_datetime", ">=", today),
]
domain.extend(["|"] * (len(team) - 1))
domain.extend(
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
)
leaves = client.web_search_read(Leave, domain)
team_availability = {employee: (True, True) for employee in team}
for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
is_morning = leave.start_datetime.hour < 11
is_afternoon = leave.stop_datetime.hour >= 14
morning, afternoon = team_availability.get(employee, (True, True))
team_availability[employee] = (
morning and not is_morning,
afternoon and not is_afternoon,
)
available_employees = [
employee
for employee, (morning, afternoon) in team_availability.items()
if morning or afternoon
]
if not available_employees:
print("[red]No available employees for dispatch.[/red]")
return
previous_dispatch = storage.load_week_to_date()
previous_dispatch = DispatchStorage.load_week_to_date()
previous_dispatch_ids = [
task_id for tasks in previous_dispatch.values() for task_id in tasks
]
# Find unsorted tasks that have not been dispatched on previous days,
# Assign them to team members and save the result
new_unsorted_tasks = task_repository.get_unsorted_excluding(previous_dispatch_ids)
random.shuffle(new_unsorted_tasks)
new_dispatch: dict[TeamMember, list[int]] = defaultdict(list)
out: dict[TeamMember, list[Task]] = defaultdict(list)
for idx, task in enumerate(new_unsorted_tasks):
member = available_employees[idx % len(available_employees)]
new_dispatch[member].append(task.id)
out[member].append(task)
storage.save(new_dispatch)
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
("id", "not in", previous_dispatch_ids),
]
records = client.web_search_read(Task, domain)
# Find unsorted tasks that have been dispatched on previous days.
# Add them back to the previously assign team members
old_unsorted_tasks = task_repository.get_unsorted_within(previous_dispatch_ids)
combined_tasks_to_sort: dict[TeamMember, list[Task]] = defaultdict(list)
for member, task_ids in previous_dispatch.items():
for task_id in task_ids:
task = next((t for t in old_unsorted_tasks if t.id == task_id), None)
if task:
combined_tasks_to_sort[member].append(task)
random.shuffle(records)
random.shuffle(available_employees)
employee_cycle = cycle(available_employees)
# Merge the new unsorted tasks with the old ones
for member, tasks in out.items():
combined_tasks_to_sort[member].extend(tasks)
new_dispatch = defaultdict(list)
for key, tasks in combined_tasks_to_sort.items():
print(f"**{key}:**")
for task in records:
employee = next(employee_cycle)
new_dispatch[employee].append(task)
merged_dispatch = defaultdict(list)
if previous_dispatch_ids:
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
("id", "in", previous_dispatch_ids),
]
old_tasks = client.web_search_read(Task, domain)
old_tasks_by_id = {task.id: task for task in old_tasks}
else:
old_tasks_by_id = {}
for employee, task_ids in previous_dispatch.items():
merged_dispatch[employee].extend(
old_tasks_by_id[tid] for tid in task_ids if tid in old_tasks_by_id
)
for employee, tasks in new_dispatch.items():
merged_dispatch[employee].extend(tasks)
dispatch_to_save = {
employee: [task.id for task in tasks]
for employee, tasks in merged_dispatch.items()
}
DispatchStorage.save(dispatch_to_save)
for employee, tasks in sorted(merged_dispatch.items()):
if not tasks:
continue
print(f"**{employee}:**")
for task in tasks:
print(task.url)
print("\n")
@cli.command()
def status():
"""Show all unassigned tasks in the backlog."""
tasks = task_repository.get_unassigned_tasks()
for task in tasks:
print(task)
client = odoo_client.OdooClient()
domain = [
("stage_id", "=", 194),
("user_ids", "=", False),
("project_id", "=", 49),
("tag_ids", "not in", vfyd_tags),
]
tasks = client.web_search_read(Task, domain)
if not tasks:
print("[yellow]No unassigned tasks in the backlog.[/yellow]")
else:
for task in tasks:
print(task)
@cli.command()
def check():
"""Check which previously dispatched tasks are still not completed."""
dispatch = storage.load()
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
tasks = task_repository.get_tasks_with_ids(reverse_dispatch.keys())
dispatch = DispatchStorage.load_week_to_date()
task_id_to_employee = {
task_id: employee
for employee, task_ids in dispatch.items()
for task_id in task_ids
}
client = odoo_client.OdooClient()
domain = [
("id", "in", list(task_id_to_employee.keys())),
("tag_ids", "not in", vfyd_tags),
("stage_id", "=", 194),
("project_id", "=", 49),
]
tasks = client.web_search_read(Task, domain)
if not tasks:
print("[green]All dispatched tasks have been completed![/green]")
return
not_done = defaultdict(list)
for task in tasks:
not_done[reverse_dispatch[task.id]].append(task)
not_done[task_id_to_employee[task.id]].append(task)
for key, value in not_done.items():
if value:
console.rule(f"[red]{key}[/red]")
for task in value:
print(task)
for employee, pending_tasks in not_done.items():
console.rule(f"[red]{employee}[/red]")
for task in pending_tasks:
print(task)
if __name__ == "__main__":
+2 -2
View File
@@ -1,6 +1,6 @@
from typing import Self, Any, get_origin
from dataclasses import fields, Field
from dataclasses import Field, fields
from datetime import datetime
from typing import Any, Self, get_origin
class OdooModel:
+9 -12
View File
@@ -1,9 +1,9 @@
import os
import requests
import random
from typing import Any, TypeVar
from model import OdooModel
import requests
from config import config
from model import OdooModel
T = TypeVar("T", bound=OdooModel)
@@ -12,10 +12,8 @@ class OdooClient:
def __init__(self):
self.base_url = "https://www.odoo.com"
self.session = requests.Session()
if session_id := os.getenv("ODOO_SESSION_ID"):
self.session.cookies.set("session_id", session_id)
else:
raise Exception("ODOO_SESSION_ID is not set")
self.session.cookies.set("session_id", config.session_id)
self.session.cookies.set("cids", "1")
self.session.cookies.set("frontend_lang", "en_US")
@@ -49,12 +47,11 @@ class OdooClient:
**kwargs: Any,
) -> list[T]:
args = [domain, model.specification(), *args]
res = self.rpc(model._name, "web_search_read", *args, **kwargs)
res.raise_for_status()
json = res.json()
response = self.rpc(model._name, "web_search_read", *args, **kwargs)
json_data = response.json()
if error := json.get("error"):
if error := json_data.get("error"):
raise RuntimeError(f"Odoo RPC Error: {error}")
records = json.get("result", {}).get("records", [])
records = json_data.get("result", {}).get("records", [])
return [model.from_record(record) for record in records]
+28 -44
View File
@@ -1,57 +1,41 @@
from typing import Optional
import json
from datetime import datetime, timedelta
from pathlib import Path
import json
from config import TeamMember
from typing import Optional
class DispatchStorage:
def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]: ...
def save(
self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
) -> None: ...
def load_week_to_date(self) -> dict[TeamMember, list[int]]:
"""
Loads and combines dispatch data from the start of the current week up to today.
"""
today = datetime.now()
days_since_monday = today.weekday()
current_date = today - timedelta(days=days_since_monday)
combined_dispatch: dict[str, list[int]] = {}
while current_date.date() < today.date():
date_str = current_date.strftime("%Y-%m-%d")
daily_dispatch = self.load(date_str)
for key, values in daily_dispatch.items():
if key in combined_dispatch:
combined_dispatch[key].extend(values)
else:
combined_dispatch[key] = values.copy()
current_date += timedelta(days=1)
return combined_dispatch
class JsonDispatchStorage(DispatchStorage):
@staticmethod
def path(date: Optional[str] = None) -> Path:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return Path("out", f"{date}_dispatch.json")
return Path(f"out/{date}_dispatch.json")
def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]:
try:
with open(self.path(date), "r") as f:
return json.load(f)
except FileNotFoundError:
@staticmethod
def load(date: Optional[str] = None) -> dict[str, list[int]]:
path = DispatchStorage.path(date)
if not path.exists():
return {}
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def save(
self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
) -> None:
path = self.path(date)
path.parent.mkdir(exist_ok=True)
with open(path, "w") as f:
@staticmethod
def save(dispatch: dict[str, list[int]], date: Optional[str] = None) -> None:
path = DispatchStorage.path(date)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
json.dump(dispatch, f)
@staticmethod
def load_week_to_date() -> dict[str, list[int]]:
today = datetime.now()
start_of_week = today - timedelta(days=today.weekday()) # Previous Monday
combined_dispatch: dict[str, list[int]] = {}
for i in range((today - start_of_week).days + 1):
date_str = (start_of_week + timedelta(days=i)).strftime("%Y-%m-%d")
daily_dispatch = DispatchStorage.load(date_str)
for key, values in daily_dispatch.items():
combined_dispatch.setdefault(key, []).extend(values)
return combined_dispatch
-64
View File
@@ -1,64 +0,0 @@
from typing import Iterable
from tasks import Task
STAGE_BACKLOG = 194
PROJECT_ID = 49
class TaskRepository:
def __init__(self, client, vfyd_tags: Iterable[int]):
self.client = client
self._base_domain = [
("stage_id", "=", STAGE_BACKLOG),
("project_id", "=", PROJECT_ID),
("tag_ids", "not in", list(vfyd_tags)),
]
def _search(self, domain: list) -> list[Task]:
return self.client.web_search_read(Task, domain)
def get_unassigned_tasks(self) -> list[Task]:
return self._search(
[
*self._base_domain,
("user_ids", "=", False),
]
)
def get_tasks_with_ids(self, ids: Iterable[int]) -> list[Task]:
ids = list(ids)
if not ids:
return []
domain = [
*self._base_domain,
("id", "in", ids),
]
return self._search(domain)
def get_unsorted_excluding(self, exclude_ids: Iterable[int]) -> list[Task]:
exclude_ids = list(exclude_ids)
domain = [
*self._base_domain,
("user_ids", "=", False),
]
if exclude_ids:
domain.append(("id", "not in", exclude_ids))
return self._search(domain)
def get_unsorted_within(self, ids: Iterable[int]) -> list[Task]:
included_ids = list(ids)
if not included_ids:
return []
domain = [
*self._base_domain,
("user_ids", "=", False),
("id", "in", included_ids),
]
return self._search(domain)
+1
View File
@@ -1,4 +1,5 @@
from dataclasses import dataclass, field
from model import OdooModel
Generated
+55
View File
@@ -76,6 +76,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/76/c6/c88e154df9c4e1a2a66ccf0005a88dfb2650c1dffb6f5ce603dfbd452ce3/idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3", size = 70442 },
]
[[package]]
name = "iniconfig"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f2/97/ebf4da567aa6827c909642694d71c9fcf53e5b504f2d96afea02718862f3/iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7", size = 4793 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050 },
]
[[package]]
name = "markdown-it-py"
version = "3.0.0"
@@ -97,6 +106,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979 },
]
[[package]]
name = "packaging"
version = "24.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d0/63/68dbb6eb2de9cb10ee4c9c14a0148804425e13c4fb20d61cce69f53106da/packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f", size = 163950 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/88/ef/eb23f262cca3c0c4eb7ab1933c3b1f03d021f2c48f54763065b6f0e321be/packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759", size = 65451 },
]
[[package]]
name = "pluggy"
version = "1.5.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/96/2d/02d4312c973c6050a18b314a5ad0b3210edb65a906f868e31c111dede4a6/pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1", size = 67955 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/88/5f/e351af9a41f866ac3f1fac4ca0613908d9a41741cfcf2228f4ad853b697d/pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669", size = 20556 },
]
[[package]]
name = "pygments"
version = "2.19.1"
@@ -106,6 +133,30 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/8a/0b/9fcc47d19c48b59121088dd6da2488a49d5f72dacf8262e2790a1d2c7d15/pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c", size = 1225293 },
]
[[package]]
name = "pytest"
version = "8.3.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ae/3c/c9d525a414d506893f0cd8a8d0de7706446213181570cdbd766691164e40/pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845", size = 1450891 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634 },
]
[[package]]
name = "python-dotenv"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/88/2c/7bb1416c5620485aa793f2de31d3df393d3686aa8a8506d11e10e13c5baf/python_dotenv-1.1.0.tar.gz", hash = "sha256:41f90bc6f5f177fb41f53e87666db362025010eb28f60a01c9143bfa33a2b2d5", size = 39920 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/18/98a99ad95133c6a6e2005fe89faedf294a748bd5dc803008059409ac9b1e/python_dotenv-1.1.0-py3-none-any.whl", hash = "sha256:d7c01d9e2293916c18baf562d95698754b0dbbb5e74d457c45d4f6561fb9d55d", size = 20256 },
]
[[package]]
name = "requests"
version = "2.32.3"
@@ -140,6 +191,8 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "click" },
{ name = "pytest" },
{ name = "python-dotenv" },
{ name = "requests" },
{ name = "rich" },
]
@@ -147,6 +200,8 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1.8" },
{ name = "pytest", specifier = ">=8.3.5" },
{ name = "python-dotenv", specifier = ">=1.1.0" },
{ name = "requests", specifier = ">=2.32.3" },
{ name = "rich", specifier = ">=13.9.4" },
]