Overview

Agenda:

  • Explanation of current system

  • Task

  • Ideas for new system

...

Current

The worker manager is syntactic sugar for a group of identical workers: it reduces duplicated code for creating, pausing, resuming, and joining them.

Task

When a worker dies, create a new worker and delete the old one. The new worker is a drop-in replacement for the old worker.

https://app.asana.com/0/75198147526823/1207217241461238

https://github.com/UWARG/computer-vision-python/pull/178

The proposed implementation adds the following new methods to WorkerManager:

  • is_alive() which returns True if all workers are still alive, otherwise False

  • terminate_workers() which terminates all workers held by the worker manager
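
A minimal sketch of how these two methods might look, assuming the manager holds a list of already started multiprocessing.Process objects. WorkerManagerSketch is a hypothetical, reduced stand-in, not the actual WorkerManager from the pull request.

```python
import multiprocessing as mp


class WorkerManagerSketch:
    """
    Hypothetical, reduced stand-in for WorkerManager.
    Assumes every worker has already been started.
    """

    def __init__(self, workers: "list[mp.Process]") -> None:
        self.__workers = workers

    def is_alive(self) -> bool:
        """
        True only if every worker is still alive.
        Note: all() over an empty list is True.
        """
        return all(worker.is_alive() for worker in self.__workers)

    def terminate_workers(self) -> None:
        """
        Terminate every worker, then wait for each one to exit.
        """
        for worker in self.__workers:
            worker.terminate()

        # Join after terminating so that no dead process is left unreaped
        for worker in self.__workers:
            worker.join()
```

With this shape, main() could poll is_alive() in its loop and call terminate_workers() once it returns False.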

main() is responsible for calling the appropriate methods at the correct time and in the correct order. It is also responsible for queue management (e.g. filling and draining the queues so that workers can exit properly).
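
The queue management mentioned above could be sketched as follows. fill_queue() and drain_queue() are hypothetical helper names, and the sketch assumes any queue with the standard put_nowait() / get(timeout=...) interface that raises queue.Full / queue.Empty (queue.Queue and multiprocessing queues both do). Filling unblocks workers waiting to read, and draining unblocks workers waiting to write.

```python
import queue


def fill_queue(q, count: int, filler=None) -> int:
    """
    Put up to `count` filler items without blocking.
    Returns the number of items actually put.
    """
    put_count = 0
    for _ in range(count):
        try:
            q.put_nowait(filler)
        except queue.Full:
            break

        put_count += 1

    return put_count


def drain_queue(q, timeout: float = 0.1) -> list:
    """
    Remove items until the queue stays empty for `timeout` seconds.
    Returns the removed items in order.
    """
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items
```

The timeout in drain_queue() is a judgment call: too short and an item still in transit from a worker may be missed, too long and shutdown is delayed.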

New

Move all worker responsibility to the worker manager.

https://app.asana.com/0/75198147526823/1207690733564435

Once this task is complete, continue with the worker recovery task.

Worker manager:

Code Block
languagepy
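    def __init__(
        self,
        workers: "list[mp.Process]",
        target: "(...) -> object",  # type: ignore
        class_args: tuple,
        input_queues: list,
        output_queues: list,
        worker_controller: WorkerController,
    ) -> None:
        """
        Constructor.
        """
        self.__workers = workers
        self.__target = target
        self.__class_args = class_args
        self.__input_queues = input_queues
        self.__output_queues = output_queues
        self.__worker_controller = worker_controller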

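    @staticmethod
    def create_worker_arguments(class_args: tuple, input_queues: list, output_queues: list, worker_controller: WorkerController) -> tuple:
        # The controller is a single object, so wrap it in a one-element tuple
        return class_args + tuple(input_queues) + tuple(output_queues) + (worker_controller,)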

    # Potentially put all of these parameters into its own class
    @staticmethod
    def create_worker(target, class_args, input_queues, output_queues, worker_controller) -> tuple[bool, mp.Process | None]:
        args = WorkerManager.create_worker_arguments(class_args, input_queues, output_queues, worker_controller)
        try:
            worker = mp.Process(target=target, args=args)
        except Exception:
            return False, None

        return True, worker

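    @classmethod
    def create(
        cls,
        count: int,
        target: "(...) -> object",  # type: ignore
        class_args: tuple,
        input_queues: list,
        output_queues: list,
        worker_controller: WorkerController,
    ) -> "tuple[bool, WorkerManager | None]":
        """
        Create identical workers.

        count: Number of workers.
        target: Function run by each worker.
        class_args: Arguments to function.
        input_queues: Queues the workers read from.
        output_queues: Queues the workers write to.
        worker_controller: Controller shared by the workers.

        Returns: Success, worker manager.
        """
        if count <= 0:
            return False, None

        workers = []
        for _ in range(count):
            result, worker = cls.create_worker(target, class_args, input_queues, output_queues, worker_controller)
            if not result:
                return False, None

            workers.append(worker)

        return True, cls(workers, target, class_args, input_queues, output_queues, worker_controller)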


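    def check_and_restart_dead_workers(self) -> None:
        for worker in self.__workers:
            if not worker.is_alive():
                # TODO: Create a replacement with create_worker(), start it,
                # and swap it into self.__workers in place of the dead worker
                pass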
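
One possible shape for the restart step, as a sketch only. WorkerManagerSketch is a hypothetical, reduced stand-in that omits the queues and controller. It assumes every worker in the list has been started, and it returns a bool so the caller can react to a failed restart, which is a choice not made in the outline above.

```python
import multiprocessing as mp


class WorkerManagerSketch:
    """
    Hypothetical, reduced stand-in for WorkerManager (queues and controller omitted).
    """

    def __init__(self, workers: list, target, args: tuple) -> None:
        self.__workers = workers
        self.__target = target
        self.__args = args

    @staticmethod
    def create_worker(target, args: tuple) -> "tuple[bool, mp.Process | None]":
        """
        Create, but do not start, a single worker process.
        """
        try:
            worker = mp.Process(target=target, args=args)
        except Exception:
            return False, None

        return True, worker

    def check_and_restart_dead_workers(self) -> bool:
        """
        Replace every dead worker with a new, started worker.
        Returns False as soon as a replacement cannot be created.
        """
        for i, worker in enumerate(self.__workers):
            if worker.is_alive():
                continue

            result, new_worker = self.create_worker(self.__target, self.__args)
            if not result:
                return False

            # Reap the dead process before dropping the reference to it
            worker.join()

            # Same target and arguments, so this is a drop-in replacement
            self.__workers[i] = new_worker
            new_worker.start()

        return True
```

Replacing the entry at the same index keeps the worker count constant, so the rest of the manager (pause, resume, join) does not need to know a restart happened.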

Caller:

Code Block
languagepy
    worker_managers = []

    result, manager = worker_manager.WorkerManager.create(...)
    if not result:
        print("BLEARGH")
        return -1

    worker_managers.append(manager)

    result, manager = worker_manager.WorkerManager.create(...)
    if not result:
        print("BLEARGH")
        return -1

    worker_managers.append(manager)

    result, manager = worker_manager.WorkerManager.create(...)
    if not result:
        print("BLEARGH")
        return -1

    worker_managers.append(manager)

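    while True:
        for manager in worker_managers:
            manager.check_and_restart_dead_workers()

        # Sleep between checks to avoid busy-waiting (needs `import time`; 1 second is a placeholder)
        time.sleep(1)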