Compare commits
14 Commits
4b0622451b
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 9484dc5a4e | |||
| 7298a4abfa | |||
| 26c0f1da8e | |||
| 1d46d78c35 | |||
| 920c746444 | |||
| f4df1935a9 | |||
| eebeed58a5 | |||
| a49c5f012f | |||
| 7e7ffd6f49 | |||
| e7fa624b42 | |||
| 2bcb5eef20 | |||
| 54ea243c8c | |||
| d3abdb10c4 | |||
| 47a3eed98b |
@@ -1,3 +1,5 @@
|
||||
__pycache__/
|
||||
out/
|
||||
.venv
|
||||
.idea/
|
||||
.vscode/
|
||||
|
||||
@@ -1,109 +0,0 @@
|
||||
import click
|
||||
import random
|
||||
import odoo_client
|
||||
import json
|
||||
from tasks import fetch_tasks
|
||||
from rich import print, console
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
from typing import Dict, List, Optional
|
||||
from config import vfyd_tags, team
|
||||
|
||||
|
||||
console = console.Console()
|
||||
|
||||
|
||||
def load_dispatch(date: Optional[str] = None) -> Dict[str, List[int]]:
|
||||
if date is None:
|
||||
date = datetime.now().strftime("%Y-%m-%d")
|
||||
with open(f"out/{date}_dispatch.json", "r") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def save_dispatch(dispatch: Dict[str, List[int]], date: Optional[str] = None) -> None:
|
||||
if date is None:
|
||||
date = datetime.now().strftime("%Y-%m-%d")
|
||||
with open(f"out/{date}_dispatch.json", "w") as f:
|
||||
json.dump(dispatch, f)
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli() -> None:
|
||||
pass
|
||||
|
||||
|
||||
@cli.command()
|
||||
def dispatch():
|
||||
"""Randomly distribute unassigned tasks among team members."""
|
||||
|
||||
domain = [
|
||||
("stage_id", "=", 194),
|
||||
("user_ids", "=", False),
|
||||
("project_id", "=", 49),
|
||||
("tag_ids", "not in", vfyd_tags),
|
||||
]
|
||||
|
||||
client = odoo_client.OdooClient()
|
||||
records = fetch_tasks(client, domain)
|
||||
|
||||
random.shuffle(records)
|
||||
dispatch = defaultdict(list)
|
||||
out = defaultdict(list)
|
||||
for idx, task in enumerate(records):
|
||||
dispatch[team[idx % len(team)]].append(task.id)
|
||||
out[team[idx % len(team)]].append(task)
|
||||
|
||||
save_dispatch(dispatch)
|
||||
for key, tasks in out.items():
|
||||
print(f"**{key}:**")
|
||||
for task in tasks:
|
||||
print(task.url)
|
||||
|
||||
|
||||
@cli.command()
|
||||
def status():
|
||||
"""Show all unassigned tasks in the backlog."""
|
||||
client = odoo_client.OdooClient()
|
||||
|
||||
domain = [
|
||||
("stage_id", "=", 194),
|
||||
("user_ids", "=", False),
|
||||
("project_id", "=", 49),
|
||||
("tag_ids", "not in", vfyd_tags),
|
||||
]
|
||||
|
||||
tasks = fetch_tasks(client, domain)
|
||||
for task in tasks:
|
||||
print(task)
|
||||
|
||||
|
||||
@cli.command()
|
||||
def check():
|
||||
"""Check which previously dispatched tasks are still not completed."""
|
||||
dispatch = load_dispatch()
|
||||
|
||||
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
|
||||
|
||||
client = odoo_client.OdooClient()
|
||||
|
||||
domain = [
|
||||
("id", "in", list(reverse_dispatch.keys())),
|
||||
("tag_ids", "not in", vfyd_tags),
|
||||
("stage_id", "=", 194),
|
||||
("project_id", "=", 49),
|
||||
]
|
||||
|
||||
tasks = fetch_tasks(client, domain)
|
||||
not_done = defaultdict(list)
|
||||
for task in tasks:
|
||||
not_done[reverse_dispatch[task.id]].append(task)
|
||||
|
||||
for key, value in not_done.items():
|
||||
if value:
|
||||
console.rule(f"[red]{key}[/red]")
|
||||
for task in value:
|
||||
print(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
@@ -1,9 +1,9 @@
|
||||
import random
|
||||
from datetime import datetime
|
||||
|
||||
vfyd_tags = [
|
||||
"vfyd accounting",
|
||||
"vfyd discuss",
|
||||
"vfyd easy",
|
||||
"vfyd editor",
|
||||
"vfyd js",
|
||||
"vfyd marketing",
|
||||
@@ -16,16 +16,29 @@ vfyd_tags = [
|
||||
"vfyd vidange",
|
||||
"vfyd website",
|
||||
"vfyd industry",
|
||||
"vfyd voip",
|
||||
"vfyd ai",
|
||||
"vfyd iot",
|
||||
]
|
||||
|
||||
team = [
|
||||
"nle",
|
||||
type TeamMember = str
|
||||
|
||||
team: list[TeamMember] = [
|
||||
"abz",
|
||||
"hael",
|
||||
#"roen",
|
||||
"roen",
|
||||
"sbel",
|
||||
"wasa",
|
||||
"huvw",
|
||||
"artn",
|
||||
"gvar",
|
||||
"sben",
|
||||
]
|
||||
|
||||
is_friday = datetime.now().weekday() == 4
|
||||
|
||||
if not is_friday:
|
||||
team.append("nle")
|
||||
|
||||
|
||||
random.shuffle(team)
|
||||
@@ -0,0 +1,12 @@
|
||||
from dataclasses import dataclass
|
||||
from model import OdooModel
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
@dataclass
|
||||
class Leave(OdooModel):
|
||||
_name = "hr.leave.report.calendar"
|
||||
|
||||
start_datetime: datetime
|
||||
stop_datetime: datetime
|
||||
employee_id: str
|
||||
@@ -0,0 +1,39 @@
|
||||
import odoo_client
|
||||
from datetime import datetime
|
||||
from leave import Leave
|
||||
import re
|
||||
|
||||
|
||||
class LeaveService:
|
||||
def __init__(self, client: odoo_client.OdooClient):
|
||||
self.client = client
|
||||
|
||||
def fetch_available_employees(self, team: list[str]) -> list:
|
||||
today = datetime.now().strftime("%Y-%m-%d")
|
||||
domain = [
|
||||
("start_datetime", "<=", today),
|
||||
("stop_datetime", ">=", today),
|
||||
]
|
||||
domain.extend(["|"] * (len(team) - 1))
|
||||
domain.extend(
|
||||
[("employee_id.name", "=ilike", f"%({employee})") for employee in team]
|
||||
)
|
||||
leaves = self.client.web_search_read(Leave, domain)
|
||||
|
||||
team_availability = {employee: (True, True) for employee in team}
|
||||
|
||||
for leave in leaves:
|
||||
employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower()
|
||||
is_morning = leave.start_datetime.hour < 11
|
||||
is_afternoon = leave.stop_datetime.hour >= 14
|
||||
if is_morning:
|
||||
team_availability[employee] = (False, team_availability[employee][1])
|
||||
if is_afternoon:
|
||||
team_availability[employee] = (team_availability[employee][0], False)
|
||||
|
||||
available_employees = [
|
||||
employee
|
||||
for employee, (morning, afternoon) in team_availability.items()
|
||||
if morning or afternoon
|
||||
]
|
||||
return available_employees
|
||||
+102
@@ -0,0 +1,102 @@
|
||||
import click
|
||||
import random
|
||||
import odoo_client
|
||||
from rich import print, console
|
||||
from collections import defaultdict
|
||||
from config import vfyd_tags, team, TeamMember
|
||||
from leave_service import LeaveService
|
||||
from task_repository import TaskRepository
|
||||
from storage import JsonDispatchStorage, DispatchStorage
|
||||
from tasks import Task
|
||||
|
||||
console = console.Console()
|
||||
storage: DispatchStorage = JsonDispatchStorage()
|
||||
client = odoo_client.OdooClient()
|
||||
leave_service = LeaveService(client)
|
||||
task_repository = TaskRepository(client, vfyd_tags)
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli() -> None:
|
||||
pass
|
||||
|
||||
|
||||
@cli.command()
|
||||
def dispatch():
|
||||
"""Randomly distribute unassigned tasks among team members."""
|
||||
|
||||
if storage.load():
|
||||
print("[red]Tickets were already dispatched.[/red]")
|
||||
return
|
||||
|
||||
available_employees = leave_service.fetch_available_employees(team)
|
||||
|
||||
if not available_employees:
|
||||
print("[red]No available employees for dispatch.[/red]")
|
||||
return
|
||||
|
||||
previous_dispatch = storage.load_week_to_date()
|
||||
previous_dispatch_ids = [
|
||||
task_id for tasks in previous_dispatch.values() for task_id in tasks
|
||||
]
|
||||
|
||||
# Find unsorted tasks that have not been dispatched on previous days,
|
||||
# Assign them to team members and save the result
|
||||
new_unsorted_tasks = task_repository.get_unsorted_excluding(previous_dispatch_ids)
|
||||
random.shuffle(new_unsorted_tasks)
|
||||
new_dispatch: dict[TeamMember, list[int]] = defaultdict(list)
|
||||
out: dict[TeamMember, list[Task]] = defaultdict(list)
|
||||
for idx, task in enumerate(new_unsorted_tasks):
|
||||
member = available_employees[idx % len(available_employees)]
|
||||
new_dispatch[member].append(task.id)
|
||||
out[member].append(task)
|
||||
storage.save(new_dispatch)
|
||||
|
||||
# Find unsorted tasks that have been dispatched on previous days.
|
||||
# Add them back to the previously assign team members
|
||||
old_unsorted_tasks = task_repository.get_unsorted_within(previous_dispatch_ids)
|
||||
combined_tasks_to_sort: dict[TeamMember, list[Task]] = defaultdict(list)
|
||||
for member, task_ids in previous_dispatch.items():
|
||||
for task_id in task_ids:
|
||||
task = next((t for t in old_unsorted_tasks if t.id == task_id), None)
|
||||
if task:
|
||||
combined_tasks_to_sort[member].append(task)
|
||||
|
||||
# Merge the new unsorted tasks with the old ones
|
||||
for member, tasks in out.items():
|
||||
combined_tasks_to_sort[member].extend(tasks)
|
||||
|
||||
for key, tasks in combined_tasks_to_sort.items():
|
||||
print(f"**{key}:**")
|
||||
for task in tasks:
|
||||
print(task.url)
|
||||
|
||||
|
||||
@cli.command()
|
||||
def status():
|
||||
"""Show all unassigned tasks in the backlog."""
|
||||
tasks = task_repository.get_unassigned_tasks()
|
||||
for task in tasks:
|
||||
print(task)
|
||||
|
||||
|
||||
@cli.command()
|
||||
def check():
|
||||
"""Check which previously dispatched tasks are still not completed."""
|
||||
dispatch = storage.load()
|
||||
reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs}
|
||||
tasks = task_repository.get_tasks_with_ids(reverse_dispatch.keys())
|
||||
|
||||
not_done = defaultdict(list)
|
||||
for task in tasks:
|
||||
not_done[reverse_dispatch[task.id]].append(task)
|
||||
|
||||
for key, value in not_done.items():
|
||||
if value:
|
||||
console.rule(f"[red]{key}[/red]")
|
||||
for task in value:
|
||||
print(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
@@ -0,0 +1,55 @@
|
||||
from typing import Self, Any, get_origin
|
||||
from dataclasses import fields, Field
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class OdooModel:
|
||||
@classmethod
|
||||
def specification(cls) -> dict[str, Any]:
|
||||
def field_spec(f: Field) -> dict[str, Any]:
|
||||
field_type = f.type
|
||||
if f.name.endswith("_id") or f.name.endswith("_ids"):
|
||||
return {
|
||||
f.name: {
|
||||
"fields": {
|
||||
"display_name": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
elif field_type in {str, int, bool, datetime}:
|
||||
return {f.name: {}}
|
||||
else:
|
||||
raise NotImplementedError(f"Unsupported field type: {field_type}")
|
||||
|
||||
spec = dict()
|
||||
for f in fields(cls):
|
||||
if f.init:
|
||||
spec.update(field_spec(f))
|
||||
|
||||
return spec
|
||||
|
||||
@classmethod
|
||||
def from_record(cls, record: dict) -> Self:
|
||||
init_args = {}
|
||||
for f in fields(cls):
|
||||
if f.init and f.name in record:
|
||||
value = record[f.name]
|
||||
|
||||
if f.type is datetime:
|
||||
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
|
||||
|
||||
if f.name.endswith("_id"):
|
||||
if f.type is str:
|
||||
value = value["display_name"]
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
if f.name.endswith("_ids"):
|
||||
if get_origin(f.type) is list and f.type.__args__[0] is str:
|
||||
value = [v["display_name"] for v in value]
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
init_args[f.name] = value
|
||||
|
||||
return cls(**init_args)
|
||||
@@ -1,7 +1,11 @@
|
||||
import os
|
||||
import requests
|
||||
import random
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any, TypeVar
|
||||
from model import OdooModel
|
||||
|
||||
|
||||
T = TypeVar("T", bound=OdooModel)
|
||||
|
||||
|
||||
class OdooClient:
|
||||
@@ -18,10 +22,10 @@ class OdooClient:
|
||||
def rpc(
|
||||
self, model: str, method: str, *args: Any, **kwargs: Any
|
||||
) -> requests.Response:
|
||||
body: Dict[str, Any] = {
|
||||
body: dict[str, Any] = {
|
||||
"jsonrpc": "2.0",
|
||||
"method": method,
|
||||
"id": random.randint(1, 1000000),
|
||||
"id": random.randint(1, 1_000_000),
|
||||
"params": {
|
||||
"model": model,
|
||||
"method": method,
|
||||
@@ -30,23 +34,27 @@ class OdooClient:
|
||||
},
|
||||
}
|
||||
|
||||
return self.session.post(
|
||||
response = self.session.post(
|
||||
f"{self.base_url}/web/dataset/call_kw",
|
||||
json=body,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response
|
||||
|
||||
def web_search_read(
|
||||
self,
|
||||
model: str,
|
||||
domain: List[tuple],
|
||||
spec: Dict[str, Any],
|
||||
model: type[T],
|
||||
domain: list[tuple[Any, ...]],
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> List[Dict[str, Any]]:
|
||||
args = [domain, spec, *args]
|
||||
res = self.rpc(model, "web_search_read", *args, **kwargs)
|
||||
) -> list[T]:
|
||||
args = [domain, model.specification(), *args]
|
||||
res = self.rpc(model._name, "web_search_read", *args, **kwargs)
|
||||
res.raise_for_status()
|
||||
json = res.json()
|
||||
if json.get("error"):
|
||||
raise Exception(json.get("error"))
|
||||
return json.get("result", {}).get("records", [])
|
||||
|
||||
if error := json.get("error"):
|
||||
raise RuntimeError(f"Odoo RPC Error: {error}")
|
||||
|
||||
records = json.get("result", {}).get("records", [])
|
||||
return [model.from_record(record) for record in records]
|
||||
@@ -0,0 +1,57 @@
|
||||
from typing import Optional
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
import json
|
||||
from config import TeamMember
|
||||
|
||||
|
||||
class DispatchStorage:
|
||||
def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]: ...
|
||||
|
||||
def save(
|
||||
self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
|
||||
) -> None: ...
|
||||
|
||||
def load_week_to_date(self) -> dict[TeamMember, list[int]]:
|
||||
"""
|
||||
Loads and combines dispatch data from the start of the current week up to today.
|
||||
"""
|
||||
today = datetime.now()
|
||||
days_since_monday = today.weekday()
|
||||
current_date = today - timedelta(days=days_since_monday)
|
||||
|
||||
combined_dispatch: dict[str, list[int]] = {}
|
||||
while current_date.date() < today.date():
|
||||
date_str = current_date.strftime("%Y-%m-%d")
|
||||
daily_dispatch = self.load(date_str)
|
||||
for key, values in daily_dispatch.items():
|
||||
if key in combined_dispatch:
|
||||
combined_dispatch[key].extend(values)
|
||||
else:
|
||||
combined_dispatch[key] = values.copy()
|
||||
current_date += timedelta(days=1)
|
||||
|
||||
return combined_dispatch
|
||||
|
||||
|
||||
class JsonDispatchStorage(DispatchStorage):
|
||||
@staticmethod
|
||||
def path(date: Optional[str] = None) -> Path:
|
||||
if date is None:
|
||||
date = datetime.now().strftime("%Y-%m-%d")
|
||||
return Path("out", f"{date}_dispatch.json")
|
||||
|
||||
def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]:
|
||||
try:
|
||||
with open(self.path(date), "r") as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
return {}
|
||||
|
||||
def save(
|
||||
self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None
|
||||
) -> None:
|
||||
path = self.path(date)
|
||||
path.parent.mkdir(exist_ok=True)
|
||||
with open(path, "w") as f:
|
||||
json.dump(dispatch, f)
|
||||
@@ -0,0 +1,64 @@
|
||||
from typing import Iterable
|
||||
|
||||
from tasks import Task
|
||||
|
||||
STAGE_BACKLOG = 194
|
||||
PROJECT_ID = 49
|
||||
|
||||
|
||||
class TaskRepository:
|
||||
def __init__(self, client, vfyd_tags: Iterable[int]):
|
||||
self.client = client
|
||||
self._base_domain = [
|
||||
("stage_id", "=", STAGE_BACKLOG),
|
||||
("project_id", "=", PROJECT_ID),
|
||||
("tag_ids", "not in", list(vfyd_tags)),
|
||||
]
|
||||
|
||||
def _search(self, domain: list) -> list[Task]:
|
||||
return self.client.web_search_read(Task, domain)
|
||||
|
||||
def get_unassigned_tasks(self) -> list[Task]:
|
||||
return self._search(
|
||||
[
|
||||
*self._base_domain,
|
||||
("user_ids", "=", False),
|
||||
]
|
||||
)
|
||||
|
||||
def get_tasks_with_ids(self, ids: Iterable[int]) -> list[Task]:
|
||||
ids = list(ids)
|
||||
if not ids:
|
||||
return []
|
||||
|
||||
domain = [
|
||||
*self._base_domain,
|
||||
("id", "in", ids),
|
||||
]
|
||||
return self._search(domain)
|
||||
|
||||
def get_unsorted_excluding(self, exclude_ids: Iterable[int]) -> list[Task]:
|
||||
exclude_ids = list(exclude_ids)
|
||||
|
||||
domain = [
|
||||
*self._base_domain,
|
||||
("user_ids", "=", False),
|
||||
]
|
||||
|
||||
if exclude_ids:
|
||||
domain.append(("id", "not in", exclude_ids))
|
||||
|
||||
return self._search(domain)
|
||||
|
||||
def get_unsorted_within(self, ids: Iterable[int]) -> list[Task]:
|
||||
included_ids = list(ids)
|
||||
if not included_ids:
|
||||
return []
|
||||
|
||||
domain = [
|
||||
*self._base_domain,
|
||||
("user_ids", "=", False),
|
||||
("id", "in", included_ids),
|
||||
]
|
||||
|
||||
return self._search(domain)
|
||||
@@ -0,0 +1,17 @@
|
||||
from dataclasses import dataclass, field
|
||||
from model import OdooModel
|
||||
|
||||
|
||||
@dataclass
|
||||
class Task(OdooModel):
|
||||
_name = "project.task"
|
||||
|
||||
id: int
|
||||
name: str
|
||||
tag_ids: list[str]
|
||||
stage_id: str
|
||||
priority: bool
|
||||
url: str = field(init=False)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.url = f"https://www.odoo.com/odoo/49/tasks/{self.id}"
|
||||
@@ -1,60 +0,0 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List
|
||||
from odoo_client import OdooClient
|
||||
from rich.table import Table
|
||||
|
||||
@dataclass
|
||||
class Task:
|
||||
id: int
|
||||
name: str
|
||||
tags: List[str]
|
||||
stage: str
|
||||
priority: bool
|
||||
url: str = field(init=False)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.url = f"https://www.odoo.com/odoo/49/tasks/{self.id}"
|
||||
|
||||
def __rich__(self):
|
||||
table = Table(show_header=True, header_style="bold magenta", box=None)
|
||||
table.add_column("Field", style="cyan", justify="right")
|
||||
table.add_column("Value", style="bold green")
|
||||
table.add_row("ID", str(self.id))
|
||||
table.add_row("Name", self.name)
|
||||
table.add_row("Tags", ", ".join(self.tags) if self.tags else "[dim]No tags[/dim]")
|
||||
table.add_row("Stage", f"[yellow]{self.stage}[/yellow]")
|
||||
table.add_row("Priority", "[bold red]High[/bold red]" if self.priority else "[dim]Low[/dim]")
|
||||
table.add_row("URL", f"[blue underline]{self.url}[/blue underline]")
|
||||
return table
|
||||
|
||||
|
||||
def map_record_to_task(record: Dict) -> Task:
|
||||
return Task(
|
||||
id=record["id"],
|
||||
name=record["name"],
|
||||
tags=[tag["display_name"] for tag in record["tag_ids"]],
|
||||
stage=record["stage_id"]["display_name"],
|
||||
priority=record["priority"] == "1",
|
||||
)
|
||||
|
||||
|
||||
def fetch_tasks(client: OdooClient, domain: List[tuple[str, str, str]]) -> List[Task]:
|
||||
records = client.web_search_read(
|
||||
"project.task",
|
||||
domain,
|
||||
spec={
|
||||
"name": {},
|
||||
"priority": {},
|
||||
"tag_ids": {
|
||||
"fields": {
|
||||
"display_name": {},
|
||||
},
|
||||
},
|
||||
"stage_id": {
|
||||
"fields": {
|
||||
"display_name": {},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
return [map_record_to_task(record) for record in records]
|
||||
Reference in New Issue
Block a user