2024-06-28 Autonomy airside worker architecture

Overview

Agenda:

  • Explanation of current system

  • Task

  • Ideas for new system

Current

The worker manager is syntactic sugar for a group of identical workers, so there is less duplicated code for creating, pausing, resuming, and joining them.
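Here is a minimal sketch of that idea. It assumes the only shared behaviour needed is create, start, and join. `IdenticalWorkers`, `start_workers`, and `join_workers` are hypothetical names, and pausing and resuming (done through the worker controller) are left out. `worker_class` defaults to `mp.Process`. Because `threading.Thread` accepts the same `target` and `args` keywords, it can be swapped in to try the sketch without spawning processes.

```python
import multiprocessing as mp
from typing import Callable


class IdenticalWorkers:
    """
    Hypothetical sketch: owns `count` workers that all run target(*args).
    """

    def __init__(
        self,
        count: int,
        target: Callable[..., object],
        args: tuple,
        worker_class: type = mp.Process,
    ) -> None:
        # One construction site instead of a copy-pasted block per worker
        self.workers = [worker_class(target=target, args=args) for _ in range(count)]

    def start_workers(self) -> None:
        for worker in self.workers:
            worker.start()

    def join_workers(self) -> None:
        for worker in self.workers:
            worker.join()
```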

Task

When a worker dies, create a new worker and delete the old one. The new worker is a drop-in replacement for the old worker.
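One way to read "drop-in replacement" is that the new worker runs the same target with the same argument tuple. It is then attached to the same input and output queues and the same controller, so neighbouring workers never see the swap. `replace_dead_worker` is a hypothetical sketch, not the code from the PR. "Deleting" the old worker is assumed to mean joining it and, for `mp.Process`, calling `close()` to release its resources.

```python
import multiprocessing as mp
from typing import Callable


def replace_dead_worker(
    old_worker,
    target: Callable[..., object],
    args: tuple,
    worker_class: type = mp.Process,
    join_timeout: float = 1.0,
):
    """
    Hypothetical helper: returns a started drop-in replacement for old_worker.

    The replacement runs the same target with the same args (and therefore the
    same queues and controller) as the worker it replaces.
    """
    if old_worker.is_alive():
        raise ValueError("Worker is still alive and should not be replaced")

    # "Delete" the old worker: reap it and, for processes, release its resources
    old_worker.join(timeout=join_timeout)
    if hasattr(old_worker, "close"):
        old_worker.close()

    new_worker = worker_class(target=target, args=args)
    new_worker.start()
    return new_worker
```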

https://app.asana.com/0/75198147526823/1207217241461238

https://github.com/UWARG/computer-vision-python/pull/178

The proposed implementation adds the following methods to WorkerManager:

  • is_alive(), which returns True if all workers are still alive and False otherwise

  • terminate_workers(), which terminates all workers held by the worker manager
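Below is a sketch of the two proposed methods. It assumes the manager's workers are `mp.Process` objects that have already been started, since `terminate()` on an unstarted process fails. `WorkerManagerSketch` is a stand-in name that keeps the snippet self-contained.

```python
class WorkerManagerSketch:
    """
    Hypothetical sketch of the two proposed methods; only the worker list is modelled.
    """

    def __init__(self, workers: list) -> None:
        self.__workers = workers

    def is_alive(self) -> bool:
        """
        True if every worker is still alive, otherwise False.
        """
        return all(worker.is_alive() for worker in self.__workers)

    def terminate_workers(self) -> None:
        """
        Sends terminate to every worker first, then waits for each one to exit.
        """
        for worker in self.__workers:
            worker.terminate()
        for worker in self.__workers:
            worker.join()
```

Every worker receives `terminate()` before any is joined. Shutdown therefore waits roughly as long as the slowest worker, not the sum of all of them. Note that `all()` over an empty list is `True`, so a manager with no workers reports itself as alive.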

main() is responsible for calling these methods at the correct time and in the correct order. It is also responsible for queue management (e.g. filling and draining the queues so that the program exits cleanly).
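This is a minimal sketch of the main() side of the proposal. It assumes that any manager reporting `is_alive() == False` is torn down with `terminate_workers()` and rebuilt from scratch. `restart_dead_managers`, `drain_queue`, and the `create_manager(index)` factory are hypothetical names. `drain_queue` relies on `get_nowait()` raising `queue.Empty`, which both `queue.Queue` and `multiprocessing.Queue` do.

```python
import queue
from typing import Callable


def restart_dead_managers(worker_managers: list, create_manager: Callable[[int], object]) -> int:
    """
    Hypothetical main() step: a manager with any dead worker has all of its
    workers terminated and is replaced by a freshly created manager.
    create_manager(index) is an assumed factory that rebuilds manager `index`
    with its original arguments. Returns the number of managers replaced.
    """
    restarted = 0
    for index, manager in enumerate(worker_managers):
        if manager.is_alive():
            continue
        manager.terminate_workers()
        worker_managers[index] = create_manager(index)
        restarted += 1
    return restarted


def drain_queue(q) -> int:
    """
    Removes everything currently in q without blocking; returns how many items were removed.
    """
    drained = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return drained
        drained += 1
```

The `multiprocessing` documentation warns that terminating a process while it uses a queue can corrupt that queue. It also notes that `get_nowait()` can raise `queue.Empty` for a short time after a put, so a drain like this is best-effort.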

New

Move all worker responsibility to the worker manager.

https://app.asana.com/0/75198147526823/1207690733564435

Once this task is complete, continue with the worker recovery task.

Worker manager:

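```python
import multiprocessing as mp
from typing import Callable

from utilities.workers.worker_controller import WorkerController  # Module path assumed from the repository layout


class WorkerManager:
    """
    Manages a group of identical workers.
    """

    def __init__(
        self,
        workers: "list[mp.Process]",
        target: Callable[..., object],
        class_args: tuple,
        input_queues: list,
        output_queues: list,
        worker_controller: WorkerController,
    ) -> None:
        """
        Constructor. Use create() instead.
        """
        self.__workers = workers
        self.__target = target
        self.__class_args = class_args
        self.__input_queues = input_queues
        self.__output_queues = output_queues
        self.__worker_controller = worker_controller

    @staticmethod
    def create_worker_arguments(
        class_args: tuple, input_queues: list, output_queues: list, worker_controller: WorkerController
    ) -> tuple:
        # The controller is a single object, so wrap it in a 1-tuple instead of calling tuple() on it
        return class_args + tuple(input_queues) + tuple(output_queues) + (worker_controller,)

    # Potentially put all of these parameters into its own class
    @staticmethod
    def create_worker(
        target: Callable[..., object],
        class_args: tuple,
        input_queues: list,
        output_queues: list,
        worker_controller: WorkerController,
    ) -> "tuple[bool, mp.Process | None]":
        args = WorkerManager.create_worker_arguments(class_args, input_queues, output_queues, worker_controller)
        try:
            worker = mp.Process(target=target, args=args)
        except Exception:
            return False, None

        return True, worker

    @classmethod
    def create(
        cls,
        count: int,
        target: Callable[..., object],
        class_args: tuple,
        input_queues: list,
        output_queues: list,
        worker_controller: WorkerController,
    ) -> "tuple[bool, WorkerManager | None]":
        """
        Create identical workers.

        count: Number of workers.
        target: Function.
        class_args: Arguments to function (the queues and controller are appended).
        """
        if count <= 0:
            return False, None

        workers = []
        for _ in range(count):
            result, worker = cls.create_worker(target, class_args, input_queues, output_queues, worker_controller)
            if not result:
                return False, None

            workers.append(worker)

        return True, cls(workers, target, class_args, input_queues, output_queues, worker_controller)

    def check_and_restart_dead_workers(self) -> None:
        for worker in self.__workers:
            if not worker.is_alive():
                # TODO: Replace the dead worker with a new one (worker recovery task)
                pass
```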
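One possible body for `check_and_restart_dead_workers` follows. It is written as a separate self-contained class so it can be tried on its own. It assumes the manager keeps the combined argument tuple (class args, queues, controller), so every replacement is a drop-in copy. It indexes into the worker list because reassigning the loop variable in `for worker in self.__workers` would not change the list. `RestartingWorkerManager` and `worker_class` are hypothetical.

```python
import multiprocessing as mp
from typing import Callable


class RestartingWorkerManager:
    """
    Hypothetical sketch: keeps what it needs to rebuild a worker and swaps
    dead workers for fresh ones in place.
    """

    def __init__(
        self,
        count: int,
        target: Callable[..., object],
        args: tuple,
        worker_class: type = mp.Process,
    ) -> None:
        self.__target = target
        # Already combined: class args + input queues + output queues + controller
        self.__args = args
        self.__worker_class = worker_class
        self.__workers = [self.__new_worker() for _ in range(count)]

    def __new_worker(self):
        return self.__worker_class(target=self.__target, args=self.__args)

    @property
    def workers(self) -> list:
        return list(self.__workers)

    def start_workers(self) -> None:
        for worker in self.__workers:
            worker.start()

    def check_and_restart_dead_workers(self) -> int:
        """
        Replaces every dead worker with a started drop-in replacement.
        Returns the number of workers restarted.
        """
        restarted = 0
        # Index so the dead worker is replaced in the list itself
        for index, worker in enumerate(self.__workers):
            if worker.is_alive():
                continue
            worker.join()  # Reap the dead worker
            if hasattr(worker, "close"):
                worker.close()  # mp.Process only: release its resources
            replacement = self.__new_worker()
            replacement.start()
            self.__workers[index] = replacement
            restarted += 1
        return restarted
```

As written, a worker that exited on purpose (for example after an exit request through the controller) looks the same as one that crashed, and it would be restarted too. Checking `exitcode` or the controller's state first is one option. The sketch also assumes `start_workers()` ran first, since `join()` on a process that was never started raises an error.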

Caller:

```python
from utilities.workers import worker_manager  # Module path assumed from the repository layout


def main() -> int:
    worker_managers = []

    result, manager = worker_manager.WorkerManager.create(...)
    if not result:
        print("Error creating worker manager")
        return -1

    worker_managers.append(manager)

    result, manager = worker_manager.WorkerManager.create(...)
    if not result:
        print("Error creating worker manager")
        return -1

    worker_managers.append(manager)

    result, manager = worker_manager.WorkerManager.create(...)
    if not result:
        print("Error creating worker manager")
        return -1

    worker_managers.append(manager)

    while True:
        for manager in worker_managers:
            manager.check_and_restart_dead_workers()
```
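The three create blocks in the caller differ only in their arguments, so a list of argument sets could drive them. In this hedged sketch, `create_all_managers` and `supervise` are hypothetical, and `create_manager` stands in for `WorkerManager.create`. The polling period and the `should_exit` callback are assumptions added so the loop sleeps between checks and can stop.

```python
import time
from typing import Callable


def create_all_managers(create_manager: Callable[..., tuple], specs: list) -> "list | None":
    """
    Hypothetical helper: one create call per spec instead of a copy-pasted block each.
    create_manager returns (result, manager); returns None if any creation fails.
    """
    worker_managers = []
    for spec in specs:
        result, manager = create_manager(**spec)
        if not result:
            print(f"Error creating worker manager for {spec}")
            return None
        worker_managers.append(manager)
    return worker_managers


def supervise(worker_managers: list, should_exit: Callable[[], bool], period_s: float = 0.1) -> None:
    """
    Polls the managers until should_exit() is True, sleeping between rounds
    so the loop does not keep a CPU core busy.
    """
    while not should_exit():
        for manager in worker_managers:
            manager.check_and_restart_dead_workers()
        time.sleep(period_s)
```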