diff --git a/src/config.py b/src/config.py index e4219fb..9f63e47 100644 --- a/src/config.py +++ b/src/config.py @@ -21,7 +21,9 @@ vfyd_tags = [ "vfyd iot", ] -team = [ +type TeamMember = str + +team: list[TeamMember] = [ "abz", "hael", "roen", diff --git a/src/leave_service.py b/src/leave_service.py new file mode 100644 index 0000000..4253f79 --- /dev/null +++ b/src/leave_service.py @@ -0,0 +1,39 @@ +import odoo_client +from datetime import datetime +from leave import Leave +import re + + +class LeaveService: + def __init__(self, client: odoo_client.OdooClient): + self.client = client + + def fetch_available_employees(self, team: list[str]) -> list: + today = datetime.now().strftime("%Y-%m-%d") + domain = [ + ("start_datetime", "<=", today), + ("stop_datetime", ">=", today), + ] + domain.extend(["|"] * (len(team) - 1)) + domain.extend( + [("employee_id.name", "=ilike", f"%({employee})") for employee in team] + ) + leaves = self.client.web_search_read(Leave, domain) + + team_availability = {employee: (True, True) for employee in team} + + for leave in leaves: + employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower() + is_morning = leave.start_datetime.hour < 11 + is_afternoon = leave.stop_datetime.hour >= 14 + if is_morning: + team_availability[employee] = (False, team_availability[employee][1]) + if is_afternoon: + team_availability[employee] = (team_availability[employee][0], False) + + available_employees = [ + employee + for employee, (morning, afternoon) in team_availability.items() + if morning or afternoon + ] + return available_employees diff --git a/src/main.py b/src/main.py index d35be31..b87dbcf 100644 --- a/src/main.py +++ b/src/main.py @@ -1,16 +1,19 @@ import click import random import odoo_client -import re from rich import print, console -from datetime import datetime from collections import defaultdict -from config import vfyd_tags, team +from config import vfyd_tags, team, TeamMember +from leave_service import LeaveService +from task_repository import TaskRepository +from storage import JsonDispatchStorage, DispatchStorage from tasks import Task -from leave import Leave -from storage import DispatchStorage console = console.Console() +storage: DispatchStorage = JsonDispatchStorage() +client = odoo_client.OdooClient() +leave_service = LeaveService(client) +task_repository = TaskRepository(client, vfyd_tags) @click.group() @@ -22,86 +25,48 @@ def cli() -> None: def dispatch(): """Randomly distribute unassigned tasks among team members.""" - if DispatchStorage.load(): + if storage.load(): print("[red]Tickets were already dispatched.[/red]") return - client = odoo_client.OdooClient() - today = datetime.now().strftime("%Y-%m-%d") - domain = [ - ("start_datetime", "<=", today), - ("stop_datetime", ">=", today), - ] - domain.extend(["|"] * (len(team) - 1)) - domain.extend( - [("employee_id.name", "=ilike", f"%({employee})") for employee in team] - ) - leaves = client.web_search_read(Leave, domain) - - team_availability = {employee: (True, True) for employee in team} - - for leave in leaves: - employee = re.search(r"\((.*?)\)", leave.employee_id).group(1).lower() - is_morning = leave.start_datetime.hour < 11 - is_afternoon = leave.stop_datetime.hour >= 14 - if is_morning: - team_availability[employee] = (False, team_availability[employee][1]) - if is_afternoon: - team_availability[employee] = (team_availability[employee][0], False) - - available_employees = [ - employee - for employee, (morning, afternoon) in team_availability.items() - if morning or afternoon - ] + available_employees = leave_service.fetch_available_employees(team) if not available_employees: print("[red]No available employees for dispatch.[/red]") return - previous_dispatch = DispatchStorage.load_week_to_date() + previous_dispatch = storage.load_week_to_date() previous_dispatch_ids = [ task_id for tasks in previous_dispatch.values() for task_id in tasks ] - domain = [ - ("stage_id", "=", 194), - ("user_ids", "=", False), - ("project_id", "=", 49), - ("tag_ids", "not in", vfyd_tags), - ("id", "not in", previous_dispatch_ids), - ] - records = client.web_search_read(Task, domain) + # Find unsorted tasks that have not been dispatched on previous days, + # Assign them to team members and save the result + new_unsorted_tasks = task_repository.get_unsorted_excluding(previous_dispatch_ids) + random.shuffle(new_unsorted_tasks) + new_dispatch: dict[TeamMember, list[int]] = defaultdict(list) + out: dict[TeamMember, list[Task]] = defaultdict(list) + for idx, task in enumerate(new_unsorted_tasks): + member = available_employees[idx % len(available_employees)] + new_dispatch[member].append(task.id) + out[member].append(task) + storage.save(new_dispatch) - random.shuffle(records) - dispatch = defaultdict(list) - out = defaultdict(list) - for idx, task in enumerate(records): - dispatch[available_employees[idx % len(available_employees)]].append(task.id) - out[available_employees[idx % len(available_employees)]].append(task) - - DispatchStorage.save(dispatch) - - domain = [ - ("stage_id", "=", 194), - ("user_ids", "=", False), - ("project_id", "=", 49), - ("tag_ids", "not in", vfyd_tags), - ("id", "in", previous_dispatch_ids), - ] - records = client.web_search_read(Task, domain) - - merged = defaultdict(list) + # Find unsorted tasks that have been dispatched on previous days. + # Add them back to the previously assign team members + old_unsorted_tasks = task_repository.get_unsorted_within(previous_dispatch_ids) + combined_tasks_to_sort: dict[TeamMember, list[Task]] = defaultdict(list) for member, task_ids in previous_dispatch.items(): for task_id in task_ids: - task = next((t for t in records if t.id == task_id), None) + task = next((t for t in old_unsorted_tasks if t.id == task_id), None) if task: - merged[member].append(task) + combined_tasks_to_sort[member].append(task) + # Merge the new unsorted tasks with the old ones for member, tasks in out.items(): - merged[member].extend(tasks) + combined_tasks_to_sort[member].extend(tasks) - for key, tasks in merged.items(): + for key, tasks in combined_tasks_to_sort.items(): print(f"**{key}:**") for task in tasks: print(task.url) @@ -110,15 +75,7 @@ def dispatch(): @cli.command() def status(): """Show all unassigned tasks in the backlog.""" - - client = odoo_client.OdooClient() - domain = [ - ("stage_id", "=", 194), - ("user_ids", "=", False), - ("project_id", "=", 49), - ("tag_ids", "not in", vfyd_tags), - ] - tasks = client.web_search_read(Task, domain) + tasks = task_repository.get_unassigned_tasks() for task in tasks: print(task) @@ -126,17 +83,9 @@ def status(): @cli.command() def check(): """Check which previously dispatched tasks are still not completed.""" - - dispatch = DispatchStorage.load() + dispatch = storage.load() reverse_dispatch = {v: k for k, vs in dispatch.items() for v in vs} - client = odoo_client.OdooClient() - domain = [ - ("id", "in", list(reverse_dispatch.keys())), - ("tag_ids", "not in", vfyd_tags), - ("stage_id", "=", 194), - ("project_id", "=", 49), - ] - tasks = client.web_search_read(Task, domain) + tasks = task_repository.get_tasks_with_ids(reverse_dispatch.keys()) not_done = defaultdict(list) for task in tasks: diff --git a/src/storage.py b/src/storage.py index 0c215a8..a0eb4ff 100644 --- a/src/storage.py +++ b/src/storage.py @@ -2,32 +2,17 @@ from typing import Optional from datetime import datetime, timedelta from pathlib import Path import json +from config import TeamMember class DispatchStorage: - @staticmethod - def path(date: Optional[str] = None) -> Path: - if date is None: - date = datetime.now().strftime("%Y-%m-%d") - return Path("out", f"{date}_dispatch.json") + def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]: ... - @staticmethod - def load(date: Optional[str] = None) -> dict[str, list[int]]: - try: - with open(DispatchStorage.path(date), "r") as f: - return json.load(f) - except FileNotFoundError: - return {} + def save( + self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None + ) -> None: ... - @staticmethod - def save(dispatch: dict[str, list[int]], date: Optional[str] = None) -> None: - path = DispatchStorage.path(date) - path.parent.mkdir(exist_ok=True) - with open(path, "w") as f: - json.dump(dispatch, f) - - @staticmethod - def load_week_to_date() -> dict[str, list[int]]: + def load_week_to_date(self) -> dict[TeamMember, list[int]]: """ Loads and combines dispatch data from the start of the current week up to today. """ @@ -38,7 +23,7 @@ class DispatchStorage: combined_dispatch: dict[str, list[int]] = {} while current_date.date() < today.date(): date_str = current_date.strftime("%Y-%m-%d") - daily_dispatch = DispatchStorage.load(date_str) + daily_dispatch = self.load(date_str) for key, values in daily_dispatch.items(): if key in combined_dispatch: combined_dispatch[key].extend(values) @@ -47,3 +32,26 @@ class DispatchStorage: current_date += timedelta(days=1) return combined_dispatch + + +class JsonDispatchStorage(DispatchStorage): + @staticmethod + def path(date: Optional[str] = None) -> Path: + if date is None: + date = datetime.now().strftime("%Y-%m-%d") + return Path("out", f"{date}_dispatch.json") + + def load(self, date: Optional[str] = None) -> dict[TeamMember, list[int]]: + try: + with open(self.path(date), "r") as f: + return json.load(f) + except FileNotFoundError: + return {} + + def save( + self, dispatch: dict[TeamMember, list[int]], date: Optional[str] = None + ) -> None: + path = self.path(date) + path.parent.mkdir(exist_ok=True) + with open(path, "w") as f: + json.dump(dispatch, f) diff --git a/src/task_repository.py b/src/task_repository.py new file mode 100644 index 0000000..1f01145 --- /dev/null +++ b/src/task_repository.py @@ -0,0 +1,64 @@ +from typing import Iterable + +from tasks import Task + +STAGE_BACKLOG = 194 +PROJECT_ID = 49 + + +class TaskRepository: + def __init__(self, client, vfyd_tags: Iterable[int]): + self.client = client + self._base_domain = [ + ("stage_id", "=", STAGE_BACKLOG), + ("project_id", "=", PROJECT_ID), + ("tag_ids", "not in", list(vfyd_tags)), + ] + + def _search(self, domain: list) -> list[Task]: + return self.client.web_search_read(Task, domain) + + def get_unassigned_tasks(self) -> list[Task]: + return self._search( + [ + *self._base_domain, + ("user_ids", "=", False), + ] + ) + + def get_tasks_with_ids(self, ids: Iterable[int]) -> list[Task]: + ids = list(ids) + if not ids: + return [] + + domain = [ + *self._base_domain, + ("id", "in", ids), + ] + return self._search(domain) + + def get_unsorted_excluding(self, exclude_ids: Iterable[int]) -> list[Task]: + exclude_ids = list(exclude_ids) + + domain = [ + *self._base_domain, + ("user_ids", "=", False), + ] + + if exclude_ids: + domain.append(("id", "not in", exclude_ids)) + + return self._search(domain) + + def get_unsorted_within(self, ids: Iterable[int]) -> list[Task]: + included_ids = list(ids) + if not included_ids: + return [] + + domain = [ + *self._base_domain, + ("user_ids", "=", False), + ("ids", "in", included_ids), + ] + + return self._search(domain)