Python Threading Basics

A thread represents "an activity that is run in a separate thread of control" (from the docs). In other words, a thread lets you step outside the normal flow of your program and perform a task concurrently. This is great when you need to parallelize work.

A common example from digital marketing: scraping websites. There are lots of requests to make, and the I/O takes time, so Python's threading module works fairly well for that use case.

This is a relatively gentle introduction to using the threading module in Python. All examples are in Python 3.

Starting Simple

Threads can be given tasks by passing a callable to their constructor or by subclassing threading.Thread and overriding the run method. Our simple example will just sleep for a random(ish) number of seconds between 1 and 19 (randrange excludes its upper bound).

import random
import threading
import time

class SleepingThread(threading.Thread):

    #: How long we're going to sleep for
    sleep_length = None

    def __init__(self, sleep_length=None):
        super().__init__()
        self.sleep_length = sleep_length or random.randrange(1, 20)

    def run(self):
        time.sleep(self.sleep_length)

To execute this thread, we call start to run it and join to block until it finishes. We can do this with multiple threads by keeping track of them in a list or other iterable.

if __name__ == '__main__':
    # create and start our threads
    threads = list()
    for i in range(4):
        t = SleepingThread()
        threads.append(t)
        print('Starting Thread {}'.format(i))
        t.start()

    # wait for each to finish (join)
    for i, t in enumerate(threads):
        t.join()
        print('Thread {} Stopped'.format(i))

The run method we overrode can do anything -- any task you wish the thread to perform. Instead of subclassing, we could also pass a callable to the thread's constructor:

import random
import threading
import time

def sleeper():
    time.sleep(random.randrange(1, 20))

if __name__ == '__main__':
    # create and start our threads
    threads = list()
    for i in range(4):
        t = threading.Thread(target=sleeper) # pass in the callable
        threads.append(t)
        print('Starting Thread {}'.format(i))
        t.start()

    # wait for each to finish (join)
    for i, t in enumerate(threads):
        t.join()
        print('Thread {} Stopped'.format(i))

This approach, creating a thread and waiting for it to join, works well if you know exactly what needs to be done and how many threads are needed.

A Real Task

Let's tackle a more realistic example that we can refactor through the rest of this tutorial: given a list of URLs, visit each one, check its status code, and write that status code to stdout.

Let's start with our worker thread. We'll use the requests module and httpbin.

import threading
import requests

class StatusChecker(threading.Thread):
    """
    The thread that will check HTTP statuses.
    """

    #: The url this thread will check
    to_check = None

    #: The status code of the checked url
    status_code = None

    def __init__(self, to_check):
        super().__init__()
        self.to_check = to_check

    def run(self):
        resp = requests.get(self.to_check)
        self.status_code = resp.status_code

Running and managing a set of these threads is very similar to the SleepingThread example above. We loop through the urls to check, create a thread for each, start it, and wait for it to join. Once that's done we can read the status_code attribute on each thread for our result.

if __name__ == '__main__':
    urls = (
        'http://httpbin.org/status/418',
        'http://httpbin.org/status/200',
        'http://httpbin.org/status/404',
        'http://httpbin.org/status/500',
    )

    threads = dict()
    for url in urls:
        threads[url] = StatusChecker(url)
        print('Starting check for {}'.format(url))
        threads[url].start()

    for thread in threads.values():
        thread.join()

    for url in urls:
        print('{} - {}'.format(url, threads[url].status_code))

The approach of creating a thread and waiting for it to join works great for this use case: there are exactly 4 urls to check. We know the exact bounds of the task.

But what if there were 400 urls? Would 400 threads work as well as four? It would probably be better to use a queue and a set of workers in that case.

Using Queues and Workers

The Python queue module implements a set of queues that are safe for multi-threaded environments. We don't want to share something like a plain list between threads because compound operations on it aren't atomic: two threads could both see a non-empty list, and one could then pop from a list the other just emptied.
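Before wiring a queue into our workers, here's a quick sketch of the queue.Queue semantics we'll rely on below -- put, get_nowait, and the queue.Empty exception:

```python
import queue

q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)

# get_nowait() returns an item immediately, or raises queue.Empty
# instead of blocking -- that's how a worker knows it can exit
first = q.get_nowait()   # items come out in FIFO order
second = q.get_nowait()
third = q.get_nowait()

try:
    q.get_nowait()
    drained = False
except queue.Empty:
    drained = True        # nothing left -- a worker would break here
```

All of these operations are safe to call from multiple threads at once, which is exactly what our worker pool will do.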

Rather than passing a url to our status checker thread, we'll pass it the queue. Each worker will pull urls off the queue and check them until the queue is empty.

import threading
import queue
import requests

class StatusChecker(threading.Thread):
    """
    The thread that will check HTTP statuses.
    """

    #: The queue of urls
    url_queue = None

    #: The status code of the checked url
    status_code = None

    def __init__(self, url_queue):
        super().__init__()
        self.url_queue = url_queue

    def run(self):
        while True:
            try:
                # this will throw queue.Empty immediately if there
                # are no tasks left
                to_check = self.url_queue.get_nowait()
            except queue.Empty:
                break # empty queue, we're done!
            else:
                resp = requests.get(to_check)
                self.status_code = resp.status_code
                self.url_queue.task_done() # tell the queue we're done

Running this one is a bit different: rather than waiting for the threads to join, we wait for the queue to join -- that is, for all of its tasks to be marked done.

if __name__ == '__main__':
    urls = (
        'http://httpbin.org/status/418',
        'http://httpbin.org/status/200',
        'http://httpbin.org/status/404',
        'http://httpbin.org/status/500',
    )

    url_queue = queue.Queue()
    for url in urls:
        url_queue.put(url)

    num_workers = 2
    threads = list()
    for i in range(num_workers):
        t = StatusChecker(url_queue)
        threads.append(t)
        print('Starting worker {}'.format(i))
        t.start()

    # wait for the queue to empty
    url_queue.join()

    # how do we get the status codes back?!

Running the code reveals a problem: before, one url per thread meant we could just read the status code off the thread object. How do we get the status codes back now?

Use another queue to shove results into!

class StatusChecker(threading.Thread):
    """
    The thread that will check HTTP statuses.
    """

    #: The queue of urls
    url_queue = None

    #: The queue our results will go into
    result_queue = None

    def __init__(self, url_queue, result_queue):
        super().__init__()
        self.url_queue = url_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            try:
                # this will throw queue.Empty immediately if there
                # are no tasks left
                to_check = self.url_queue.get_nowait()
            except queue.Empty:
                break # empty queue, we're done!
            else:
                resp = requests.get(to_check)
                self.result_queue.put((to_check, resp.status_code,))
                self.url_queue.task_done() # tell the queue we're done

The key bit is...

self.result_queue.put((to_check, resp.status_code,))

The url and status are put into the result queue as a tuple. To print the status codes on the other end, we can fetch them out of the result queue.

if __name__ == '__main__':
    # same setup as before, but also create a result_queue
    # and pass it to each StatusChecker

    # wait for the queue to empty
    url_queue.join()

    while not result_queue.empty():
        url, status = result_queue.get_nowait()
        print('{} - {}'.format(url, status))

Stopping Threads

If you ran some of the code above and hit Ctrl+C, you might have noticed that it doesn't always exit nicely. We need a way to respond to signals and stop the threads gracefully. We could catch a KeyboardInterrupt exception in the main thread and exit, but that wouldn't let our threads exit gracefully. There's also no guarantee the Python interpreter will raise KeyboardInterrupt in the main thread.

This is where threading.Event objects come in. We can use them to set an event that tells a thread to do something (like exit!). Event objects are thread safe so we can share an instance of a single event between all of our workers.
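A minimal sketch of how an Event works: set flips an internal flag, and is_set is safe to poll from any thread, so one shared Event can signal a whole pool of workers to stop.

```python
import threading
import time

stopper = threading.Event()

def worker():
    # poll the shared event; exit as soon as it's set
    while not stopper.is_set():
        time.sleep(0.01)

t = threading.Thread(target=worker)
t.start()

stopper.set()  # flip the flag -- safe to call from any thread
t.join()       # the worker sees the flag on its next check and returns
```

This is the same pattern our StatusChecker will use, just with real work inside the loop instead of a sleep.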

Our StatusChecker thread gets another constructor argument, and instead of looping with while True: it loops with while not self.stopper.is_set():

import threading
import queue
import requests

class StatusChecker(threading.Thread):
    """
    The thread that will check HTTP statuses.
    """

    #: The queue of urls
    url_queue = None

    #: The queue our results will go into
    result_queue = None

    #: An event that tells the thread to stop
    stopper = None

    def __init__(self, url_queue, result_queue, stopper):
        super().__init__()
        self.url_queue = url_queue
        self.result_queue = result_queue
        self.stopper = stopper

    def run(self):
        while not self.stopper.is_set():
            try:
                to_check = self.url_queue.get_nowait()
            except queue.Empty:
                break
            else:
                resp = requests.get(to_check)
                self.result_queue.put((to_check, resp.status_code,))
                self.url_queue.task_done()

And to run this, we create the stopper event in __main__ and pass it to our threads. Let's put a lot more urls into our queue this time so we have some time to try the Ctrl+C handling.

if __name__ == '__main__':
    url_queue = queue.Queue()
    for i in range(100):
        url_queue.put('http://httpbin.org/status/418')

    stopper = threading.Event()
    result_queue = queue.Queue()
    num_workers = 2
    threads = list()
    for i in range(num_workers):
        t = StatusChecker(url_queue, result_queue, stopper)
        threads.append(t)
        print('Starting worker {}'.format(i))
        t.start()

    # wait for the queue to empty
    url_queue.join()

    while not result_queue.empty():
        url, status = result_queue.get_nowait()
        print('{} - {}'.format(url, status))

This will run just like our other examples above. Now we need a way to connect Ctrl+C, or a SIGINT signal, to setting our event. We could catch KeyboardInterrupt, but it has the problems outlined above: we can't guarantee that KeyboardInterrupt will be raised in the main thread, and it doesn't handle SIGINT sent from other sources.

Instead let's use the signal module. Our signal handler will need to set the event and wait for the threads to join.

Since our handler will need some state (the event, the workers), let's use an object.

import signal
import sys

class SignalHandler:
    """
    The object that will handle signals and stop the worker threads.
    """

    #: The stop event that's shared by this handler and threads.
    stopper = None

    #: The pool of worker threads
    workers = None

    def __init__(self, stopper, workers):
        self.stopper = stopper
        self.workers = workers

    def __call__(self, signum, frame):
        """
        This will be called by the python signal module

        https://docs.python.org/3/library/signal.html#signal.signal
        """
        self.stopper.set()

        for worker in self.workers:
            worker.join()

        sys.exit(0)

In the __call__ method, we set the stop event, then wait for the threads to join before exiting. Setting the event means each thread will finish whatever task it's currently on, loop again, and see that the event is now set.

To connect our handler to SIGINT, we create a SignalHandler instance and call signal.signal(signal.SIGINT, SignalHandler(stopper, workers)). The code to run our threaded checker is a bit different now: we need to create our worker pool without starting the threads so we can pass them to our handler.

if __name__ == '__main__':
    # all the variables we'll need
    num_workers = 2
    stopper = threading.Event()
    result_queue = queue.Queue()
    url_queue = queue.Queue()

    # populate our work queue
    for i in range(100):
        url_queue.put('http://httpbin.org/status/418')

    # we need to keep track of the workers but not start them yet
    workers = [StatusChecker(url_queue, result_queue, stopper)
                    for i in range(num_workers)]

    # create our signal handler and connect it
    handler = SignalHandler(stopper, workers)
    signal.signal(signal.SIGINT, handler)

    # start the threads!
    for i, worker in enumerate(workers):
        print('Starting worker {}'.format(i))
        worker.start()

    # wait for the queue to empty
    url_queue.join()

    while not result_queue.empty():
        url, status = result_queue.get_nowait()
        print('{} - {}'.format(url, status))

Hitting Ctrl+C now will gracefully stop and exit the process.

Wrap Up

These concepts will take you a long way when writing threaded Python code. There's a GitHub repo with all the code found here.
