Source code for elevaso_spine.thrd.mgr

"""
.. module:: mgr
    :platform: Unix, Windows
    :synopsis: Function to manage threads
"""

# Python Standard Libraries
import logging
import queue
import time

# 3rd Party Libraries


# Project Specific Libraries


LOGGER = logging.getLogger(__name__)


[docs] def create( threads: int, thread_class: object, params: dict, thread_queue: queue.Queue = None, ) -> list: """Create threads for parallel processing Args: threads (int): Number of threads to create thread_class (object): Thread class to create params (dict): Dictionary of parameters to pass into thread thread_queue (queue.Queue, Optional): Queue to retrieve items to work, defaults to None Raises: ValueError if threads == 0 Returns: list: List of thread objects created """ if threads == 0: raise ValueError("Threads must be greater than 0") thread_list = [] class_name = thread_class.__name__ for enum in range(threads): LOGGER.debug( "Creating %(class_name)s thread %(enum)s", {"enum": enum, "class_name": class_name}, ) if thread_queue: thread = thread_class( worker_queue=thread_queue, thread_num=enum, **params ) else: thread = thread_class(thread_num=enum, **params) thread.daemon = True thread_list.append(thread) thread.start() LOGGER.info( "Created %(thread_list_len)s %(class_name)s thread(s)", {"thread_list_len": len(thread_list), "class_name": class_name}, ) return thread_list
[docs] def has_working_thread(thread_list: list) -> bool: """Checks if there are threads still alive Args: thread_list (list): List of thread objects Returns: bool: True/False if has at least one thread alive """ for thread in thread_list: if thread.is_alive(): return True return False
# pylint: disable=unsubscriptable-object
[docs] def thread_metrics(thread_list: list) -> tuple[int, int, int]: """Retrieves built-in metrics from all threads Args: thread_list (list): List of thread objects Returns: tuple: Containing rows_processed: Number of queue records processed for all threads rows_errored: Number of queue records errored for all threads threads: Number of threads """ return ( sum([thread.rows_processed for thread in thread_list]), sum([thread.rows_errored for thread in thread_list]), len(thread_list), )
[docs] def wait_queue_empty( thread_queue: queue.Queue, thread_list: list, interval: int = 5 ): """Check if a queue is empty and outputs logging messages, additionally checks if there are threads alive to prevent waiting for a queue to finish if all worker threads have stopped Args: thread_queue (queue.Queue): Queue to check for empty status thread_list (list): List of threads working the queue interval (int, Optional): Interval in seconds to print queue depth, defaults to 5 .. note:: If set to 0, the queue will not be actively checked while printing status message, instead it will wait until the queue is empty through the queue.join() function Raises: Exception: If queue is not empty and no working threads in thread_list """ if interval > 0: __wait(thread_queue, thread_list, interval) thread_queue.join()
def __wait(thread_queue: queue.Queue, thread_list: list, interval: int = 5): """Loop until queue is empty Args: thread_queue (queue.Queue): Queue to check for empty status thread_list (list): List of threads working the queue interval (int, Optional): Interval in seconds to print queue depth, defaults to 5 .. note:: If set to 0, the queue will not be actively checked while printing status message, instead it will wait until the queue is empty through the queue.join() function Raises: Exception: If queue is not empty and no working threads in thread_list """ i = 0 while not thread_queue.empty(): time.sleep(1) if has_working_thread(thread_list): i += 1 __log_size(i, interval, thread_queue.qsize()) else: raise Exception( f"{thread_queue.qsize()} records in queue with no active " "threads" ) def __log_size(i: int, interval: int, q_size: int): """Log the current size of queue if interval matches Args: i (int): Current second interval (int): Interval at which to print q_size (int): Current queue size """ if i == interval: LOGGER.info("%(q_size)s Records in Queue", {"q_size": q_size}) i = 0