Table of Contents | ||
---|---|---|
|
Overview
Agenda:
Explanation of current system
Task
Ideas for new system
...
Current
Worker manager is syntactic sugar for identical workers so there is less duplicate code for creation, pausing, resuming, and joining.
Task
When a worker dies, create a new worker and delete the old one. The new worker is a drop-in replacement for the old worker.
https://app.asana.com/0/75198147526823/1207217241461238
https://github.com/UWARG/computer-vision-python/pull/178
The proposed implementation is to add the following new methods to WorkerManager:
is_alive()
which returns true if all workers are still alive, otherwise false
terminate_workers()
which terminates all workers held by the worker manager
main()
is responsible for calling the appropriate methods at the correct time and in the correct order. It is also responsible for queue management (e.g. filling and draining the queues for proper exit).
New
Move all worker responsibility to the worker manager.
https://app.asana.com/0/75198147526823/1207690733564435
Once this task is complete, then continue with the worker recovery task.
Worker manager:
Code Block | ||
---|---|---|
| ||
def __init__(
    self,
    workers: "list[mp.Process]",
    target: "(...) -> object",  # type: ignore
    class_args: tuple,
    input_queues: list,
    output_queues: list,
    worker_controller: "WorkerController",
) -> None:
    """
    Constructor.

    workers: Worker processes held by this manager.
    target: Function.
    class_args: Arguments to function.
    input_queues: Input queues passed to the function.
    output_queues: Output queues passed to the function.
    worker_controller: Worker controller passed to the function.
    """
    self.__workers = workers

    # These are the same five values create_worker() takes, kept so a
    # worker can be built again later with identical parameters.
    self.__target = target
    self.__class_args = class_args
    self.__input_queues = input_queues
    self.__output_queues = output_queues
    self.__worker_controller = worker_controller
@staticmethod
def create_worker_arguments(class_args, input_queues, output_queues, worker_controller) -> tuple:
    """
    Build the positional argument tuple for a worker's target function.

    class_args: Arguments to function, placed first.
    input_queues: Input queues, placed after class_args.
    output_queues: Output queues, placed after input_queues.
    worker_controller: Single worker controller, placed last.

    Return: Flat tuple in the order above.
    """
    # The controller is one object rather than a collection, so it is
    # wrapped in a 1-tuple; tuple(worker_controller) would try to iterate it.
    return (
        tuple(class_args)
        + tuple(input_queues)
        + tuple(output_queues)
        + (worker_controller,)
    )
# Potentially put all of these parameters into its own class
@staticmethod
def create_worker(
    target,
    class_args,
    input_queues,
    output_queues,
    worker_controller,
) -> "tuple[bool, mp.Process | None]":
    """
    Create a single worker process. The process is not started here.

    target: Function.
    class_args: Arguments to function.
    input_queues: Input queues passed to the function.
    output_queues: Output queues passed to the function.
    worker_controller: Worker controller passed to the function.

    Return: Success, worker process (None on failure).
    """
    args = WorkerManager.create_worker_arguments(
        class_args,
        input_queues,
        output_queues,
        worker_controller,
    )

    try:
        worker = mp.Process(target=target, args=args)
    except Exception:
        # Failure is reported through the result flag. Catching Exception
        # instead of using a bare except lets KeyboardInterrupt and
        # SystemExit propagate.
        return False, None

    return True, worker
@classmethod
def create(
    cls,
    count: int,
    target: "(...) -> object",  # type: ignore
    class_args: tuple,
    input_queues: list,
    output_queues: list,
    worker_controller: "WorkerController",
) -> "tuple[bool, WorkerManager | None]":
    """
    Create identical workers and the worker manager that holds them.

    count: Number of workers, must be positive.
    target: Function.
    class_args: Arguments to function.
    input_queues: Input queues passed to the function.
    output_queues: Output queues passed to the function.
    worker_controller: Worker controller passed to the function.

    Return: Success, worker manager (None on failure).
    """
    # A negative count would otherwise skip the loop and yield a manager
    # with no workers, so it is rejected together with zero.
    if count <= 0:
        return False, None

    workers = []
    for _ in range(count):
        result, worker = cls.create_worker(
            target,
            class_args,
            input_queues,
            output_queues,
            worker_controller,
        )
        if not result:
            # All or nothing: one failed worker fails the whole creation.
            return False, None

        workers.append(worker)

    return True, cls(
        workers,
        target,
        class_args,
        input_queues,
        output_queues,
        worker_controller,
    )
def check_and_restart_dead_workers(self) -> None:
"""
Go through the held workers and handle any that are no longer alive.
"""
for worker in self.__workers:
if not worker.is_alive():
# TODO: Not implemented in this sketch. Per the Task section, the dead
# worker is to be replaced by a newly created drop-in worker and the old
# one deleted. Whether the replacement is started here is undecided.
# Do the needful |
Caller:
Code Block | ||
---|---|---|
| ||
# Sketch of the calling code. `...` stands in for each manager's arguments,
# and `return -1` implies this runs inside a function that is not shown.
worker_managers = []
# Each create() call returns (result, manager); a failed result aborts startup.
result, manager = worker_manager.WorkerManager.create(...)
if not result:
print("BLEARGH")
return -1
worker_managers.append(manager)
result, manager = worker_manager.WorkerManager.create(...)
if not result:
print("BLEARGH")
return -1
worker_managers.append(manager)
result, manager = worker_manager.WorkerManager.create(...)
if not result:
print("BLEARGH")
return -1
worker_managers.append(manager)
# Repeatedly ask every manager to check its workers.
# NOTE(review): as written the loop has no delay and no exit condition;
# confirm whether a sleep or a shutdown check is intended.
while True:
for manager in worker_managers:
manager.check_and_restart_dead_workers() |