Python is one of the most popular languages for data processing and data science in general. The ecosystem provides a lot of libraries and frameworks that facilitate high-performance computing. Doing parallel programming in Python can prove quite tricky, though.
In this tutorial, we'll look at why parallelism is hard, especially in Python. We'll go through the following:
- Why is parallelism tricky in Python (hint: it's because of the GIL—the global interpreter lock).
- Threads vs. Processes: Different ways of achieving parallelism. When to use one over the other?
- Parallel vs. Concurrent: Why in some cases we can settle for concurrency rather than parallelism.
- Building a simple but practical example using the various techniques discussed.
Global Interpreter Lock
The Global Interpreter Lock (GIL) is one of the most controversial subjects in the Python world. In CPython, the most popular implementation of Python, the GIL is a mutex that lets only one thread execute Python bytecode at a time, which keeps the interpreter's internals thread-safe. The GIL makes it easy to integrate with external libraries that are not thread-safe, and it makes non-parallel code faster. This comes at a cost, though. Due to the GIL, we can't achieve true parallelism via multithreading. Basically, two different native threads of the same process can't run Python code at once.
Things are not that bad, though, and here's why: work that happens outside the GIL is free to run in parallel. This category includes long-running operations like I/O and, fortunately, libraries like `numpy`, which do their heavy lifting in compiled code that can release the GIL.
Threads vs. Processes
So Python is not truly multithreaded. But what is a thread? Let's take a step back and look at things in perspective.
A process is a basic operating system abstraction. It is a program that is in execution—in other words, code that is running. Multiple processes are always running in a computer, and they are executing in parallel.
A process can have multiple threads. They execute the same code belonging to the parent process. Ideally, they run in parallel, but not necessarily. Processes alone aren't enough because applications need to stay responsive: they must listen for user actions while also updating the display and saving a file.
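As a small sketch of the relationship described above, the snippet below starts a few threads and records which process each one runs in. The helper name `report` and the worker names are illustrative choices, not part of the tutorial's code.

```python
import os
import threading

seen = []  # (pid, thread name) pairs, one per thread


def report():
    # Every thread belongs to the same process, so os.getpid() is identical
    seen.append((os.getpid(), threading.current_thread().name))


threads = [threading.Thread(target=report, name=f"worker-{i}") for i in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

pids = {pid for pid, _ in seen}
print("distinct PIDs:", len(pids))
print("threads:", sorted(name for _, name in seen))
```

All three threads report the PID of the process that spawned them, which is what "threads live inside a process" means in practice.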
If that's still a bit unclear, here's a cheatsheet:
| PROCESSES | THREADS |
|---|---|
| Processes don't share memory | Threads share memory |
| Spawning/switching processes is expensive | Spawning/switching threads is less expensive |
| Processes require more resources | Threads require fewer resources (are sometimes called lightweight processes) |
| No memory synchronisation needed | You need to use synchronisation mechanisms to be sure you're correctly handling the data |
There isn’t one recipe that accommodates everything. Choosing one is greatly dependent on the context and the task you are trying to achieve.
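To make the "threads share memory, so you need synchronisation" point concrete, here is a minimal sketch in which four threads update one shared counter under a `threading.Lock`. The names `counter`, `counter_lock`, and `add_many` are made up for this illustration.

```python
import threading

counter = 0                      # shared by every thread in the process
counter_lock = threading.Lock()  # guards updates to the shared counter


def add_many(times):
    global counter
    for _ in range(times):
        # The lock makes the read-modify-write on the shared variable atomic
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter)  # 40000
```

Separate processes would each get their own copy of `counter`, so no lock would be needed, but the updates would not be visible across processes either.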
Parallel vs. Concurrent
Now we'll go one step further and dive into concurrency. Concurrency is often mistaken for parallelism, but the two are not the same. Concurrency means scheduling independent pieces of code to execute cooperatively: while one piece is waiting on I/O, a different, independent piece runs in the meantime.
In Python, we can achieve lightweight concurrent behaviour via greenlets. From a parallelization perspective, using threads or greenlets is equivalent because neither of them runs in parallel. Greenlets are even less expensive to create than threads. Because of that, greenlets are heavily used for performing a huge number of simple I/O tasks, like the ones usually found in networking and web servers.
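Cooperative scheduling can be sketched in plain Python with generators. This is only an illustration of the idea, not how greenlets are implemented: each task voluntarily yields control (standing in for "I'm waiting on I/O"), and a tiny round-robin loop picks the next task. The names `task` and `run_cooperatively` are hypothetical.

```python
from collections import deque


def task(name, steps, log):
    for step in range(steps):
        log.append(f"{name}:{step}")
        # Yielding stands in for "I'm waiting on I/O, someone else may run"
        yield


def run_cooperatively(tasks):
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # run the task until its next yield
        except StopIteration:
            continue               # task finished, drop it
        ready.append(current)      # otherwise put it back in line


log = []
run_cooperatively([task("A", 2, log), task("B", 3, log)])
print(log)  # ['A:0', 'B:0', 'A:1', 'B:1', 'B:2']
```

Only one task runs at any moment, yet their steps interleave, which is concurrency without parallelism.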
Now that we know the difference between threads and processes, parallel and concurrent, we can illustrate how different tasks are performed on the two paradigms. Here's what we're going to do: we will run, multiple times, a task outside the GIL and one inside it. We're running them serially, using threads and using processes. Let's define the tasks:
import os
import time
import threading
import multiprocessing

NUM_WORKERS = 4


def only_sleep():
    """Do nothing, wait for a timer to expire."""
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    time.sleep(1)


def crunch_numbers():
    """Do some computations."""
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    x = 0
    while x < 10000000:
        x += 1
We've created two tasks. Both of them are long-running, but only `crunch_numbers` actively performs computations. Let's run `only_sleep` serially, multithreaded and using multiple processes and compare the results:
# The guard is needed on platforms where child processes re-import this module
if __name__ == "__main__":
    # Run tasks serially
    start_time = time.time()
    for _ in range(NUM_WORKERS):
        only_sleep()
    end_time = time.time()
    print("Serial time=", end_time - start_time)

    # Run tasks using threads
    start_time = time.time()
    threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    end_time = time.time()
    print("Threads time=", end_time - start_time)

    # Run tasks using processes
    start_time = time.time()
    # Pass the function itself; only_sleep() would run it in the parent instead
    processes = [multiprocessing.Process(target=only_sleep) for _ in range(NUM_WORKERS)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    end_time = time.time()
    print("Parallel time=", end_time - start_time)
Here's the output I've got (yours should be similar, although PIDs and times will vary a bit):
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
Serial time= 4.018089056015015
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 1.0047411918640137
PID: 95728, Process Name: Process-1, Thread Name: MainThread
PID: 95729, Process Name: Process-2, Thread Name: MainThread
PID: 95730, Process Name: Process-3, Thread Name: MainThread
PID: 95731, Process Name: Process-4, Thread Name: MainThread
Parallel time= 1.014023780822754
Here are some observations:
- In the case of the serial approach, things are pretty obvious. We're running the tasks one after the other. All four runs are executed by the same thread of the same process.
- Using processes we cut the execution time down to a quarter of the original time, simply because the tasks are executed in parallel. Notice how each task is performed in a different process and on the `MainThread` of that process.
- Using threads we take advantage of the fact that the tasks can be executed concurrently. The execution time is also cut down to a quarter, even though nothing is running in parallel. Here's how that goes: we spawn the first thread and it starts waiting for the timer to expire. We pause its execution, letting it wait for the timer to expire, and in this time we spawn the second thread. We repeat this for all the threads. At one moment the timer of the first thread expires so we switch execution to it and we terminate it. The algorithm is repeated for the second and for all the other threads. At the end, the result is as if things were run in parallel. You'll also notice that the four different threads branch out from and live inside the same process: `MainProcess`.
- You may even notice that the threaded approach is quicker than the truly parallel one. That's due to the overhead of spawning processes. As we noted previously, spawning and switching processes is an expensive operation.
Let's do the same routine but this time running the `crunch_numbers` task:
# The guard is needed on platforms where child processes re-import this module
if __name__ == "__main__":
    # Run tasks serially
    start_time = time.time()
    for _ in range(NUM_WORKERS):
        crunch_numbers()
    end_time = time.time()
    print("Serial time=", end_time - start_time)

    # Run tasks using threads
    start_time = time.time()
    threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    end_time = time.time()
    print("Threads time=", end_time - start_time)

    # Run tasks using processes
    start_time = time.time()
    processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    end_time = time.time()
    print("Parallel time=", end_time - start_time)
Here's the output I've got:
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
Serial time= 2.705625057220459
PID: 96285, Process Name: MainProcess, Thread Name: Thread-1
PID: 96285, Process Name: MainProcess, Thread Name: Thread-2
PID: 96285, Process Name: MainProcess, Thread Name: Thread-3
PID: 96285, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 2.6961309909820557
PID: 96289, Process Name: Process-1, Thread Name: MainThread
PID: 96290, Process Name: Process-2, Thread Name: MainThread
PID: 96291, Process Name: Process-3, Thread Name: MainThread
PID: 96292, Process Name: Process-4, Thread Name: MainThread
Parallel time= 0.8014059066772461
The main difference here is in the result of the multithreaded approach. This time it performs very similarly to the serial approach, and here's why: since it performs computations and Python doesn't perform real parallelism, the threads are basically running one after the other, yielding execution to one another until they all finish.
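The "yielding execution to one another" part is governed in CPython 3 by a switch interval: a thread running Python code is periodically asked to release the GIL so another thread can take it. As a quick sketch, you can inspect that interval with `sys.getswitchinterval()`. The exact value depends on your interpreter and on whether something called `sys.setswitchinterval()`, so none is assumed here.

```python
import sys

# How long a thread may hold the GIL before CPython asks it to hand it over
interval = sys.getswitchinterval()
print("switch interval: %.4f seconds" % interval)
```

However short the interval is, the threads still take turns, which is why the total time stays close to the serial run.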
The Python Parallel/Concurrent Programming Ecosystem
Python has rich APIs for doing parallel/concurrent programming. In this tutorial we're covering the most popular ones, but you have to know that for any need you have in this domain, there's probably something already out there that can help you achieve your goal.
In the next section, we'll build a practical application in many forms, using all of the libraries presented. Without further ado, here are the modules/libraries we're going to cover:
- `threading`: The standard way of working with threads in Python. It is a higher-level API wrapper over the functionality exposed by the `_thread` module, which is a low-level interface over the operating system's thread implementation.
- `concurrent.futures`: A module in the standard library that provides an even higher-level abstraction layer over threads. The threads are modelled as asynchronous tasks.
- `multiprocessing`: Similar to the `threading` module, offering a very similar interface but using processes instead of threads.
- `gevent` and greenlets: Greenlets, also called micro-threads, are units of execution that can be scheduled collaboratively and can perform tasks concurrently without much overhead.
- `celery`: A high-level distributed task queue. The tasks are queued and executed concurrently using various paradigms like `multiprocessing` or `gevent`.
Building a Practical Application
Knowing the theory is nice and fine, but the best way to learn is to build something practical, right? In this section, we're going to build a classic type of application going through all the different paradigms.
Let's build an application that checks the uptime of websites. There are a lot of such solutions out there, the most well-known ones being probably Jetpack Monitor and Uptime Robot. The purpose of these apps is to notify you when your website is down so that you can quickly take action. Here's how they work:
- The application goes very frequently over a list of website URLs and checks if those websites are up.
- Every website should be checked every 5-10 minutes so that the downtime is not significant.
- Instead of performing a classic HTTP GET request, it performs a HEAD request so that it does not affect your traffic significantly.
- If the HTTP status is in the danger ranges (400+, 500+), the owner is notified.
- The owner is notified either by email, text-message, or push notification.
Here's why it's essential to take a parallel/concurrent approach to the problem. As the list of websites grows, going through the list serially won't guarantee us that every website is checked every five minutes or so. The websites could be down for hours, and the owner won't be notified.
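A back-of-the-envelope sketch shows how quickly the serial approach runs out of time. The function name `min_workers` and the figure of 2 seconds per check are assumptions made purely for illustration.

```python
import math


def min_workers(num_sites, seconds_per_check, interval_seconds):
    """Smallest number of simultaneous workers able to check every site
    once per interval, assuming each check takes seconds_per_check."""
    total_work = num_sites * seconds_per_check
    return max(1, math.ceil(total_work / interval_seconds))


# 1,000 sites at an assumed 2 s per check, each re-checked every 5 minutes
print(min_workers(1000, 2, 300))  # 7
# 100 sites under the same assumptions still fit in a single worker
print(min_workers(100, 2, 300))   # 1
```

Under these assumptions a single serial worker stops keeping up once the list passes about 150 sites.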
Let's start by writing some utilities:
# utils.py
import time
import logging

import requests


class WebsiteDownException(Exception):
    pass


def ping_website(address, timeout=20):
    """
    Check if a website is down. A website is considered down
    if either the status_code >= 400 or if the timeout expires.

    Throw a WebsiteDownException if any of the website down conditions are met.
    """
    try:
        response = requests.head(address, timeout=timeout)
        if response.status_code >= 400:
            logging.warning("Website %s returned status_code=%s" % (address, response.status_code))
            raise WebsiteDownException()
    except requests.exceptions.RequestException:
        logging.warning("Timeout expired for website %s" % address)
        raise WebsiteDownException()


def notify_owner(address):
    """
    Send the owner of the address a notification that their website is down.

    For now, we're just going to sleep for 0.5 seconds, but this is where
    you would send an email, push notification or text-message.
    """
    logging.info("Notifying the owner of %s website" % address)
    time.sleep(0.5)


def check_website(address):
    """
    Utility function: check if a website is down, if so, notify the user.
    """
    try:
        ping_website(address)
    except WebsiteDownException:
        notify_owner(address)
We'll actually need a website list to try our system out. Create your own list or use mine:
# websites.py
WEBSITE_LIST = [
    'http://envato.com',
    'http://amazon.co.uk',
    'http://amazon.com',
    'http://facebook.com',
    'http://google.com',
    'http://google.fr',
    'http://google.es',
    'http://google.co.uk',
    'http://internet.org',
    'http://gmail.com',
    'http://ift.tt/gbk8l4',
    'http://github.com',
    'http://heroku.com',
    'http://ift.tt/2oO8vlq',
    'http://ift.tt/rBjKuM',
    'http://rubyonrails.org',
    'http://basecamp.com',
    'http://trello.com',
    'http://yiiframework.com',
    'http://shopify.com',
    'http://ift.tt/2oNXL6K',
    'http://airbnb.com',
    'http://instagram.com',
    'http://snapchat.com',
    'http://youtube.com',
    'http://baidu.com',
    'http://yahoo.com',
    'http://live.com',
    'http://linkedin.com',
    'http://yandex.ru',
    'http://netflix.com',
    'http://wordpress.com',
    'http://bing.com',
]
Normally, you'd keep this list in a database along with owner contact information so that you can contact them. Since this is not the main topic of this tutorial, and for the sake of simplicity, we're just going to use this Python list.
If you paid close attention, you might have noticed two really long domains in the list that are not valid websites (I hope nobody bought them by the time you're reading this to prove me wrong!). I added these two domains to be sure we have some websites down on every run. Also, let's name our app UptimeSquirrel.
Serial Approach
First, let's try the serial approach and see how badly it performs. We'll consider this the baseline.
# serial_squirrel.py
import time

from utils import check_website
from websites import WEBSITE_LIST

start_time = time.time()

for address in WEBSITE_LIST:
    check_website(address)

end_time = time.time()

print("Time for SerialSquirrel: %ssecs" % (end_time - start_time))

# WARNING:root:Timeout expired for website http://ift.tt/2oO8vlq
# WARNING:root:Timeout expired for website http://ift.tt/2oNXL6K
# WARNING:root:Website http://bing.com returned status_code=405
# Time for SerialSquirrel: 15.881232261657715secs
Threading Approach
We're going to get a bit more creative with the implementation of the threaded approach. We're using a queue to put the addresses in and create worker threads to get them out of the queue and process them. We're going to wait for the queue to be empty, meaning that all the addresses have been processed by our worker threads.
# threaded_squirrel.py
import time
from queue import Queue
from threading import Thread

from utils import check_website
from websites import WEBSITE_LIST

NUM_WORKERS = 4
task_queue = Queue()


def worker():
    # Constantly check the queue for addresses
    while True:
        address = task_queue.get()
        check_website(address)

        # Mark the processed task as done
        task_queue.task_done()


start_time = time.time()

# Create the worker threads; daemon threads don't keep the program alive
# once the main thread finishes, even though worker() loops forever
threads = [Thread(target=worker, daemon=True) for _ in range(NUM_WORKERS)]

# Add the websites to the task queue
for item in WEBSITE_LIST:
    task_queue.put(item)

# Start all the workers
for thread in threads:
    thread.start()

# Wait for all the tasks in the queue to be processed
task_queue.join()

end_time = time.time()

print("Time for ThreadedSquirrel: %ssecs" % (end_time - start_time))

# WARNING:root:Timeout expired for website http://ift.tt/2oO8vlq
# WARNING:root:Timeout expired for website http://ift.tt/2oNXL6K
# WARNING:root:Website http://bing.com returned status_code=405
# Time for ThreadedSquirrel: 3.110753059387207secs
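Because the worker loop in this pattern runs forever, a common alternative is to send each worker a sentinel value that tells it to stop, so the threads can be joined cleanly. The sketch below shows that variation; `fake_check` is a hypothetical stand-in for `check_website` so the example needs no network access, and `STOP` is an arbitrary sentinel chosen for illustration.

```python
import threading
from queue import Queue

NUM_WORKERS = 4
STOP = None  # sentinel: tells a worker there is no more work

task_queue = Queue()
results = []
results_lock = threading.Lock()


def fake_check(address):
    # Hypothetical stand-in for check_website(): no network access
    return address.upper()


def worker():
    while True:
        address = task_queue.get()
        if address is STOP:
            task_queue.task_done()
            break  # leave the loop so the thread can finish
        outcome = fake_check(address)
        with results_lock:
            results.append(outcome)
        task_queue.task_done()


threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for thread in threads:
    thread.start()

for address in ["a.example", "b.example", "c.example"]:
    task_queue.put(address)

# One sentinel per worker, queued after the real work
for _ in threads:
    task_queue.put(STOP)

for thread in threads:
    thread.join()

print(sorted(results))  # ['A.EXAMPLE', 'B.EXAMPLE', 'C.EXAMPLE']
```

Since the queue is first in, first out, every real address is picked up before any worker sees a sentinel.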
concurrent.futures
As stated previously, `concurrent.futures` is a high-level API for using threads. The approach we're taking here implies using a `ThreadPoolExecutor`. We're going to submit tasks to the pool and get back futures, which are results that will be available to us in the future. Of course, we can wait for all futures to become actual results.
# future_squirrel.py
import time
import concurrent.futures

from utils import check_website
from websites import WEBSITE_LIST

NUM_WORKERS = 4

start_time = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    futures = {executor.submit(check_website, address) for address in WEBSITE_LIST}
    concurrent.futures.wait(futures)

end_time = time.time()

print("Time for FutureSquirrel: %ssecs" % (end_time - start_time))

# WARNING:root:Timeout expired for website http://ift.tt/2oO8vlq
# WARNING:root:Timeout expired for website http://ift.tt/2oNXL6K
# WARNING:root:Website http://bing.com returned status_code=405
# Time for FutureSquirrel: 1.812899112701416secs
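A future also carries the task's return value or exception, which only surfaces when you call `result()`. If you care about the outcome of each check, one possible pattern is `concurrent.futures.as_completed` with a mapping from future to address. In this sketch `fake_check` is a hypothetical stand-in for a check function that returns a status or raises, so it runs without network access.

```python
import concurrent.futures


def fake_check(address):
    # Hypothetical stand-in for a real check: no network access
    if "down" in address:
        raise RuntimeError("unreachable")
    return 200


addresses = ["up.example", "down.example", "also-up.example"]
outcomes = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to the address it was submitted for
    future_to_address = {executor.submit(fake_check, a): a for a in addresses}

    # as_completed yields futures as they finish, in no particular order
    for future in concurrent.futures.as_completed(future_to_address):
        address = future_to_address[future]
        try:
            outcomes[address] = future.result()  # re-raises the task's exception
        except RuntimeError as exc:
            outcomes[address] = str(exc)

print(outcomes)
```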
The Multiprocessing Approach
The `multiprocessing` library provides an almost drop-in replacement API for the `threading` library. In this case, we're going to take an approach more similar to the `concurrent.futures` one. We're setting up a `multiprocessing.Pool` and submitting tasks to it by mapping a function to the list of addresses (think of the classic Python `map` function).
# multiprocessing_squirrel.py
import time
import multiprocessing

from utils import check_website
from websites import WEBSITE_LIST

NUM_WORKERS = 4

# The guard is needed on platforms where child processes re-import this module
if __name__ == "__main__":
    start_time = time.time()

    with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
        results = pool.map_async(check_website, WEBSITE_LIST)
        results.wait()

    end_time = time.time()

    print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time))

# WARNING:root:Timeout expired for website http://ift.tt/2oO8vlq
# WARNING:root:Timeout expired for website http://ift.tt/2oNXL6K
# WARNING:root:Website http://bing.com returned status_code=405
# Time for MultiProcessingSquirrel: 2.8224599361419678secs
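All the examples hard-code `NUM_WORKERS = 4`. As a rough sketch of how you might pick the number instead, the snippet below derives it from the machine's core count. The variable names are illustrative, and the I/O-bound formula is simply the one `ThreadPoolExecutor` adopted as its default in Python 3.8; treat both as starting points to measure against, not rules.

```python
import os

cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None

# CPU-bound work: more processes than cores rarely helps
cpu_bound_workers = cpu_count

# I/O-bound work: workers mostly wait, so going beyond the core count is common
io_bound_workers = min(32, cpu_count + 4)

print("CPU-bound workers:", cpu_bound_workers)
print("I/O-bound workers:", io_bound_workers)
```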
Gevent
Gevent is a popular alternative for achieving massive concurrency. There are a few things you need to know before using it:
- Code performed concurrently by greenlets is deterministic. As opposed to the other presented alternatives, this paradigm guarantees that for any two identical runs, you'll always get the same results in the same order.
- You need to monkey patch standard functions so that they cooperate with gevent. Here's what I mean by that. Normally, a socket operation is blocking. We're waiting for the operation to finish. If we were in a multithreaded environment, the scheduler would simply switch to another thread while the other one is waiting for I/O. Since we're not in a multithreaded environment, gevent patches the standard functions so that they become non-blocking and return control to the gevent scheduler.
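If monkey patching is new to you, the general idea can be sketched in plain Python: replace a function on a module at runtime so that existing callers pick up the replacement without being changed. This is only an illustration of the mechanism, not gevent's implementation; `recording_sleep` and `slow_task` are made-up names.

```python
import time

calls = []
original_sleep = time.sleep


def recording_sleep(seconds):
    # Replacement that records the request instead of blocking
    calls.append(seconds)


def slow_task():
    # Library-style code that knows nothing about the patch
    time.sleep(5)
    return "done"


time.sleep = recording_sleep      # apply the patch
try:
    result = slow_task()          # returns immediately
finally:
    time.sleep = original_sleep   # always restore the original

print(result, calls)  # done [5]
```

gevent applies the same kind of swap to modules such as `socket`, except that its replacements hand control back to the gevent scheduler instead of merely recording the call.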
To install gevent, run: pip install gevent
Here's how to use gevent to perform our task using a `gevent.pool.Pool`:
# green_squirrel.py
import time

from gevent import monkey

# Monkey-patch the socket module for HTTP requests.
# Do this before importing anything that uses sockets (requests does).
monkey.patch_socket()

from gevent.pool import Pool

from utils import check_website
from websites import WEBSITE_LIST

# Note that you can spawn many workers with gevent since the cost of creating and switching is very low
NUM_WORKERS = 4

start_time = time.time()

pool = Pool(NUM_WORKERS)
for address in WEBSITE_LIST:
    pool.spawn(check_website, address)

# Wait for stuff to finish
pool.join()

end_time = time.time()

print("Time for GreenSquirrel: %ssecs" % (end_time - start_time))

# Time for GreenSquirrel: 3.8395519256591797secs
Celery
Celery is an approach that mostly differs from what we've seen so far. It is battle tested in the context of very complex and high-performance environments. Setting up Celery will require a bit more tinkering than all the above solutions.
First, we'll need to install Celery:
pip install celery
Tasks are the central concepts within the Celery project. Everything that you'll want to run inside Celery needs to be a task. Celery offers great flexibility for running tasks: you can run them synchronously or asynchronously, real-time or scheduled, on the same machine or on multiple machines, and using threads, processes, Eventlet, or gevent.
The arrangement will be slightly more complex. Celery uses other services for sending and receiving messages. These messages are usually tasks or results from tasks. We're going to use Redis in this tutorial for this purpose. Redis is a great choice because it's really easy to install and configure, and it's really possible you already use it in your application for other purposes, such as caching and pub/sub.
You can install Redis by following the instructions on the Redis Quick Start page. Don't forget to install the `redis` Python library, `pip install redis`, and the bundle necessary for using Redis and Celery: `pip install celery[redis]`.
Start the Redis server like this: $ redis-server
To get started building stuff with Celery, we'll first need to create a Celery application. After that, Celery needs to know what kind of tasks it might execute. To achieve that, we need to register tasks to the Celery application. We'll do this using the `@app.task` decorator:
# celery_squirrel.py
import time

from celery import Celery
from celery.result import ResultSet

from utils import check_website
from websites import WEBSITE_LIST

app = Celery('celery_squirrel',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')


@app.task
def check_website_task(address):
    return check_website(address)


if __name__ == "__main__":
    start_time = time.time()

    # Using `delay` runs the task async
    rs = ResultSet([check_website_task.delay(address) for address in WEBSITE_LIST])

    # Wait for the tasks to finish
    rs.get()

    end_time = time.time()

    print("CelerySquirrel:", end_time - start_time)
    # CelerySquirrel: 2.4979639053344727
Don't panic if nothing is happening. Remember, Celery is a service, and we need to run it. Till now, we only placed the tasks in Redis but did not start Celery to execute them. To do that, we need to run this command in the folder where our code resides:
celery -A celery_squirrel worker --loglevel=debug --concurrency=4
Now rerun the Python script and see what happens. One thing to pay attention to: notice how we passed the Redis address to our Celery application twice. The `broker` parameter specifies where the tasks are passed to Celery, and `backend` is where Celery puts the results so that we can use them in our app. If we don't specify a result `backend`, there's no way for us to know when the task was processed and what the result was.
Also, be aware that the logs now are in the standard output of the Celery process, so be sure to check them out in the appropriate terminal.
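The broker/backend split can be easier to picture with an in-process analogy. In the sketch below, a `queue.Queue` plays the role of the broker, a dictionary plays the role of the result backend, and a single thread plays the Celery worker. None of this is Celery's API; `delay` here is just a hypothetical helper named after the method used above.

```python
import queue
import threading

broker = queue.Queue()   # stands in for the Redis broker
backend = {}             # stands in for the Redis result backend
backend_lock = threading.Lock()


def worker():
    while True:
        task_id, func, arg = broker.get()
        if func is None:              # shutdown signal
            break
        value = func(arg)
        with backend_lock:
            backend[task_id] = value  # publish the result for the caller


def delay(task_id, func, arg):
    # Enqueue the task and return immediately, without running it
    broker.put((task_id, func, arg))
    return task_id


worker_thread = threading.Thread(target=worker)
worker_thread.start()

ids = [delay(i, len, word) for i, word in enumerate(["redis", "celery"])]
broker.put((None, None, None))        # ask the worker to stop
worker_thread.join()

print([backend[i] for i in ids])  # [5, 6]
```

Without the `backend` dictionary, the caller would have no way to learn that a task finished or what it returned, which is the same reason the Celery app needs a result backend.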
Conclusions
I hope this has been an interesting journey for you and a good introduction to the world of parallel/concurrent programming in Python. This is the end of the journey, and there are some conclusions we can draw:
- There are several paradigms that help us achieve high-performance computing in Python.
- For the multi-threaded paradigm, we have the `threading` and `concurrent.futures` libraries.
- `multiprocessing` provides a very similar interface to `threading` but for processes rather than threads.
- Remember that processes achieve true parallelism, but they are more expensive to create.
- Remember that a process can have more threads running inside it.
- Do not mistake parallel for concurrent. Remember that only the parallel approach takes advantage of multi-core processors, whereas concurrent programming schedules tasks intelligently so that the time spent waiting on long-running operations is used to do other work.