thrd
Functions/classes for multi-threading
base
BaseThread
The elevaso_spine.thrd.base.BaseThread() class provides a base threading class with built-in functions for initializing, running, and de-initializing multiple threads.
For example, if you need to connect to a database during thread initialization, then process records from a queue, and ensure proper database disconnection when complete, your code would look like:
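from elevaso_spine.thrd.base import BaseThread

class DBThread(BaseThread):
    def __init__(self, thread_num, **kwargs):
        super().__init__(thread_num, **kwargs)
        # Register the start and stop hooks (assumed to be set per instance
        # after BaseThread.__init__ has run)
        self.init_func = self.connect
        self.stop_func = self.disconnect

    def connect(self, **kwargs):
        # Database connection code goes here
        print(kwargs["db_user"], kwargs["db_pwd"], kwargs["db_instance"])

    def disconnect(self):
        # Database disconnection code goes here
        pass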
The corresponding code to use the DBThread class above would look like:
import queue

thread_queue = queue.Queue()  # Python shared queue between threads
thread_queue.put({"test": "value"})  # Insert values into the queue

thread_list = []

for x in range(10):
    thread = DBThread(
        worker_queue=thread_queue,
        thread_num=x,
        db_user="name",
        db_pwd="password",
        db_instance="test.example",
    )
    thread.name = DBThread.__name__ + str(x)
    thread.daemon = True  # Assumed intent: do not block interpreter exit
    thread_list.append(thread)
    thread.start()

thread_queue.join()  # Wait until all queue items have been processed

# Print stats from the threads
print("Processed records: " + str(sum(t.rows_processed for t in thread_list)))
print("Errored records: " + str(sum(t.rows_errored for t in thread_list)))
- class elevaso_spine.thrd.base.BaseThread(thread_num: int, **kwargs)
Base multi-threading class with built-in functions
- __init__(thread_num: int, **kwargs)
Initialize a worker thread
- Args:
thread_num (int): The unique number for the thread
- Kwargs:
worker_queue (queue.Queue): Queue to pull work from, defaults to None
Note
Additional kwargs provided to the class will be stored in the kwargs property of the class.
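The initialize, run, and de-initialize flow described above can be illustrated with the standard library alone. The sketch below is not the BaseThread implementation: the class name LifecycleThread and its setup, work, and teardown methods are hypothetical, and only the rows_processed, rows_errored, and kwargs attribute names are taken from this page.

```python
import queue
import threading


class LifecycleThread(threading.Thread):
    """Hypothetical stand-in for a BaseThread-like worker (not the library class)."""

    def __init__(self, thread_num, worker_queue=None, **kwargs):
        super().__init__(name=f"LifecycleThread{thread_num}", daemon=True)
        self.worker_queue = worker_queue
        self.kwargs = kwargs  # extra keyword arguments are kept for the setup step
        self.rows_processed = 0
        self.rows_errored = 0
        self.events = []

    def setup(self, **kwargs):
        """Runs once when the thread starts, e.g. to open a connection."""
        self.events.append("setup")

    def work(self, item):
        """Processes one queue item; raises on a bad record."""
        if item is None:
            raise ValueError("empty record")

    def teardown(self):
        """Runs once when the thread stops, e.g. to close a connection."""
        self.events.append("teardown")

    def run(self):
        self.setup(**self.kwargs)
        try:
            while True:
                try:
                    item = self.worker_queue.get(timeout=0.1)
                except queue.Empty:
                    break  # no more work, let the thread finish
                try:
                    self.work(item)
                    self.rows_processed += 1
                except Exception:
                    self.rows_errored += 1
                finally:
                    self.worker_queue.task_done()
        finally:
            self.teardown()  # always runs, even if the loop fails


work_queue = queue.Queue()
for record in ({"id": 1}, None, {"id": 2}):
    work_queue.put(record)

workers = [LifecycleThread(n, worker_queue=work_queue, db_user="name") for n in range(2)]
for worker in workers:
    worker.start()
work_queue.join()  # every item has been marked done
for worker in workers:
    worker.join()

processed = sum(w.rows_processed for w in workers)
errored = sum(w.rows_errored for w in workers)
```

Calling teardown from a finally block is one way to guarantee the disconnection step runs even when processing fails part-way through.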
mgr
create
The elevaso_spine.thrd.mgr.create() function creates the requested number of threads from the given thread class. You can also pass parameters for thread initialization (such as credentials to establish separate database connections) and a shared Python queue.
A basic example looks like:
from elevaso_spine.thrd.base import BaseThread
from elevaso_spine.thrd.mgr import create
thread_list = create(10, BaseThread, {})  # Empty params: no extra thread arguments
for t in thread_list:
    t.join()  # Wait until all threads are finished running
An example using a Python queue looks like:
import queue
from elevaso_spine.thrd.base import BaseThread
from elevaso_spine.thrd.mgr import create
thread_queue = queue.Queue()
thread_queue.put({"test": "data"})
thread_list = create(10, BaseThread, {}, thread_queue=thread_queue)
thread_queue.join()  # Wait until all queue items have been processed
# Print the number of queue records processed or errored
print(sum([t.rows_processed for t in thread_list]))
print(sum([t.rows_errored for t in thread_list]))
- elevaso_spine.thrd.mgr.create(threads: int, thread_class: object, params: dict, thread_queue: Queue = None) -> list
Create threads for parallel processing
- Args:
threads (int): Number of threads to create
thread_class (object): Thread class to create
params (dict): Dictionary of parameters to pass into thread
thread_queue (queue.Queue, Optional): Queue to retrieve items to work, defaults to None
- Raises:
ValueError: If threads == 0
- Returns:
list: List of thread objects created
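To make the documented behaviour concrete, here is a minimal sketch of a factory with the same argument order. It is an illustration under assumptions, not the library's code: create_sketch and CountingThread are hypothetical names, and passing params as keyword arguments to the thread class is a guess at how the parameters are forwarded.

```python
import queue
import threading


class CountingThread(threading.Thread):
    """Hypothetical worker that counts the items it pulls from the queue."""

    def __init__(self, thread_num, worker_queue=None, **kwargs):
        super().__init__(name=f"CountingThread{thread_num}", daemon=True)
        self.worker_queue = worker_queue
        self.kwargs = kwargs
        self.rows_processed = 0

    def run(self):
        if self.worker_queue is None:
            return  # nothing to work on
        while True:
            try:
                self.worker_queue.get(timeout=0.1)
            except queue.Empty:
                return
            self.rows_processed += 1
            self.worker_queue.task_done()


def create_sketch(threads, thread_class, params, thread_queue=None):
    """Hypothetical equivalent of create(): start the workers and return them."""
    if threads == 0:
        raise ValueError("threads must be greater than 0")
    thread_list = []
    for num in range(threads):
        # Assumption: params are forwarded as keyword arguments
        worker = thread_class(thread_num=num, worker_queue=thread_queue, **params)
        worker.start()
        thread_list.append(worker)
    return thread_list


shared_queue = queue.Queue()
for n in range(6):
    shared_queue.put(n)

created = create_sketch(
    3, CountingThread, {"db_instance": "test.example"}, thread_queue=shared_queue
)
shared_queue.join()  # all six items have been marked done
processed = sum(t.rows_processed for t in created)

try:
    create_sketch(0, CountingThread, {})
    zero_rejected = False
except ValueError:
    zero_rejected = True
```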
has_working_thread
The elevaso_spine.thrd.mgr.has_working_thread() function checks all threads provided and returns True if any are active, or False if none are active.
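A check of this kind can be sketched with threading.Thread.is_alive(). The function name has_working_thread_sketch is hypothetical, and treating "active" as "alive" is an assumption about what the library checks.

```python
import threading


def has_working_thread_sketch(thread_list):
    """Hypothetical equivalent: True if any thread in the list is still alive."""
    return any(t.is_alive() for t in thread_list)


stop_event = threading.Event()
idle_workers = [threading.Thread(target=stop_event.wait, daemon=True) for _ in range(3)]
for worker in idle_workers:
    worker.start()

before = has_working_thread_sketch(idle_workers)  # threads are blocked on the event

stop_event.set()  # release the threads so they can finish
for worker in idle_workers:
    worker.join()

after = has_working_thread_sketch(idle_workers)  # all threads have finished
```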
thread_metrics
The elevaso_spine.thrd.mgr.thread_metrics() function aggregates the built-in metrics (records processed, records errored, and thread count) across a list of threads.
- elevaso_spine.thrd.mgr.thread_metrics(thread_list: list) -> tuple[int, int, int]
Retrieves built-in metrics from all threads
- Args:
thread_list (list): List of thread objects
- Returns:
- tuple: Containing
rows_processed: Number of queue records processed for all threads
rows_errored: Number of queue records errored for all threads
threads: Number of threads
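The aggregation described above amounts to two sums and a count. The following sketch uses a hypothetical thread_metrics_sketch function and SimpleNamespace objects in place of real threads; only the rows_processed and rows_errored attribute names and the order of the returned tuple come from this page.

```python
from types import SimpleNamespace


def thread_metrics_sketch(thread_list):
    """Hypothetical equivalent: return (rows_processed, rows_errored, threads)."""
    rows_processed = sum(t.rows_processed for t in thread_list)
    rows_errored = sum(t.rows_errored for t in thread_list)
    return rows_processed, rows_errored, len(thread_list)


# Stand-ins for finished worker threads
fake_threads = [
    SimpleNamespace(rows_processed=4, rows_errored=1),
    SimpleNamespace(rows_processed=6, rows_errored=0),
]

metrics = thread_metrics_sketch(fake_threads)
```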
wait_queue_empty
The elevaso_spine.thrd.mgr.wait_queue_empty() function checks a shared queue until all items have been processed or there are no active threads, periodically logging the estimated size of the queue.
Example code:
import queue
from elevaso_spine.thrd.base import BaseThread
from elevaso_spine.thrd.mgr import create, thread_metrics, wait_queue_empty
thread_queue = queue.Queue()
thread_queue.put({"test": "data"})
thread_list = create(10, BaseThread, {}, thread_queue=thread_queue)
wait_queue_empty(thread_queue, thread_list)
# Print the number of queue records processed or errored
print(thread_metrics(thread_list))
- elevaso_spine.thrd.mgr.wait_queue_empty(thread_queue: Queue, thread_list: list, interval: int = 5)
Checks whether a queue is empty and outputs logging messages. It also checks whether any threads are alive, to avoid waiting on a queue when all worker threads have stopped.
- Args:
thread_queue (queue.Queue): Queue to check for empty status
thread_list (list): List of threads working the queue
interval (int, Optional): Interval in seconds to print queue depth, defaults to 5
Note
If interval is set to 0, the queue is not actively polled and no status messages are printed; instead, the function waits until the queue is empty through the queue.join() function.
- Raises:
Exception: If the queue is not empty and there are no working threads in thread_list
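The polling behaviour described in this section can be sketched as a loop that logs the queue size, sleeps for the interval, and fails fast when no worker is left. This is an illustration only: wait_queue_empty_sketch and drain are hypothetical names, and using Queue.empty() and Thread.is_alive() as the checks is an assumption about the library's internals.

```python
import logging
import queue
import threading
import time

LOGGER = logging.getLogger(__name__)


def wait_queue_empty_sketch(thread_queue, thread_list, interval=5):
    """Hypothetical equivalent of wait_queue_empty()."""
    if interval == 0:
        thread_queue.join()  # block without polling or status messages
        return
    while not thread_queue.empty():
        if not any(t.is_alive() for t in thread_list):
            raise Exception("Queue is not empty and no worker threads are alive")
        LOGGER.info("Estimated queue size: %d", thread_queue.qsize())
        time.sleep(interval)


def drain(work_queue):
    """Worker target: pull items until the queue stays empty."""
    while True:
        try:
            work_queue.get(timeout=0.1)
        except queue.Empty:
            return
        work_queue.task_done()


# Normal case: one worker drains the queue
demo_queue = queue.Queue()
for n in range(3):
    demo_queue.put(n)
demo_workers = [threading.Thread(target=drain, args=(demo_queue,), daemon=True)]
demo_workers[0].start()
wait_queue_empty_sketch(demo_queue, demo_workers, interval=1)
drained = demo_queue.empty()

# Failure case: items remain but there is nobody to process them
stalled_queue = queue.Queue()
stalled_queue.put("orphan")
try:
    wait_queue_empty_sketch(stalled_queue, [], interval=1)
    stalled_error = None
except Exception as exc:
    stalled_error = str(exc)
```

Checking for live threads inside the loop is what keeps the caller from blocking indefinitely when every worker has already exited.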