Parallel execution with Python
With Python it is relatively easy to make programs faster by running parts of the work in parallel on multiple cores. This article shows you how.
We will concentrate on a type of problem that is easy to parallelize.
Problem description
Suppose you have a huge number of files and a program that has to be run on each of them. This could be e.g. a collection of holiday videos that has to be converted to a different format. Or a bunch of photos that need gamma correction.
A first attempt to solve this problem could be a shell script that compiles a list of files and then runs the program in question on each of them. This will work, but it converts the files one at a time. All but one of the cores of your shiny multi-core machine are doing nothing, and the run will take ages.
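As an illustration, a naive serial version of such a script could look like the sketch below; the convert_one function and the external convert command are placeholders for whatever program you actually need to run. Only one core is ever busy.

# A minimal serial baseline; convert_one() and the external 'convert'
# command are placeholders for the actual program to be run.
import subprocess
import sys


def convert_one(path):
    # Run the (hypothetical) converter on a single file.
    subprocess.call(['convert', path, path + '.png'])


def main(files):
    # The files are handled strictly one after another.
    for path in files:
        convert_one(path)


if __name__ == '__main__':
    main(sys.argv[1:])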
With modern multi-core machines it has become possible to make such scripts faster by spreading the same action over multiple processes, each working on a different file.
Let’s see how this would work in Python 3.
Solutions in Python
Using multiprocessing.Pool directly
A proper way to run jobs in multiple processes in Python is to run Python code in the worker processes of a multiprocessing.Pool.
The following example uses the wand module (a Python binding for ImageMagick) to convert DICOM images to PNG format.
"""Convert DICOM files to PNG format."""  # module docstring assumed; main() prints it

from multiprocessing import Pool
import os
import sys
from wand.image import Image

__version__ = '1.0.0'  # value assumed; referenced in main()


def convert(filename):
    """Convert a DICOM file to a PNG file, removing the blank areas from the
    Philips detector.

    Arguments:
        filename: name of the file to convert.

    Returns:
        Tuple of (input filename, output filename)
    """
    outname = filename.strip() + '.png'
    with Image(filename=filename) as img:
        with img.convert('png') as converted:
            converted.units = 'pixelsperinch'
            converted.resolution = (300, 300)
            converted.crop(left=232, top=0, width=1568, height=2048)
            converted.save(filename=outname)
    return filename, outname


def main(argv):
    """Main program.

    Arguments:
        argv: command line arguments
    """
    if len(argv) == 1:
        binary = os.path.basename(argv[0])
        print("{} ver. {}".format(binary, __version__), file=sys.stderr)
        print("Usage: {} [file ...]\n".format(binary), file=sys.stderr)
        print(__doc__)
        sys.exit(0)
    del argv[0]  # Remove the name of the script from the arguments.
    es = 'Finished conversion of {} to {}'
    p = Pool()
    # imap_unordered yields results in the sequence they finish, not in
    # the sequence they're started.
    for infn, outfn in p.imap_unordered(convert, argv):
        print(es.format(infn, outfn))


if __name__ == '__main__':
    main(sys.argv)
Since the work is done in separate processes, the Python GIL is not a factor. This is a good example of using a multiprocessing.Pool.
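If you want to convince yourself that the GIL is not a factor here, a small experiment like the sketch below can be used; the sumsquares workload is invented purely for illustration. On a multi-core machine, the Pool version should finish several times faster than the serial loop.

# A minimal sketch contrasting serial execution with a Pool for
# CPU-bound work; the sumsquares() workload is invented for
# illustration.
import time
from multiprocessing import Pool


def sumsquares(n):
    return sum(i * i for i in range(n))


if __name__ == '__main__':
    work = [2000000] * 8
    start = time.perf_counter()
    serial = [sumsquares(n) for n in work]
    print('serial: {:.2f} s'.format(time.perf_counter() - start))
    start = time.perf_counter()
    with Pool() as p:  # one worker process per core by default
        parallel = p.map(sumsquares, work)
    print('pool:   {:.2f} s'.format(time.perf_counter() - start))
    assert serial == parallel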
Using multiprocessing.Pool with subprocess
A simple solution to launch external programs would be to use a combination of the subprocess and multiprocessing modules. We compile a list of files to be worked on and define a function that uses subprocess.check_output to execute an external utility on a file. This function is then used in the map method of a multiprocessing.Pool.
In [3]: data = """chkmem.py mkhistory.py property.py softwareinfo.py verbruik.py
   ...: markphotos.py passphrase.py pull-git.py texspell.py weer-ehv.py"""

In [4]: files = data.split()

In [5]: import subprocess

In [6]: def runwc(path):
   ...:     rv = subprocess.check_output(['wc', path], universal_newlines=True)
   ...:     lines, words, characters, name = rv.split()
   ...:     return (name, int(lines), int(words), int(characters))
   ...:
In [7]: from multiprocessing import Pool
In [8]: p = Pool()
In [9]: p.map(runwc, files)
Out[9]:
[('chkmem.py', 33, 112, 992),
('mkhistory.py', 70, 246, 1950),
('property.py', 87, 373, 2895),
('softwareinfo.py', 84, 280, 2517),
('verbruik.py', 132, 585, 4476),
('markphotos.py', 97, 330, 3579),
('passphrase.py', 51, 191, 1916),
('pull-git.py', 82, 237, 2217),
('texspell.py', 343, 1480, 20631),
('weer-ehv.py', 71, 254, 2403)]
This method works and is relatively easy to implement. But it is somewhat wasteful. The Pool starts a number of Python worker processes. Each of those receives items from the files list by IPC and runs the runwc function on them, which in turn starts a wc process. The output of each wc is converted to a tuple, pickled and sent back to the parent process. If large amounts of data have to be transferred to and from the worker processes, performance will suffer.
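A rule of thumb therefore is to keep the data that crosses the process boundary small: hand the workers file names, and have them return file names or small summaries, letting the external program read and write the files on disk itself. A sketch, where some-converter is a made-up external program:

# Sketch: keep the data that crosses the process boundary small.
# 'some-converter' is a made-up external program.
import subprocess


def process(path):
    """Cheap IPC: the converter writes its output to disk itself, and
    only the short output path is pickled back to the parent."""
    outpath = path + '.out'
    subprocess.call(['some-converter', path, outpath])
    return outpath


def process_badly(path):
    """Wasteful IPC: the whole converted file is read into memory,
    pickled, and sent back to the parent."""
    outpath = path + '.out'
    subprocess.call(['some-converter', path, outpath])
    with open(outpath, 'rb') as f:
        return f.read()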
Managing subprocesses directly
A different approach to using multiple subprocesses is to start a bunch of subprocesses directly. We must do this with care, though. Blindly starting a process for every file in a long list could easily exhaust available memory and processing resources. In general it does not make sense to have more concurrent processes running than your CPU has cores; that would lead to processes fighting over the available cores.
The following example defines a function that starts ffmpeg to convert a video file to Theora/Vorbis format. It returns a Popen object for each started subprocess.
def startencoder(iname, oname, offs=None):
    """Start an ffmpeg subprocess converting iname to Theora/Vorbis."""
    args = ['ffmpeg']
    if offs is not None and offs > 0:
        args += ['-ss', str(offs)]
    args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a',
             'libvorbis', '-q:a', '3', '-sn', oname]
    with open(os.devnull, 'w') as bb:
        p = subprocess.Popen(args, stdout=bb, stderr=bb)
    return p
In the main program, a list of Popen objects representing running subprocesses is maintained like this.
outbase = tempname()
ogvlist = []
procs = []
maxprocs = cpu_count()
for n, ifile in enumerate(argv):
    while len(procs) == maxprocs:
        manageprocs(procs)
    ogvname = outbase + '-{:03d}.ogv'.format(n + 1)
    procs.append(startencoder(ifile, ogvname, offset))
    ogvlist.append(ogvname)
while len(procs) > 0:
    manageprocs(procs)
So a new process is only started when there are fewer running subprocesses than cores. Code that is used multiple times is separated into the manageprocs function.
def manageprocs(proclist):
    """Remove finished subprocesses from proclist."""
    # Iterate over a copy; removing items from a list while iterating
    # over that same list would skip elements.
    for pr in proclist[:]:
        if pr.poll() is not None:
            proclist.remove(pr)
    sleep(0.5)
This approach avoids starting unneeded processes and threads. But it is more complicated to implement.
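For completeness, here is a sketch of how these fragments could be wired together into a runnable whole, with the imports they need. The fixed 'output' prefix stands in for the tempname() helper of the original script, and the offset handling is left out for brevity.

# Sketch: the fragments above wired into a complete script. The fixed
# 'output' prefix replaces the tempname() helper of the original
# script, and the offset handling is omitted for brevity.
import os
import subprocess
import sys
from multiprocessing import cpu_count
from time import sleep


def startencoder(iname, oname):
    """Start an ffmpeg subprocess converting iname to Theora/Vorbis."""
    args = ['ffmpeg', '-i', iname, '-c:v', 'libtheora', '-q:v', '6',
            '-c:a', 'libvorbis', '-q:a', '3', '-sn', oname]
    with open(os.devnull, 'w') as bb:
        return subprocess.Popen(args, stdout=bb, stderr=bb)


def manageprocs(proclist):
    """Remove finished subprocesses from proclist."""
    for pr in proclist[:]:
        if pr.poll() is not None:
            proclist.remove(pr)
    sleep(0.5)


def main(argv):
    procs = []
    maxprocs = cpu_count()
    for n, ifile in enumerate(argv):
        while len(procs) == maxprocs:  # all slots taken; wait.
            manageprocs(procs)
        procs.append(startencoder(ifile, 'output-{:03d}.ogv'.format(n + 1)))
    while len(procs) > 0:  # wait for the last conversions to finish.
        manageprocs(procs)


if __name__ == '__main__':
    main(sys.argv[1:])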
Using ThreadPoolExecutor.map
Since Python 3.2, the concurrent.futures module has made parallelizing a bunch of subprocesses easier. We can use a ThreadPoolExecutor to get the convenience of a map method without the process overhead of a multiprocessing.Pool. Threads are sufficient here because each worker spends most of its time waiting for its subprocess to finish, so the GIL is not a bottleneck.
The main part of such a program looks like this.
errmsg = 'conversion of track {} failed, return code {}'
okmsg = 'finished track {}, "{}"'
num = len(data['tracks'])
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
    for idx, rv in tp.map(partial(runflac, data=data), range(num)):
        if rv == 0:
            logging.info(okmsg.format(idx + 1, data['tracks'][idx]))
        else:
            logging.error(errmsg.format(idx + 1, rv))
This program calls the flac program to convert WAV music files to FLAC format.
def runflac(idx, data):
    """Use the flac(1) program to convert a music file to FLAC format.

    Arguments:
        idx: track index (starts from 0)
        data: album data dictionary

    Returns:
        A tuple containing the track index and return value of flac.
    """
    num = idx + 1
    ifn = 'track{:02d}.cdda.wav'.format(num)
    args = ['flac', '--best', '--totally-silent',
            '-TARTIST=' + data['artist'], '-TALBUM=' + data['title'],
            '-TTITLE=' + data['tracks'][idx],
            '-TDATE={}'.format(data['year']),
            '-TGENRE={}'.format(data['genre']),
            '-TTRACKNUM={:02d}'.format(num), '-o',
            'track{:02d}.flac'.format(num), ifn]
    rv = subprocess.call(args, stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    return (idx, rv)
A property of the map method is that it returns the results in the same order as the inputs. If some subprocess calls are expected to take significantly longer than others, it would be nicer to handle the results as the subprocesses finish.
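The difference is easy to see in a toy example; the sleeping worker below is invented for illustration. The map call yields results in input order, waiting for the slow first job, while as_completed yields each future as soon as its work is done.

# Toy demonstration of result ordering; the sleeping worker is
# invented for illustration.
import concurrent.futures
import time


def work(n):
    time.sleep(n)  # stand-in for a slow subprocess
    return n


delays = [3, 1, 2]
with concurrent.futures.ThreadPoolExecutor() as tp:
    # map: results come back in input order, so the slow first job
    # holds up the rest.
    print(list(tp.map(work, delays)))  # [3, 1, 2]
    # as_completed: futures are yielded as soon as they finish.
    futs = [tp.submit(work, n) for n in delays]
    print([f.result() for f in concurrent.futures.as_completed(futs)])  # [1, 2, 3]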
Using Futures
To handle subprocesses as they are completed, we create a list of Future objects and use the as_completed function. This yields futures in the sequence that they finish.
starter = partial(runencoder, vq=args.videoquality, aq=args.audioquality)
errmsg = 'conversion of {} failed, return code {}'
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
    fl = [tp.submit(starter, t) for t in args.files]
    for fut in concurrent.futures.as_completed(fl):
        fn, rv = fut.result()
        if rv == 0:
            logging.info('finished "{}"'.format(fn))
        elif rv < 0:
            ls = 'file "{}" has unknown extension, ignoring it.'
            logging.warning(ls.format(fn))
        else:
            logging.error(errmsg.format(fn, rv))
The runencoder function is given below.
def runencoder(fname, vq, aq):
    """Use ffmpeg to convert a video file to Theora/Vorbis streams in
    a Matroska container.

    Arguments:
        fname: Name of the file to convert.
        vq: Video quality. See the ffmpeg docs.
        aq: Audio quality. See the ffmpeg docs.

    Returns:
        (fname, return value)
    """
    basename, ext = os.path.splitext(fname)
    known = ['.mp4', '.avi', '.wmv', '.flv', '.mpg', '.mpeg', '.mov',
             '.ogv', '.mkv', '.webm']
    if ext.lower() not in known:
        return (fname, -1)
    ofn = basename + '.mkv'
    args = ['ffmpeg', '-i', fname, '-c:v', 'libtheora', '-q:v', str(vq),
            '-c:a', 'libvorbis', '-q:a', str(aq), '-sn', '-y', ofn]
    rv = subprocess.call(args, stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    return fname, rv
This is slightly more complicated than using a map, but a single slow subprocess will not hold up the handling of the other results.
For comments, please send me an e-mail.