Compare commits

..

No commits in common. "eebeed58a58de975f985f1ae9198356f0072e9d1" and "2bcb5eef20651d1f6f3ff8671b14924f4c6cd650" have entirely different histories.

7 changed files with 36 additions and 51 deletions

2
.gitignore vendored
View File

@ -1,5 +1,3 @@
__pycache__/ __pycache__/
out/ out/
.venv .venv
.idea/
.vscode/

View File

@ -3,6 +3,7 @@ import random
vfyd_tags = [ vfyd_tags = [
"vfyd accounting", "vfyd accounting",
"vfyd discuss", "vfyd discuss",
"vfyd easy",
"vfyd editor", "vfyd editor",
"vfyd js", "vfyd js",
"vfyd marketing", "vfyd marketing",
@ -16,8 +17,6 @@ vfyd_tags = [
"vfyd website", "vfyd website",
"vfyd industry", "vfyd industry",
"vfyd voip", "vfyd voip",
"vfyd ai",
"vfyd iot",
] ]
team = [ team = [
@ -28,7 +27,6 @@ team = [
"sbel", "sbel",
"wasa", "wasa",
"huvw", "huvw",
"artn",
] ]
random.shuffle(team) random.shuffle(team)

View File

@ -34,7 +34,9 @@ def dispatch():
) )
leaves = client.web_search_read(Leave, domain) leaves = client.web_search_read(Leave, domain)
team_availability = {employee: (True, True) for employee in team} team_availability = dict()
for employee in team:
team_availability[employee] = (True, True)
for leave in leaves: for leave in leaves:
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower() employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
@ -47,14 +49,10 @@ def dispatch():
available_employees = [ available_employees = [
employee employee
for employee, (morning, afternoon) in team_availability.items() for employee, availability in team_availability.items()
if morning or afternoon if availability[0] or availability[1]
] ]
if not available_employees:
print("[red]No available employees for dispatch.[/red]")
return
previous_dispatch = DispatchStorage.load_week_to_date() previous_dispatch = DispatchStorage.load_week_to_date()
previous_dispatch_ids = [ previous_dispatch_ids = [
task_id for tasks in previous_dispatch.values() for task_id in tasks task_id for tasks in previous_dispatch.values() for task_id in tasks

View File

@ -1,12 +1,12 @@
from typing import Self, Any, get_origin from typing import Dict, Self, Any, get_origin
from dataclasses import fields, Field from dataclasses import fields, Field
from datetime import datetime from datetime import datetime
class OdooModel: class OdooModel:
@classmethod @classmethod
def specification(cls) -> dict[str, Any]: def specification(cls) -> Dict[str, Any]:
def field_spec(f: Field) -> dict[str, Any]: def field_spec(f: Field) -> Dict[str, Any]:
field_type = f.type field_type = f.type
if f.name.endswith("_id") or f.name.endswith("_ids"): if f.name.endswith("_id") or f.name.endswith("_ids"):
return { return {
@ -29,7 +29,7 @@ class OdooModel:
return spec return spec
@classmethod @classmethod
def from_record(cls, record: dict) -> Self: def from_record(cls, record: Dict) -> Self:
init_args = {} init_args = {}
for f in fields(cls): for f in fields(cls):
if f.init and f.name in record: if f.init and f.name in record:

View File

@ -1,7 +1,7 @@
import os import os
import requests import requests
import random import random
from typing import Any, TypeVar from typing import Any, Dict, List, TypeVar, Type
from model import OdooModel from model import OdooModel
@ -22,10 +22,10 @@ class OdooClient:
def rpc( def rpc(
self, model: str, method: str, *args: Any, **kwargs: Any self, model: str, method: str, *args: Any, **kwargs: Any
) -> requests.Response: ) -> requests.Response:
body: dict[str, Any] = { body: Dict[str, Any] = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": method, "method": method,
"id": random.randint(1, 1_000_000), "id": random.randint(1, 1000000),
"params": { "params": {
"model": model, "model": model,
"method": method, "method": method,
@ -34,27 +34,23 @@ class OdooClient:
}, },
} }
response = self.session.post( return self.session.post(
f"{self.base_url}/web/dataset/call_kw", f"{self.base_url}/web/dataset/call_kw",
json=body, json=body,
) )
response.raise_for_status()
return response
def web_search_read( def web_search_read(
self, self,
model: type[T], model: Type[T],
domain: list[tuple[Any, ...]], domain: List[tuple],
*args: Any, *args: Any,
**kwargs: Any, **kwargs: Any,
) -> list[T]: ) -> List[T]:
args = [domain, model.specification(), *args] args = [domain, model.specification(), *args]
res = self.rpc(model._name, "web_search_read", *args, **kwargs) res = self.rpc(model._name, "web_search_read", *args, **kwargs)
res.raise_for_status() res.raise_for_status()
json = res.json() json = res.json()
if json.get("error"):
if error := json.get("error"): raise Exception(json.get("error"))
raise RuntimeError(f"Odoo RPC Error: {error}")
records = json.get("result", {}).get("records", []) records = json.get("result", {}).get("records", [])
return [model.from_record(record) for record in records] return [model.from_record(record) for record in records]

View File

@ -1,33 +1,27 @@
from typing import Optional from typing import Dict, List, Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path
import json import json
class DispatchStorage: class DispatchStorage:
@staticmethod @classmethod
def path(date: Optional[str] = None) -> Path: def path(cls, date: Optional[str] = None) -> str:
if date is None: if date is None:
date = datetime.now().strftime("%Y-%m-%d") date = datetime.now().strftime("%Y-%m-%d")
return Path("out", f"{date}_dispatch.json") return f"out/{date}_dispatch.json"
@staticmethod @classmethod
def load(date: Optional[str] = None) -> dict[str, list[int]]: def load(cls, date: Optional[str] = None) -> Dict[str, List[int]]:
try: with open(cls.path(date), "r") as f:
with open(DispatchStorage.path(date), "r") as f: return json.load(f)
return json.load(f)
except FileNotFoundError:
return {}
@staticmethod @classmethod
def save(dispatch: dict[str, list[int]], date: Optional[str] = None) -> None: def save(cls, dispatch: Dict[str, List[int]], date: Optional[str] = None) -> None:
path = DispatchStorage.path(date) with open(cls.path(date), "w") as f:
path.parent.mkdir(exist_ok=True)
with open(path, "w") as f:
json.dump(dispatch, f) json.dump(dispatch, f)
@staticmethod @classmethod
def load_week_to_date() -> dict[str, list[int]]: def load_week_to_date(cls) -> Dict[str, List[int]]:
""" """
Loads and combines dispatch data from the start of the current week up to today. Loads and combines dispatch data from the start of the current week up to today.
""" """
@ -35,10 +29,10 @@ class DispatchStorage:
days_since_monday = today.weekday() days_since_monday = today.weekday()
current_date = today - timedelta(days=days_since_monday) current_date = today - timedelta(days=days_since_monday)
combined_dispatch: dict[str, list[int]] = {} combined_dispatch: Dict[str, List[int]] = {}
while current_date.date() < today.date(): while current_date.date() < today.date():
date_str = current_date.strftime("%Y-%m-%d") date_str = current_date.strftime("%Y-%m-%d")
daily_dispatch = DispatchStorage.load(date_str) daily_dispatch = cls.load(date_str)
for key, values in daily_dispatch.items(): for key, values in daily_dispatch.items():
if key in combined_dispatch: if key in combined_dispatch:
combined_dispatch[key].extend(values) combined_dispatch[key].extend(values)

View File

@ -1,4 +1,5 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List
from model import OdooModel from model import OdooModel
@ -8,7 +9,7 @@ class Task(OdooModel):
id: int id: int
name: str name: str
tag_ids: list[str] tag_ids: List[str]
stage_id: str stage_id: str
priority: bool priority: bool
url: str = field(init=False) url: str = field(init=False)