Compare commits

...

4 Commits

Author SHA1 Message Date
Hubert Van De Walle
eebeed58a5 des trucs
Co-authored-by: sesn-odoo <sesn@odoo.com>
2025-07-22 11:47:33 +02:00
Hubert Van De Walle
a49c5f012f Return an empty dict when loading non existant file 2025-07-22 11:23:07 +02:00
Hubert Van De Walle
7e7ffd6f49 Update tags and team 2025-07-22 11:20:50 +02:00
Hubert Van De Walle
e7fa624b42 Ignore IDE files 2025-07-22 11:20:29 +02:00
7 changed files with 51 additions and 36 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
__pycache__/
out/
.venv
.idea/
.vscode/

View File

@ -3,7 +3,6 @@ import random
vfyd_tags = [
"vfyd accounting",
"vfyd discuss",
"vfyd easy",
"vfyd editor",
"vfyd js",
"vfyd marketing",
@ -17,6 +16,8 @@ vfyd_tags = [
"vfyd website",
"vfyd industry",
"vfyd voip",
"vfyd ai",
"vfyd iot",
]
team = [
@ -27,6 +28,7 @@ team = [
"sbel",
"wasa",
"huvw",
"artn",
]
random.shuffle(team)

View File

@ -34,9 +34,7 @@ def dispatch():
)
leaves = client.web_search_read(Leave, domain)
team_availability = dict()
for employee in team:
team_availability[employee] = (True, True)
team_availability = {employee: (True, True) for employee in team}
for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
@ -49,10 +47,14 @@ def dispatch():
available_employees = [
employee
for employee, availability in team_availability.items()
if availability[0] or availability[1]
for employee, (morning, afternoon) in team_availability.items()
if morning or afternoon
]
if not available_employees:
print("[red]No available employees for dispatch.[/red]")
return
previous_dispatch = DispatchStorage.load_week_to_date()
previous_dispatch_ids = [
task_id for tasks in previous_dispatch.values() for task_id in tasks

View File

@ -1,12 +1,12 @@
from typing import Dict, Self, Any, get_origin
from typing import Self, Any, get_origin
from dataclasses import fields, Field
from datetime import datetime
class OdooModel:
@classmethod
def specification(cls) -> Dict[str, Any]:
def field_spec(f: Field) -> Dict[str, Any]:
def specification(cls) -> dict[str, Any]:
def field_spec(f: Field) -> dict[str, Any]:
field_type = f.type
if f.name.endswith("_id") or f.name.endswith("_ids"):
return {
@ -29,7 +29,7 @@ class OdooModel:
return spec
@classmethod
def from_record(cls, record: Dict) -> Self:
def from_record(cls, record: dict) -> Self:
init_args = {}
for f in fields(cls):
if f.init and f.name in record:

View File

@ -1,7 +1,7 @@
import os
import requests
import random
from typing import Any, Dict, List, TypeVar, Type
from typing import Any, TypeVar
from model import OdooModel
@ -22,10 +22,10 @@ class OdooClient:
def rpc(
self, model: str, method: str, *args: Any, **kwargs: Any
) -> requests.Response:
body: Dict[str, Any] = {
body: dict[str, Any] = {
"jsonrpc": "2.0",
"method": method,
"id": random.randint(1, 1000000),
"id": random.randint(1, 1_000_000),
"params": {
"model": model,
"method": method,
@ -34,23 +34,27 @@ class OdooClient:
},
}
return self.session.post(
response = self.session.post(
f"{self.base_url}/web/dataset/call_kw",
json=body,
)
response.raise_for_status()
return response
def web_search_read(
self,
model: Type[T],
domain: List[tuple],
model: type[T],
domain: list[tuple[Any, ...]],
*args: Any,
**kwargs: Any,
) -> List[T]:
) -> list[T]:
args = [domain, model.specification(), *args]
res = self.rpc(model._name, "web_search_read", *args, **kwargs)
res.raise_for_status()
json = res.json()
if json.get("error"):
raise Exception(json.get("error"))
if error := json.get("error"):
raise RuntimeError(f"Odoo RPC Error: {error}")
records = json.get("result", {}).get("records", [])
return [model.from_record(record) for record in records]

View File

@ -1,27 +1,33 @@
from typing import Dict, List, Optional
from typing import Optional
from datetime import datetime, timedelta
from pathlib import Path
import json
class DispatchStorage:
@classmethod
def path(cls, date: Optional[str] = None) -> str:
@staticmethod
def path(date: Optional[str] = None) -> Path:
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
return f"out/{date}_dispatch.json"
return Path("out", f"{date}_dispatch.json")
@classmethod
def load(cls, date: Optional[str] = None) -> Dict[str, List[int]]:
with open(cls.path(date), "r") as f:
return json.load(f)
@staticmethod
def load(date: Optional[str] = None) -> dict[str, list[int]]:
try:
with open(DispatchStorage.path(date), "r") as f:
return json.load(f)
except FileNotFoundError:
return {}
@classmethod
def save(cls, dispatch: Dict[str, List[int]], date: Optional[str] = None) -> None:
with open(cls.path(date), "w") as f:
@staticmethod
def save(dispatch: dict[str, list[int]], date: Optional[str] = None) -> None:
path = DispatchStorage.path(date)
path.parent.mkdir(exist_ok=True)
with open(path, "w") as f:
json.dump(dispatch, f)
@classmethod
def load_week_to_date(cls) -> Dict[str, List[int]]:
@staticmethod
def load_week_to_date() -> dict[str, list[int]]:
"""
Loads and combines dispatch data from the start of the current week up to today.
"""
@ -29,10 +35,10 @@ class DispatchStorage:
days_since_monday = today.weekday()
current_date = today - timedelta(days=days_since_monday)
combined_dispatch: Dict[str, List[int]] = {}
combined_dispatch: dict[str, list[int]] = {}
while current_date.date() < today.date():
date_str = current_date.strftime("%Y-%m-%d")
daily_dispatch = cls.load(date_str)
daily_dispatch = DispatchStorage.load(date_str)
for key, values in daily_dispatch.items():
if key in combined_dispatch:
combined_dispatch[key].extend(values)

View File

@ -1,5 +1,4 @@
from dataclasses import dataclass, field
from typing import List
from model import OdooModel
@ -9,7 +8,7 @@ class Task(OdooModel):
id: int
name: str
tag_ids: List[str]
tag_ids: list[str]
stage_id: str
priority: bool
url: str = field(init=False)