Roland's homepage

My random knot in the Web

Parallel execution with Python

With Python it is relatively easy to make programs go faster by running things in parallel on multiple cores. This article shows you how.

We sill concentrate on a type of problem that is easy to parallelize.

Problem description

Suppose you have a huge number of files and a program that has to be run on each of them. This could be e.g. a collection of holiday videos that has to be converted to a different format. Or a bunch of photos that need gamma correction.

A first attempt to solve this problem could be a shell-script that compiles a list of files and then runs the program in question them. This will work but will convert the files one at a time. All but one of the cores of your shiny multi-core machine are doing nothing and the run will take ages.

With modern multi-core machines it has become possible to make scripts that perform the same action on different files faster by using more processes.

Let’s see how this would work in Python 3.

Solutions in Python

Using multiprocessing.Pool directly

A proper way to run jobs in multiple processed in Python is to run Python code in the worker processes of a Pool. The following example uses the ImageMagick binding module wand to convert DICOM images to PNG format.

from multiprocessing import Pool
import os
import sys
from wand.image import Image


def convert(filename):
    """Convert a DICOM file to a PNG file, removing the blank areas from the
    Philips detector.

    Arguments:
        filename: name of the file to convert.

    Returns:
        Tuple of (input filename, output filename)
    """
    outname = filename.strip() + '.png'
    with Image(filename=filename) as img:
        with img.convert('png') as converted:
            converted.units = 'pixelsperinch'
            converted.resolution = (300, 300)
            converted.crop(left=232, top=0, width=1568, height=2048)
            converted.save(filename=outname)
    return filename, outname


def main(argv):
    """Main program.

    Arguments:
        argv: command line arguments
    """
    if len(argv) == 1:
        binary = os.path.basename(argv[0])
        print("{} ver. {}".format(binary, __version__), file=sys.stderr)
        print("Usage: {} [file ...]\n".format(binary), file=sys.stderr)
        print(__doc__)
        sys.exit(0)
    del argv[0]  # Remove the name of the script from the arguments.
    es = 'Finished conversion of {} to {}'
    p = Pool()
    for infn, outfn in p.imap_unordered(convert, argv):
        print(es.format(infn, outfn))

if __name__ == '__main__':
    main(sys.argv)

Since the work is done in separate processes, the Python GIL is not a factor. This is a good example of using a multiprocessing.Pool.

Using multiprocessing.Pool with subprocess

A simple solution to launch programs would be to use a combination of the subprocess and multiprocessing modules. We compile a list of files to be worked on and define a function that uses subprocess.call to execute an external utility on a file. This function is used in the imap_unordered method of a multiprocessing.Pool.

In [3]: data = """chkmem.py mkhistory.py property.py softwareinfo.py verbruik.py
markphotos.py passphrase.py pull-git.py texspell.py weer-ehv.py"""

In [4]: files = data.split()

In [5]: import subprocess

In [6]: def runwc(path):
    rv = subprocess.check_output(['wc', path], universal_newlines=True)
    lines, words, characters, name = rv.split()
    return (name, int(lines), int(words), int(characters))
...:

In [7]: from multiprocessing import Pool

In [8]: p = Pool()

In [9]: p.map(runwc, files)
Out[9]:
[('chkmem.py', 33, 112, 992),
('mkhistory.py', 70, 246, 1950),
('property.py', 87, 373, 2895),
('softwareinfo.py', 84, 280, 2517),
('verbruik.py', 132, 585, 4476),
('markphotos.py', 97, 330, 3579),
('passphrase.py', 51, 191, 1916),
('pull-git.py', 82, 237, 2217),
('texspell.py', 343, 1480, 20631),
('weer-ehv.py', 71, 254, 2403)]

This method works and is relatively easy to implement. But it is somewhat wasteful. The Pool starts a number of Python worker processes. Each of those receives items from the files list by IPC, runs the runwc function on those files which starts a process to run wc. The output of each wc is converted to a tuple, pickled and sent back to the parent process. If large amounts of data have to be transferred to and from the worker processes, performance will suffer.

Managing subprocesses directly

A different approach to using multiple subprocesses is it start a bunch of subprocesses directly. We must do this with care, though. Blindly starting a process for every file in a long list could easily exhaust available memory and processing resources. In general it does not make sense to have more concurrent processes running than you CPU has cores. That would lead to processes fighting over available cores.

The following example defines the following function to start ffmpeg to convert a video file to Theora/Vorbis format. It returns a Popen object for each started subprocess.

def startencoder(iname, oname, offs=None):
    args = ['ffmpeg']
    if offs is not None and offs > 0:
        args += ['-ss', str(offs)]
    args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a',
            'libvorbis', '-q:a', '3', '-sn', oname]
    with open(os.devnull, 'w') as bb:
        p = subprocess.Popen(args, stdout=bb, stderr=bb)
    return p

In the main program, a list of Popen objects representing running subprocesses is maintained like this.

outbase = tempname()
ogvlist = []
procs = []
maxprocs = cpu_count()
for n, ifile in enumerate(argv):
    while len(procs) == maxprocs:
        manageprocs(procs)
    ogvname = outbase + '-{:03d}.ogv'.format(n + 1)
    procs.append(startencoder(ifile, ogvname, offset))
    ogvlist.append(ogvname)
while len(procs) > 0:
    manageprocs(procs)

So a new process is only started when there are less running subprocesses than cores. Code that is used multiple times is separated into the manageprocs function.

def manageprocs(proclist):
    for pr in proclist:
        if pr.poll() is not None:
            proclist.remove(pr)
    sleep(0.5)

This approach avoids starting unneeded processes. But it is more complicated to implement.

Using ThreadPoolExecutor.map

Since Python 3.2, the concurrent.futures module has made parallelizing a bunch of subprocesses easier. We can use a ThreadPoolExecutor to combine the convenience of a map method without the process overhead of a multiprocessing.Pool.

The main part of such a program looks like this.

errmsg = 'conversion of track {} failed, return code {}'
okmsg = 'finished track {}, "{}"'
num = len(data['tracks'])
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
    for idx, rv in tp.map(partial(runflac, data=data), range(num)):
        if rv == 0:
            logging.info(okmsg.format(idx+1, data['tracks'][idx]))
        else:
            logging.error(errmsg.format(idx+1, rv))

This programs calls the flac program to convert WAV music to FLAC format.

def runflac(idx, data):
    """Use the flac(1) program to convert a music file to FLAC format.

    Arguments:
        idx: track index (starts from 0)
        data: album data dictionary

    Returns:
        A tuple containing the track index and return value of flac.
    """
    num = idx + 1
    ifn = 'track{:02d}.cdda.wav'.format(num)
    args = ['flac', '--best', '--totally-silent',
            '-TARTIST=' + data['artist'], '-TALBUM=' + data['title'],
            '-TTITLE=' + data['tracks'][idx],
            '-TDATE={}'.format(data['year']),
            '-TGENRE={}'.format(data['genre']),
            '-TTRACKNUM={:02d}'.format(num), '-o',
            'track{:02d}.flac'.format(num), ifn]
    rv = subprocess.call(args, stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL)
    return (idx, rv)

A property of the map method is that it returns the results in the same order as the inputs. If it is to be expected that some subprocess calls take significantly longer than others it would be nicer to handle the returns from the subprocesses as they finish.

Using Futures

To handle subprocesses as they are completed we create a list of Future objects, and use the as_completed function. This returns futures in the sequence that they finish.

starter = partial(runencoder, vq=args.videoquality,
                  aq=args.audioquality)
errmsg = 'conversion of {} failed, return code {}'
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
    fl = [tp.submit(starter, t) for t in args.files]
    for fut in concurrent.futures.as_completed(fl):
        fn, rv = fut.result()
        if rv == 0:
            logging.info('finished "{}"'.format(fn))
        elif rv < 0:
            ls = 'file "{}" has unknown extension, ignoring it.'
            logging.warning(ls.format(fname))
        else:
            logging.error(errmsg.format(fn, rv))

The runencoder function is given below.

def runencoder(fname, vq, aq):
    """
    Use ffmpeg to convert a video file to Theora/Vorbis streams in a Matroska
    container.

    Arguments:
        fname: Name of the file to convert.
        vq : Video quality. See ffmpeg docs.
        aq: Audio quality. See ffmpeg docs.

    Returns:
        (fname, return value)
    """
    basename, ext = os.path.splitext(fname)
    known = ['.mp4', '.avi', '.wmv', '.flv', '.mpg', '.mpeg', '.mov', '.ogv',
            '.mkv', '.webm']
    if ext.lower() not in known:
        return (fname, -1)
    ofn = basename + '.mkv'
    args = ['ffmpeg', '-i', fname, '-c:v', 'libtheora', '-q:v', str(vq),
            '-c:a', 'libvorbis', '-q:a', str(aq), '-sn', '-y', ofn]
    rv = subprocess.call(args, stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL)
    return fname, rv

This is slightly more complicated that using a map, but will not be held up by a single subprocess.


←  Extracting data from XML with regular expressions In programming, small and simple is beautiful  →