thrd

Functions/classes for multi-threading

base

BaseThread

The elevaso_spine.thrd.base.BaseThread() class provides a base threading class with built-in functions for initializing, running, and de-initializing multiple threads.

For example, if you need to connect to a database during thread initialization, then process records from a queue, and ensure proper database disconnection when complete, your code would look like:

from elevaso_spine.thrd.base import BaseThread

class DBThread(BaseThread):
    def connect(self, **kwargs):
        # Database connection code goes here
        print(kwargs["db_user"], kwargs["db_pwd"], kwargs["db_instance"])

    def disconnect(self):
        # Database disconnection code goes here
        pass

    # Register the hooks; the functions are referenced directly because
    # self is not defined at class level
    init_func = connect
    stop_func = disconnect

The corresponding code to use the DBThread class above would look like:

import queue

thread_queue = queue.Queue() # Python shared queue between threads

thread_queue.put({"test": "value"}) # Insert values into the queue

thread_list = []

for x in range(10):
    thread = DBThread(worker_queue=thread_queue, thread_num=x, db_user="name", db_pwd="password", db_instance="test.example")

    thread.name = DBThread.__name__ + str(x)

    thread.daemon = True # Daemon threads do not block interpreter exit
    thread_list.append(thread)
    thread.start()

thread_queue.join() # Wait until all queue items have been processed

# Print stats from the threads
print("Processed records: " + str(sum(t.rows_processed for t in thread_list)))
print("Errored records: " + str(sum(t.rows_errored for t in thread_list)))

class elevaso_spine.thrd.base.BaseThread(thread_num: int, **kwargs)

Base multi-threading class with built-in functions

__init__(thread_num: int, **kwargs)

Initialize a worker thread

Args:

thread_num (int): The unique number for the thread

Kwargs:

worker_queue (queue.Queue): Queue to pull work from, defaults to None

Note

Additional kwargs provided to the class will be stored in the kwargs property of the class.

process_queue()

Process a record from the shared queue, if one was provided during initialization

run()

Function to run the thread for processing
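To make the roles of __init__, process_queue() and run() more concrete, here is a minimal standard-library sketch of a worker with a similar shape. It is not the BaseThread implementation: the SketchWorker class, its handle() hook, and the 0.1 second timeout are assumptions made for illustration, and only the rows_processed, rows_errored, kwargs and worker_queue names are taken from the documentation above.

```python
import queue
import threading


class SketchWorker(threading.Thread):
    """Hypothetical stand-in for a BaseThread-style worker (not the library's code)."""

    def __init__(self, thread_num, worker_queue=None, **kwargs):
        super().__init__(name=f"SketchWorker{thread_num}", daemon=True)
        self.thread_num = thread_num
        self.worker_queue = worker_queue
        self.kwargs = kwargs  # extra keyword arguments are kept for later use
        self.rows_processed = 0
        self.rows_errored = 0

    def handle(self, record):
        """Hypothetical per-record hook; a subclass would override this."""

    def process_queue(self):
        """Pull records until the queue stays empty for a short timeout."""
        while True:
            try:
                record = self.worker_queue.get(timeout=0.1)
            except queue.Empty:
                return
            try:
                self.handle(record)
                self.rows_processed += 1
            except Exception:
                self.rows_errored += 1
            finally:
                # Always mark the record as done so queue.join() can return
                self.worker_queue.task_done()

    def run(self):
        # Only work the queue if one was supplied at initialization
        if self.worker_queue is not None:
            self.process_queue()


work = queue.Queue()
for i in range(5):
    work.put({"id": i})

workers = [SketchWorker(n, worker_queue=work, label="demo") for n in range(2)]
for w in workers:
    w.start()
work.join()  # returns once every record has been marked done
for w in workers:
    w.join()
total_processed = sum(w.rows_processed for w in workers)
```

Counting inside the worker and calling task_done() in a finally block keeps the per-thread counters and queue.join() consistent even when a record raises.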

mgr

create

The elevaso_spine.thrd.mgr.create() function creates the requested number of threads from the given thread class. You can also pass additional parameters for thread initialization (such as credentials for establishing separate database connections) and a shared Python queue.

A basic example looks like:

from elevaso_spine.thrd.base import BaseThread
from elevaso_spine.thrd.mgr import create


thread_list = create(10, BaseThread, params={}) # params has no default in the signature below

_ = [t.join() for t in thread_list] # Wait until all threads are finished running

An example using a Python queue looks like:

import queue
from elevaso_spine.thrd.base import BaseThread
from elevaso_spine.thrd.mgr import create

thread_queue = queue.Queue()
thread_queue.put({"test": "data"})

thread_list = create(10, BaseThread, params={}, thread_queue=thread_queue)

thread_queue.join() # Wait until queue is empty

# Print the number of queue records processed or errored
print(sum([t.rows_processed for t in thread_list]))
print(sum([t.rows_errored for t in thread_list]))

elevaso_spine.thrd.mgr.create(threads: int, thread_class: object, params: dict, thread_queue: Queue = None) -> list

Create threads for parallel processing

Args:

threads (int): Number of threads to create

thread_class (object): Thread class to create

params (dict): Dictionary of parameters to pass into thread

thread_queue (queue.Queue, Optional): Queue to retrieve items to work, defaults to None

Raises:

ValueError: If threads == 0

Returns:

list: List of thread objects created
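The behaviour described for create() can be approximated with the standard library alone. The sketch below is only an illustration under stated assumptions: create_sketch and DemoThread are hypothetical names, starting each thread inside the helper is assumed from the examples above, and the guard rejects any count below 1 rather than only 0.

```python
import threading


class DemoThread(threading.Thread):
    """Hypothetical worker that only records the arguments it was given."""

    def __init__(self, thread_num, **kwargs):
        super().__init__(daemon=True)
        self.thread_num = thread_num
        self.kwargs = kwargs

    def run(self):
        pass  # a real worker would process records here


def create_sketch(threads, thread_class, params=None, thread_queue=None):
    """Build and start `threads` workers; illustrative only."""
    if threads < 1:
        raise ValueError("threads must be at least 1")
    started = []
    for num in range(threads):
        # Each worker gets its own number, the shared queue, and the extra params
        worker = thread_class(num, worker_queue=thread_queue, **(params or {}))
        worker.start()
        started.append(worker)
    return started


demo_threads = create_sketch(3, DemoThread, params={"db_user": "name"})
for t in demo_threads:
    t.join()
```

Passing the thread number as the first positional argument mirrors the BaseThread(thread_num, **kwargs) signature documented earlier.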

has_working_thread

The elevaso_spine.thrd.mgr.has_working_thread() function checks all threads provided and returns True if any are active, or False if none are active.

elevaso_spine.thrd.mgr.has_working_thread(thread_list: list) -> bool

Checks if there are threads still alive

Args:

thread_list (list): List of thread objects

Returns:

bool: True if at least one thread is alive, otherwise False
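A check of this kind can be expressed with threading.Thread.is_alive(). The following sketch assumes that "active" means "alive" in the threading sense; any_alive is a hypothetical name and is not the library's function.

```python
import threading


def any_alive(thread_list):
    """Return True if at least one thread in the list is still running."""
    return any(t.is_alive() for t in thread_list)


# A thread that blocks until the event is set, so its state is predictable
stop = threading.Event()
waiting = threading.Thread(target=stop.wait, daemon=True)
waiting.start()

alive_before = any_alive([waiting])  # thread is blocked on the event
stop.set()
waiting.join()
alive_after = any_alive([waiting])   # thread has finished
```

An empty list yields False, since any() over no items is False.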

thread_metrics

The elevaso_spine.thrd.mgr.thread_metrics() function aggregates the built-in metrics (records processed, records errored, and thread count) across the threads provided.

elevaso_spine.thrd.mgr.thread_metrics(thread_list: list) -> tuple[int, int, int]

Retrieves built-in metrics from all threads

Args:

thread_list (list): List of thread objects

Returns:

tuple: Containing

rows_processed: Number of queue records processed for all threads

rows_errored: Number of queue records errored for all threads

threads: Number of threads
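The returned tuple can be thought of as two sums and a count. The sketch below assumes each thread object exposes rows_processed and rows_errored attributes, as the earlier examples suggest; metrics_sketch is a hypothetical name and SimpleNamespace objects stand in for real threads.

```python
from types import SimpleNamespace


def metrics_sketch(thread_list):
    """Return (rows_processed, rows_errored, threads) across all threads."""
    processed = sum(t.rows_processed for t in thread_list)
    errored = sum(t.rows_errored for t in thread_list)
    return processed, errored, len(thread_list)


# Stand-ins for finished worker threads
fake_threads = [
    SimpleNamespace(rows_processed=3, rows_errored=1),
    SimpleNamespace(rows_processed=4, rows_errored=0),
]
summary = metrics_sketch(fake_threads)  # (7, 1, 2)
```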

wait_queue_empty

The elevaso_spine.thrd.mgr.wait_queue_empty() function checks a shared queue until all items have been processed or there are no active threads, periodically logging the estimated size of the queue.

Example code:

import queue
from elevaso_spine.thrd.base import BaseThread
from elevaso_spine.thrd.mgr import create, thread_metrics, wait_queue_empty

thread_queue = queue.Queue()
thread_queue.put({"test": "data"})

thread_list = create(10, BaseThread, params={}, thread_queue=thread_queue)

wait_queue_empty(thread_queue, thread_list)

# Print the number of queue records processed or errored
print(thread_metrics(thread_list))

elevaso_spine.thrd.mgr.wait_queue_empty(thread_queue: Queue, thread_list: list, interval: int = 5)

Checks whether a queue is empty and outputs logging messages. It also checks whether any threads are still alive, so that it does not keep waiting on the queue after all worker threads have stopped.

Args:

thread_queue (queue.Queue): Queue to check for empty status

thread_list (list): List of threads working the queue

interval (int, Optional): Interval in seconds to print queue depth, defaults to 5

Note

If interval is set to 0, the function does not poll the queue or print status messages; instead, it blocks on queue.join() until the queue is empty.

Raises:

Exception: If queue is not empty and no working threads in thread_list
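The polling behaviour described above could be sketched roughly as follows. This is an illustration, not the library's implementation: wait_sketch and drain are hypothetical names, print() stands in for logging, RuntimeError (a subclass of Exception) is an assumed choice of error type, and the demo passes a fractional interval only to keep the run short.

```python
import queue
import threading
import time


def wait_sketch(thread_queue, thread_list, interval=5):
    """Poll until the queue is empty, failing fast if no worker is alive."""
    if interval == 0:
        thread_queue.join()  # block without polling or status output
        return
    while not thread_queue.empty():
        if not any(t.is_alive() for t in thread_list):
            raise RuntimeError("queue is not empty and no threads are alive")
        print(f"approximate queue size: {thread_queue.qsize()}")
        time.sleep(interval)


def drain(q):
    """Hypothetical worker body: consume items until the queue stays empty."""
    while True:
        try:
            q.get(timeout=0.1)
        except queue.Empty:
            return
        q.task_done()


jobs = queue.Queue()
for n in range(3):
    jobs.put(n)
worker = threading.Thread(target=drain, args=(jobs,), daemon=True)
worker.start()
wait_sketch(jobs, [worker], interval=0.05)
drained = jobs.empty()

# With no live workers, a non-empty queue is reported instead of waiting forever
stuck = queue.Queue()
stuck.put("left over")
try:
    wait_sketch(stuck, [], interval=0.05)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
```

Checking for live threads on every pass is what prevents an indefinite wait when all workers have exited early, which a bare queue.join() would not detect.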