Summary: in this tutorial, you’ll learn how to use the Python ThreadPoolExecutor
to develop multi-threaded programs.
Introduction to the Python ThreadPoolExecutor class
In the multithreading tutorial, you learned how to manage multiple threads in a program using the Thread
class of the threading
module. The Thread
class is useful when you want to create threads manually.
However, manually managing threads is not efficient because creating and destroying many threads frequently are very expensive in terms of computational costs.
Instead of doing so, you may want to reuse the threads if you expect to run many ad-hoc tasks in the program. A thread pool allows you to achieve this.
Thread pool
A thread pool is a pattern for achieving concurrency of execution in a program. A thread pool allows you to automatically manage a pool of threads efficiently:
Each thread in the pool is called a worker thread or a worker. A thread pool allows you to reuse the worker threads once the tasks are completed. It also protects against unexpected failures such as exceptions.
Typically, a thread pool allows you to configure the number of worker threads and provides a specific naming convention for each worker thread.
To create a thread pool, you use the ThreadPoolExecutor
class from the concurrent.futures
module.
ThreadPoolExecutor
The ThreadPoolExecutor
class extends the Executor
class and returns a Future
object.
Executor
The Executor
class has three methods to control the thread pool:
submit()
– dispatch a function to be executed and return aFuture
object. Thesubmit()
method takes a function and executes it asynchronously.map()
– execute a function asynchronously for each element in an iterable.shutdown()
– shut down the executor.
When you create a new instance of the ThreadPoolExecutor
class, Python starts the Executor
.
Once completing working with the executor, you must explicitly call the shutdown()
method to release the resource held by the executor. To avoid calling the shutdown()
method explicitly, you can use the context manager.
Future object
A Future
is an object that represents the eventual result of an asynchronous operation. The Future class has two useful methods:
result()
– return the result of an asynchronous operation.exception()
– return the exception of an asynchronous operation in case an exception occurs.
Python ThreadPoolExecutor examples
The following program uses a single thread:
from time import sleep, perf_counter def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() print(task(1)) print(task(2)) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
Code language: Python (python)
Output:
Starting the task 1... Done with task 1 Starting the task 2... Done with task 2 It took 2.0144479 second(s) to finish.
Code language: Python (python)
How it works.
First, define the task()
function that takes about one second to finish. The task()
function calls the sleep()
function to simulate a delay:
def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}'
Code language: Python (python)
Second, call the task()
function twice and print out the result. Before and after calling the task()
function, we use the perf_counter()
to measure the start and finish time:
start = perf_counter() print(task(1)) print(task(2)) finish = perf_counter()
Code language: Python (python)
Third, print out the time the program took to run:
print(f"It took {finish-start} second(s) to finish.")
Code language: Python (python)
Because the task()
function takes one second, calling it twice will take about 2 seconds.
Using the submit() method example
To run the task()
function concurrently, you can use the ThreadPoolExecutor
class:
from time import sleep, perf_counter from concurrent.futures import ThreadPoolExecutor def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() with ThreadPoolExecutor() as executor: f1 = executor.submit(task, 1) f2 = executor.submit(task, 2) print(f1.result()) print(f2.result()) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
Code language: Python (python)
Output:
Starting the task 1... Starting the task 2... Done with task 1 Done with task 2 It took 1.0177214 second(s) to finish.
Code language: Python (python)
The output shows that the program took about 1 second to finish.
How it works (we’ll focus on the thread pool part):
First, import the ThreadPoolExecutor
class from the concurrent.futures
module:
from concurrent.futures import ThreadPoolExecutor
Code language: Python (python)
Second, create a thread pool using the ThreadPoolExecutor
using a context manager:
with ThreadPoolExecutor() as executor:
Code language: Python (python)
Third, calling the task()
function twice by passing it to the submit()
method of the executor:
with ThreadPoolExecutor() as executor: f1 = executor.submit(task, 1) f2 = executor.submit(task, 2) print(f1.result()) print(f2.result())
Code language: Python (python)
The submit()
method returns a Future object. In this example, we have two Future objects f1
and f2
. To get the result from the Future object, we called its result()
method.
Using the map() method example
The following program uses a ThreadPoolExecutor
class. However, instead of using the submit()
method, it uses the map()
method to execute a function:
from time import sleep, perf_counter from concurrent.futures import ThreadPoolExecutor def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() with ThreadPoolExecutor() as executor: results = executor.map(task, [1,2]) for result in results: print(result) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
Code language: Python (python)
How it works.
First, call the map()
method of the executor object to run the task function for each id in the list [1,2]. The map()
method returns an iterator that contains the result of the function calls.
results = executor.map(task, [1,2])
Code language: Python (python)
Second, iterate over the results and print them out:
for result in results: print(result)
Code language: Python (python)
Python ThreadPoolExecutor practical example
The following program downloads multiple images from Wikipedia using a thread pool:
from concurrent.futures import ThreadPoolExecutor from urllib.request import urlopen import time import os def download_image(url): image_data = None with urlopen(url) as f: image_data = f.read() if not image_data: raise Exception(f"Error: could not download the image from {url}") filename = os.path.basename(url) with open(filename, 'wb') as image_file: image_file.write(image_data) print(f'{filename} was downloaded...') start = time.perf_counter() urls = ['https://upload.wikimedia.org/wikipedia/commons/9/9d/Python_bivittatus_1701.jpg', 'https://upload.wikimedia.org/wikipedia/commons/4/48/Python_Regius.jpg', 'https://upload.wikimedia.org/wikipedia/commons/d/d3/Baby_carpet_python_caudal_luring.jpg', 'https://upload.wikimedia.org/wikipedia/commons/f/f0/Rock_python_pratik.JPG', 'https://upload.wikimedia.org/wikipedia/commons/0/07/Dulip_Wilpattu_Python1.jpg'] with ThreadPoolExecutor() as executor: executor.map(download_image, urls) finish = time.perf_counter() print(f'It took {finish-start} second(s) to finish.')
Code language: Python (python)
How it works.
First, define a function download_image()
that downloads an image from an URL and saves it into a file:
def download_image(url): image_data = None with urlopen(url) as f: image_data = f.read() if not image_data: raise Exception(f"Error: could not download the image from {url}") filename = os.path.basename(url) with open(filename, 'wb') as image_file: image_file.write(image_data) print(f'{filename} was downloaded...')
Code language: Python (python)
The download_image()
function the urlopen()
function from the urllib.request
module to download an image from an URL.
Second, execute the download_image()
function using a thread pool by calling the map()
method of the ThreadPoolExecutor
object:
with ThreadPoolExecutor() as executor: executor.map(download_image, urls)
Leave a Reply