Concurrency with futures

Concurrency using the concurrent.futures module

Concurrency in Python has long been criticized because of the GIL, with some claiming that it is impossible to write concurrent code in Python. In this post I work through some examples of writing concurrent code with the concurrent.futures module; later posts will cover concurrency with asyncio and the multiprocessing module.

Example

Sequential Download

In this example, I will be downloading some country flags. The first example will be a sequential script:

import os
import time
import requests


countries = ['IN','US','FR','JP','ZA','DE','RU','EG','VN']

BASE_URL = "https://flagcdn.com/16x12/"

DOWNLOAD_DIR =  'downloads/'

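def save_flag(img, filename):
    # Create the target directory on first use so open() does not fail
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
    path = os.path.join(DOWNLOAD_DIR, filename)
    with open(path, 'wb') as f:
        f.write(img)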

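def get_flag(country):
    # BASE_URL already ends with '/'; flagcdn.com documents lowercase country codes
    url = f'{BASE_URL}{country.lower()}.png'
    resp = requests.get(url)
    resp.raise_for_status()  # fail loudly instead of saving an error page
    return resp.content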

def download(c_list):
    for c in sorted(c_list):
        img = get_flag(c)
        save_flag(img, c.lower() + '.png')

    return len(c_list)

def main(download):
    # perf_counter() is a monotonic clock intended for measuring intervals
    start = time.perf_counter()
    count = download(countries)
    elapsed = time.perf_counter() - start
    msg = f'Downloaded {count} flags in {elapsed:.2f}s'
    print(msg)


if __name__== "__main__":
    main(download)

The sequential script downloaded nine flags in 12 seconds. The concurrent version reuses most of the functions from the code above.

Concurrent Download with concurrent.futures

The main classes of this package are ThreadPoolExecutor and ProcessPoolExecutor, which implement a common interface for submitting callables for execution in different threads or processes, respectively. Each class manages an internal pool of worker threads or processes and a queue of tasks to be executed.

from concurrent import futures
# Assumes the sequential script above is saved as sequential.py
from sequential import save_flag,get_flag,main,countries

def download_one(country):
    image = get_flag(country)
    save_flag(image,country.lower() +'.png')
    return country

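def download_many(c_list):
    # One thread per flag; fine for a short list, cap this for long ones
    workers = len(c_list)
    with futures.ThreadPoolExecutor(workers) as executor:
        # map() returns an iterator that yields results in input order
        res = executor.map(download_one, sorted(c_list))

    return len(list(res))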

if __name__== '__main__':
    main(download_many)

With the use of futures, the download took ~1.8 seconds.

What are Futures

In Python, a Future class is defined in two places in the standard library: concurrent.futures.Future and asyncio.Future.

Notes:

  1. They serve the same purpose: an instance of either Future class represents a deferred computation that may or may not have completed.
  2. Futures encapsulate pending operations so that they can be put in queues, their state of completion can be queried, and their results (or exceptions) can be retrieved when available.
  3. They are meant to be instantiated exclusively by the concurrency framework, be it concurrent.futures or asyncio.
  4. Both types of Future have a .done() method that is nonblocking and returns a Boolean indicating whether the callable associated with that future has completed. The other key method is .result(), which returns the result of the callable or re-raises any exception the callable raised. The behaviour of .result() differs between the two variants while the future is still in progress. On a concurrent.futures.Future instance, invoking f.result() blocks the caller's thread until the result is ready; an optional timeout argument can be passed, and if the result is not ready in time a TimeoutError exception is raised. The asyncio.Future.result() method, by contrast, never blocks and does not accept a timeout: calling it before the future is done raises InvalidStateError. The preferred way to get the result of an asyncio future is to await it (yield from in older code).
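The behaviour described in note 4 can be observed directly. The following is only a minimal sketch: slow_square and fail are hypothetical helpers made up for illustration, and the delays are arbitrary values chosen so that the timeout fires before the task finishes.

```python
import asyncio
import time
from concurrent import futures


def slow_square(n, delay=0.2):
    """Hypothetical stand-in for a slow operation such as a download."""
    time.sleep(delay)
    return n * n


def fail():
    """Hypothetical task that always raises, to show exception propagation."""
    raise ValueError("boom")


with futures.ThreadPoolExecutor(max_workers=2) as executor:
    fut = executor.submit(slow_square, 7)
    bad = executor.submit(fail)

    # done() never blocks; it only reports the current state.
    print("done right after submit?", fut.done())

    # result(timeout=...) waits at most `timeout` seconds, then raises.
    try:
        fut.result(timeout=0.01)
        timed_out = False
    except futures.TimeoutError:
        timed_out = True

    # Without a timeout, result() blocks until the callable returns.
    value = fut.result()

    # result() re-raises the exception raised inside the callable.
    try:
        bad.result()
        error = None
    except ValueError as exc:
        error = str(exc)


async def demo():
    """Show that asyncio.Future.result() raises instead of blocking."""
    loop = asyncio.get_running_loop()
    afut = loop.create_future()
    not_ready = False
    try:
        afut.result()  # the future is still pending here
    except asyncio.InvalidStateError:
        not_ready = True
    # Resolve the future a little later, then await it to get the value.
    loop.call_later(0.05, afut.set_result, "ready")
    return not_ready, await afut


not_ready, awaited = asyncio.run(demo())
print(timed_out, value, error, not_ready, awaited)
```

The asyncio part creates the future by hand with loop.create_future() purely for demonstration; in application code the framework normally creates futures for you.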

A more practical way to see futures at work is to replace executor.map with two for loops: one to schedule the futures and another to retrieve their results. To retrieve the results, futures.as_completed is used; it takes an iterable of futures and returns an iterator that yields each future as it completes.

from concurrent import futures
from sequential import save_flag,get_flag,main,countries

def download_one(country):
    image = get_flag(country)
    save_flag(image,country.lower() +'.png')
    return country

def download_many(c_list):
    workers = len(c_list)
    with futures.ThreadPoolExecutor(workers) as executor:
        # Schedule every download and keep the future that submit() returns
        temp = []
        for c in sorted(c_list):
            future = executor.submit(download_one,c)
            temp.append(future)
            print(f'{c}:{future}')

        res = []
        for future in futures.as_completed(temp):
            result = future.result()
            print(f'{future}: result {result}')
            res.append(result)

    return len(res)

if __name__== '__main__':
    main(download_many)

Limitations due to the GIL (Global Interpreter Lock)

The CPython interpreter is not thread-safe internally, so it has a Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecode. That is why a single Python process usually cannot use multiple CPU cores at the same time.

Every blocking I/O function in the Python standard library releases the GIL, allowing other threads to run. The time.sleep() function also releases the GIL. Therefore, Python threads are perfectly usable in I/O-bound applications, despite the GIL.
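A quick way to see this is to let several threads sleep at once. This is a rough sketch, not a benchmark: nap is a hypothetical helper, and the 0.2-second delay and five workers are arbitrary choices.

```python
import time
from concurrent import futures


def nap(seconds):
    """Hypothetical task: sleeping releases the GIL, like blocking I/O does."""
    time.sleep(seconds)
    return seconds


start = time.perf_counter()
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Five naps of 0.2s each would take about 1s if run one after another.
    results = list(executor.map(nap, [0.2] * 5))
elapsed = time.perf_counter() - start

print(f"5 naps of 0.2s finished in {elapsed:.2f}s")
```

If the threads really overlap, the elapsed time should be close to a single nap rather than the sum of all five.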

We can overcome this limitation by launching multiple processes with ProcessPoolExecutor, but this is beneficial mainly when the job is CPU-bound.
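As an illustration, here is a minimal sketch of a CPU-bound job spread over processes. The count_primes and count_many functions and the input sizes are made up for this example; any pure-Python number crunching would do.

```python
from concurrent import futures


def count_primes(limit):
    """Count primes below `limit` by trial division (deliberately CPU-heavy)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def count_many(limits):
    # With no argument, the pool size defaults to the number of processors.
    with futures.ProcessPoolExecutor() as executor:
        # Each call runs in a separate process with its own interpreter and GIL.
        return list(executor.map(count_primes, limits))


if __name__ == "__main__":
    # The guard matters: worker processes may re-import this module.
    print(count_many([20_000, 30_000, 40_000]))
```

Because the executor interface is shared, swapping ThreadPoolExecutor for ProcessPoolExecutor is often a one-line change, provided the callable and its arguments can be pickled.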