Concurrency and multiprocessing
Concurrency
The main limitation to Python’s concurrent execution is the Global Interpreter Lock (GIL).
The GIL is a mutex that allows only one thread to execute Python bytecode at any given time (per interpreter).
It exists because CPython's memory management, which is based on reference counting, is not thread-safe.
IO-bound threads are not much affected by this limitation, since a thread releases the GIL while it waits for IO; CPU-bound threads, however, cannot run in parallel.
Python 3.8 was expected to bring some mitigations to this problem, but in practice nothing changes for the user.
Python's standard library includes:
- threading: thread-based concurrency
- multiprocessing: process-based parallelism
- concurrent.futures: asynchronous execution via threads or processes [not covered in this lecture]
There are also external libraries that provide parallelism (e.g., pathos).
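To see why IO-bound threads are not hurt by the GIL, here is a minimal sketch with the threading module. It assumes `time.sleep` as a stand-in for a real blocking IO call (network, disk); `fake_io` and `run_io_tasks` are hypothetical names. Since sleeping releases the GIL, the four waits overlap.

```python
import threading
import time


def fake_io(delay, results, index):
    """Simulate a blocking IO call; time.sleep releases the GIL while waiting."""
    time.sleep(delay)
    results[index] = f"task {index} done"


def run_io_tasks(n_tasks=4, delay=0.2):
    results = [None] * n_tasks
    threads = [
        threading.Thread(target=fake_io, args=(delay, results, i))
        for i in range(n_tasks)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait until every thread has finished
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = run_io_tasks()
    print(results)
    # The four 0.2 s waits overlap, so this is close to 0.2 s rather than 0.8 s.
    print(f"elapsed: {elapsed:.2f} s")
```

A CPU-bound function (e.g., a tight arithmetic loop) in place of `time.sleep` would not overlap in the same way, because each thread needs the GIL to run.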
Parallelism
Parallelism in Python is mainly exploited at the data level: since the data items are independent, no synchronization is usually required.
This means that you can achieve full parallelism and take advantage of all the cores of modern machines, squeezing all of their power.
After processing, the results can be handled either by the parallel processes themselves or collected and handled in the main process.
So, how do we achieve parallelism in Python?
There are a couple of ways, and the right solution depends on your problem. For instance, you can:
- Spawn new processes, each with its own interpreter (and its own GIL). This introduces a non-negligible time and memory overhead
- Offload the work to external code. From a Python perspective, this turns a CPU-bound task into an IO-bound one, since the native code can release the GIL while it runs. For example, NumPy and other libraries implement most algorithms in C/C++/Fortran
multiprocessing package
Native parallelism is provided by the multiprocessing package.
The main building block of the package is the Process class, whose instances represent an activity that is run in a separate process. If you're familiar with Java, it is fairly similar to Java's Thread class.
Process init parameters include:
- target function to execute plus its args and kwargs
- daemon: whether the process is a daemon. Daemon processes are terminated when their parent process exits
While the class methods include:
- run(): the default invokes the target with its parameters. Can be overridden
- start(): creates the process and invokes the run() method in it
- join(): joins the process, with an optional timeout
- close(): closes the Process instance and deallocates its resources
Example: Process init
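A minimal sketch of this lifecycle, assuming a hypothetical `greet` target function. The process names (e.g., Process-19) depend on how many processes were created before, and newer Python versions also show the pid and exit code in the repr, so the exact text may differ from the output below.

```python
from multiprocessing import Process


def greet(name):
    print(f"hello {name}", flush=True)


def lifecycle():
    p = Process(target=greet, args=("bob",))
    print("INITED", p)
    p.start()  # creates the child process, which calls run() -> greet("bob")
    print("STARTED", p)
    p.join()  # blocks until the child exits (an optional timeout can be given)
    print("JOINED", p)
    exitcode = p.exitcode  # 0 on success; must be read before close()
    p.close()  # releases the resources; most methods now raise ValueError
    print("CLOSED", p)
    return exitcode


if __name__ == "__main__":
    lifecycle()
```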
INITED <Process(Process-19, initial)>
hello bob
STARTED <Process(Process-19, started)>
JOINED <Process(Process-19, stopped)>
CLOSED <Process(Process-19, closed)>
multiprocessing vs multiprocessing.dummy
The subpackage multiprocessing.dummy implements the same interface as the main package but is thread-based (i.e., logical concurrency but no parallelism, often used during testing).
As an example, let's create a Process with each of the two packages.
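A minimal sketch of the comparison, assuming a hypothetical no-op target `noop`. Neither object is started, so nothing actually runs; the point is the type of object each package returns.

```python
import multiprocessing
import multiprocessing.dummy
import threading


def noop():
    pass


def make_both():
    real = multiprocessing.Process(target=noop)        # would run in a new process
    fake = multiprocessing.dummy.Process(target=noop)  # would run in a new thread
    return real, fake


if __name__ == "__main__":
    real, fake = make_both()
    print(real)
    print(fake)
    print(isinstance(fake, threading.Thread))  # True: DummyProcess subclasses Thread
    print(isinstance(real, threading.Thread))  # False
```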
<Process(Process-2, initial)>
<DummyProcess(Thread-4, initial)>
As you can see, they offer the same interface, but the objects are different: a real Process in the first case, a thread-backed DummyProcess in the second.
Ways to start a process
Processes can be started in three ways:
- fork: forks the current Python interpreter. It is available on Unix systems only, where it was the default for a long time (macOS switched to spawn in Python 3.8, and Python 3.14 switched the other POSIX platforms to forkserver)
- forkserver: a server process is started, and from then on it creates new processes on behalf of the parent. It is available on Unix platforms that support passing file descriptors over Unix pipes
- spawn: a fresh Python interpreter process is created. It inherits only the resources necessary to run the Process instance's run() method. It is generally slower than the other methods, since the modules have to be imported again in the child. It is available on Unix and Windows, where it is the default option
The preferred method can be chosen with the set_start_method(method) function of the multiprocessing package, which should be called at most once, inside the if __name__ == "__main__" block. Alternatively, get_context(method) returns a context object bound to a given start method.
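A minimal sketch of both ways of choosing the start method, assuming a hypothetical `square` function and the spawn method (available on every platform). Note that with spawn, the worker functions must live at module level so the child can import them.

```python
import multiprocessing as mp


def square(x):
    return x * x


def squares_with(method):
    # get_context() returns an object with the same API as the multiprocessing
    # module, but bound to the given start method.
    ctx = mp.get_context(method)
    with ctx.Pool(2) as pool:
        return pool.map(square, range(5))


if __name__ == "__main__":
    print(mp.get_all_start_methods())  # e.g. ['fork', 'spawn', 'forkserver'] on Linux
    mp.set_start_method("spawn")  # call at most once, before creating any process
    print(mp.get_start_method())  # spawn
    print(squares_with("spawn"))  # [0, 1, 4, 9, 16]
```

get_context() is handy in libraries, where changing the global start method with set_start_method() would affect the whole program.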
Synchronization
Synchronization between processes (or threads!) is, again, similar to Java. For instance, the multiprocessing package includes:
- Locks
- Lock: non-recursive lock. Subsequent acquisition attempts will block until the lock is released; any process or thread may release it
- RLock: recursive lock. The same process or thread may acquire it again and must release it the same number of times
- Semaphores
- Semaphore: atomic counter representing the number of release() calls minus the number of acquire() calls, plus an initial value. Can be acquired if the value is > 0
- BoundedSemaphore: like a Semaphore, but the counter cannot exceed its initial value
Examples
Locks
What follows is a toy example of acquiring a lock.
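A minimal sketch, assuming a hypothetical `say_hello` worker. The lock is passed explicitly to each process (which keeps it compatible with the spawn method), and the with statement acquires it and releases it even if an exception occurs.

```python
from multiprocessing import Lock, Process, current_process


def say_hello(lock):
    # Only one process at a time can hold the lock, so the lines never interleave.
    with lock:  # same as lock.acquire() ... finally: lock.release()
        print(current_process(), "says hello!", flush=True)


def greet_all(n_workers=2):
    lock = Lock()
    workers = [Process(target=say_hello, args=(lock,)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return [w.exitcode for w in workers]


if __name__ == "__main__":
    greet_all()
```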
<Process(Process-20, started)> says hello!
<Process(Process-21, started)> says hello!
Semaphores
Semaphores offer the same interface but different behaviour: a Semaphore created with initial value N can be held by up to N processes at the same time.
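A minimal sketch with four workers and two slots. The hypothetical `active` and `peak` counters (shared Value objects, covered later in this section) record how many workers hold the semaphore at the same time, so we can check that it never exceeds two.

```python
import time
from multiprocessing import Process, Semaphore, Value, current_process


def worker(sem, active, peak):
    with sem:  # blocks while all the semaphore's slots are taken
        with active.get_lock():  # Value comes with its own lock
            active.value += 1
            peak.value = max(peak.value, active.value)
        print(current_process(), "says hello!", flush=True)
        time.sleep(0.2)  # simulate some work while holding a slot
        with active.get_lock():
            active.value -= 1


def max_concurrency(n_workers=4, slots=2):
    sem = Semaphore(slots)  # at most `slots` holders at the same time
    active = Value("i", 0)  # workers currently holding the semaphore
    peak = Value("i", 0)    # highest number of simultaneous holders observed
    workers = [Process(target=worker, args=(sem, active, peak)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return peak.value


if __name__ == "__main__":
    print("peak concurrency:", max_concurrency())  # never more than 2
```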
<Process(Process-5, started)> says hello!
<Process(Process-6, started)> says hello!
<Process(Process-7, started)> says hello!
<Process(Process-8, started)> says hello!
Sharing objects
Python also supports sharing objects between processes (and threads).
Pipes
The most basic way is through Pipe objects, although this solution is not very pythonic and more user-friendly alternatives exist.
Pipe() returns a pair of connection objects: an object sent from one end can be received at the other.
A pipe may be duplex (both ends can send and receive, which is the default) or one-way (Pipe(duplex=False)).
Note that the data in a pipe can get corrupted if two or more processes/threads read from or write to the same end at the same time.
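A minimal sketch of sending one message through a pipe, assuming a hypothetical `send_message` child function; the payload mirrors the output below.

```python
from multiprocessing import Pipe, Process


def send_message(conn):
    conn.send([42, None, "hello"])  # any picklable object can be sent
    conn.close()


def pipe_demo():
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=send_message, args=(child_conn,))
    print("Starting the process")
    p.start()
    print("Waiting for a message")
    msg = parent_conn.recv()  # blocks until something arrives
    print("Received:", msg)  # Received: [42, None, 'hello']
    p.join()
    return msg


if __name__ == "__main__":
    pipe_demo()
```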
Starting the process
Waiting for a message
Received: [42, None, 'hello']
Queues
The standard library also includes various queue implementations. They all let you set a maximum size (maxsize <= 0 means unbounded) and differ in their in-out policy.
The most used implementations are:
- Queue: FIFO queue (multiprocessing.Queue is process-safe; queue.Queue is meant for threads)
- LifoQueue: LIFO queue (queue module, for threads)
- PriorityQueue: priority queue (queue module, for threads)
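A minimal producer/consumer sketch with multiprocessing.Queue, assuming hypothetical `producer` and `consumer` functions and None as an end-of-stream sentinel. The task queue is bounded, so the producer blocks when two items are waiting.

```python
from multiprocessing import Process, Queue


def producer(tasks, n):
    for i in range(n):
        tasks.put(i)  # blocks while the queue is full
    tasks.put(None)  # sentinel: tells the consumer there is nothing left


def consumer(tasks, results):
    while True:
        item = tasks.get()  # blocks until an item is available
        if item is None:
            break
        results.put(item * item)


def squares(n=5):
    tasks, results = Queue(maxsize=2), Queue()
    procs = [
        Process(target=producer, args=(tasks, n)),
        Process(target=consumer, args=(tasks, results)),
    ]
    for p in procs:
        p.start()
    # Drain the results before joining: a process that still has buffered
    # items in a queue does not terminate, so joining first could deadlock.
    out = [results.get() for _ in range(n)]
    for p in procs:
        p.join()
    return out


if __name__ == "__main__":
    print(squares())  # [0, 1, 4, 9, 16]: one consumer, so FIFO order is kept
```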
0
What if we use two daemons instead?
0
1
2
3
4
5
6
7
8
9
Computation will not complete unless we join them!
Sharing state
Python supports sharing state between processes and threads.
Keep in mind that it is usually best to avoid using shared state as far as possible.
- This is particularly true when using multiple processes
There are a couple of ways of sharing state:
- Sharing memory
- Managers
Shared memory
Starting from Python 3.8, the multiprocessing.shared_memory module also lets you allocate and manage blocks of raw memory that several processes can access directly.
Shared memory allows processes to share a location of memory (threads already share memory, of course) and to allocate basic objects there.
Very briefly, multiprocessing lets you allocate ctypes objects in shared memory:
- Value represents a single value
- Array represents an array
Check the docs for more!
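A minimal sketch with Value and Array, assuming a hypothetical `fill` function whose writes were chosen to match the output below. The child process writes, and the parent sees the changes after join().

```python
from multiprocessing import Array, Process, Value


def fill(total, numbers):
    total.value = 10.0
    for i in range(len(numbers)):
        numbers[i] = len(numbers) - i  # 10, 9, ..., 1


def fill_shared():
    total = Value("d", 0.0)          # 'd' = C double
    numbers = Array("i", range(10))  # 'i' = C signed int
    p = Process(target=fill, args=(total, numbers))
    p.start()
    p.join()
    return total.value, numbers[:]


if __name__ == "__main__":
    value, array = fill_shared()
    print(value)  # 10.0
    print(array)  # [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
```

By default both objects are wrapped with a lock (see get_lock()), so compound updates such as `total.value += 1` from several processes should be done inside `with total.get_lock():`.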
10.0
[10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
Managers
Managers provide a way to create data which can be shared between different processes.
A manager controls a server process that holds Python objects and allows other processes to manipulate them through proxies.
Managers can even share objects over a network between processes running on different machines.
For instance, the SyncManager, returned already started by Manager(), supports lists, dictionaries, locks, semaphores, queues, shared Value and Array objects, and others.
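A minimal sketch of a SyncManager sharing a dictionary and a list, assuming a hypothetical `record` worker. Every operation on a proxy is forwarded to the manager's server process.

```python
from multiprocessing import Manager, Process


def record(shared_dict, shared_list, key):
    shared_dict[key] = key * key  # forwarded to the manager process
    shared_list.append(key)


def share_with_manager(n=4):
    with Manager() as manager:  # starts the server process; shuts it down on exit
        d = manager.dict()
        lst = manager.list()
        procs = [Process(target=record, args=(d, lst, i)) for i in range(n)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy the data out before the manager shuts down and the proxies die.
        return d.copy(), sorted(lst[:])


if __name__ == "__main__":
    print(share_with_manager())  # ({0: 0, 1: 1, 2: 4, 3: 9}, [0, 1, 2, 3])
```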
Pools
The most common, and also the simplest and most pythonic, way to perform multiprocessing in Python is through pools of processes.
Pools create a number of workers which will carry out tasks submitted to the pool.
A Pool object controls a pool of workers, and supports both synchronous and asynchronous results.
Pool parameters
The main parameters of the Pool class include:
- processes: number of worker processes to use. If None, the number of CPUs is used
- initializer: if not None, each worker will call initializer(*initargs) when it starts
- maxtasksperchild: number of tasks a worker can complete before it exits and is replaced with a fresh worker process, so that unused resources can be freed. Default is None, which means worker processes live as long as the pool itself
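A minimal sketch of these parameters, assuming a hypothetical per-worker global `_offset` set by the initializer. With maxtasksperchild=3 and chunksize=1, every worker is replaced after three tasks, so ten tasks need at least four distinct worker processes.

```python
import os
from multiprocessing import Pool

_offset = None  # hypothetical per-worker state, set by the initializer


def init_worker(offset):
    # Runs once in every worker process when it starts: initializer(*initargs)
    global _offset
    _offset = offset


def add_offset(x):
    return x + _offset, os.getpid()


def run_pool():
    with Pool(processes=2, initializer=init_worker, initargs=(100,),
              maxtasksperchild=3) as pool:
        results = pool.map(add_offset, range(10), chunksize=1)
    values = [value for value, _ in results]
    n_workers_used = len({pid for _, pid in results})
    return values, n_workers_used


if __name__ == "__main__":
    print(os.cpu_count())  # the number of workers used when processes=None
    values, n_workers_used = run_pool()
    print(values)  # [100, 101, ..., 109]
    print("distinct workers:", n_workers_used)  # at least 4
```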
Example: Pool initialization
Let’s init a Pool.
8
Pool methods
Pool objects offer both synchronous and asynchronous methods.
Synchronous methods
The synchronous methods are:
- apply(func[, args[, kwds]]): calls func with given arguments
- map(func, iterable[, chunksize]): chops the iterable parameter into chunks of (approximate) size chunksize and submits them to the pool as separate tasks
- imap(func, iterable[, chunksize]): a lazier variant of map that returns an iterator. For very long iterables, a large chunksize value can improve performance considerably
- imap_unordered(func, iterable[, chunksize]): same as above, but results’ order is arbitrary
- starmap(func, iterable[, chunksize]): like map but the elements of the iterable are expected to be iterables that are unpacked as arguments
Getting results synchronously
After initializing the Pool, let's submit a job and get the result synchronously.
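A minimal sketch with apply(), assuming hypothetical `square` and `slow_square` functions; the caller blocks until the worker returns.

```python
import time
from multiprocessing import Pool


def square(x):
    return x * x


def slow_square(x, delay=1.0):
    time.sleep(delay)  # simulate an expensive computation
    return x * x


if __name__ == "__main__":
    with Pool(4) as pool:
        # apply() blocks the caller until the worker returns the result
        print("This result will show immediately. Result:", pool.apply(square, (2,)))
        print("This result will take some time. Result:",
              pool.apply(slow_square, (2,), {"delay": 1.0}))
```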
This result will show immediately. Result: 4
This result will take some time. Result: 4
Asynchronous method
Some of the synchronous methods also have an asynchronous variant:
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
While the synchronous methods block until the result is ready, the asynchronous ones immediately return an AsyncResult object and also support timeouts and callbacks.
An AsyncResult provides the blocking get() method, which returns the result (or re-raises the worker's exception), and wait(), which only waits for it; ready() and successful() check the result status. The single-argument callbacks can handle the result (callback) or the exception (error_callback), but must return quickly: they are executed by the thread of the main process that handles results, which is blocked until they complete.
Getting results asynchronously
Now let’s try to get the results asynchronously.
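A minimal sketch of apply_async() with a callback and a timeout, assuming hypothetical `slow_sum_of_squares` and `on_result` functions. Note that the timeout raises multiprocessing.TimeoutError, which is not the built-in TimeoutError.

```python
import multiprocessing
import time
from multiprocessing import Pool


def slow_sum_of_squares(a, b, delay=0.5):
    time.sleep(delay)  # simulate an expensive computation
    return a * a + b * b


def on_result(value):
    # Executed by the result-handler thread of the main process: keep it short,
    # or it delays the handling of every other result.
    print("Callback received", value)


def demo():
    with Pool(2) as pool:
        result = pool.apply_async(slow_sum_of_squares, (3, 4), callback=on_result)
        print("Here's your (future) result", result)
        result.wait()  # blocks until the result is ready, returns None
        value = result.get()  # 25, available immediately now
        ok = result.successful()  # True: no exception was raised

        slow = pool.apply_async(slow_sum_of_squares, (1, 2), {"delay": 2.0})
        try:
            slow.get(timeout=0.1)  # give up after 0.1 s
            timed_out = False
        except multiprocessing.TimeoutError:
            timed_out = True
    return value, ok, timed_out


if __name__ == "__main__":
    print(demo())  # (25, True, True)
```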
Here's your (future) result <multiprocessing.pool.ApplyResult object at 0x7f79ca75def0>
WAIT FOR IT
3
4
RESULT IS READY!
25
Status successful? True
Timeout 1s, but function takes more time!
5
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
<ipython-input-15-8b3ab813e71f> in <module>
2
3 print("Timeout 1s, but function takes more time!")
----> 4 print(result.get(timeout=1))
~/anaconda3/envs/pytorch/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
651 self.wait(timeout)
652 if not self.ready():
--> 653 raise TimeoutError
654 if self._success:
655 return self._value
TimeoutError:
Map iterators to workers
Let’s try to map an iterator to many workers.
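A minimal sketch of the mapping methods, assuming hypothetical `square`, `jittery_square` (a random sleep so completion order varies) and `power` functions.

```python
import random
import time
from multiprocessing import Pool


def square(x):
    return x * x


def jittery_square(x):
    time.sleep(random.uniform(0.0, 0.3))  # random delay: completion order varies
    return x * x


def power(base, exp):
    return base ** exp


if __name__ == "__main__":
    with Pool(4) as pool:
        print(pool.map(square, range(4)))  # [0, 1, 4, 9], returned as one list
        for r in pool.imap(jittery_square, range(10)):  # lazy, input order
            print(r)
        for r in pool.imap_unordered(jittery_square, range(10)):  # completion order
            print(r)
        print(pool.starmap(power, [(2, 3), (3, 2)]))  # [8, 9]: tuples unpacked
```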
These results will appear immediately
[0, 1, 4, 9]
These results will appear all at once
[0, 1, 4, 9]
These results will appear one at a time *IN THE SAME ORDER AS THE INPUT*!
0
1
4
9
These results will appear one at a time *AS SOON AS THEY ARE READY*!
0
16
25
4
49
1
81
9
36
64
Pool handling
Pools provide a few methods to handle them:
- close(): prevents any more tasks from being submitted. Once all the tasks have been completed the worker processes will exit
- terminate(): forces the workers to exit
- join(): waits for the worker processes to exit. Must be called after close() or terminate()
The simplest way to handle a pool of workers is through the with statement (Context Manager protocol).
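- It automatically terminates the pool, calling terminate() rather than close(), once the with block is done, so results must be collected inside the block. Behaves like a try … finally
- The with statement is general purpose, not limited to pools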
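A minimal sketch contrasting the two ways of ending a pool, assuming a hypothetical `slow_task` function. Leaving the with block kills the workers mid-task, while close() followed by join() lets the queued tasks finish.

```python
import time
from multiprocessing import Pool, current_process


def slow_task(i):
    time.sleep(0.5)
    print(current_process(), "DONE!", flush=True)
    return i


def terminated_on_exit():
    with Pool(2) as pool:
        results = [pool.apply_async(slow_task, (i,)) for i in range(4)]
    # __exit__ called terminate(): the workers were killed before finishing.
    return sum(1 for r in results if r.ready() and r.successful())


def closed_and_joined():
    pool = Pool(2)
    results = [pool.apply_async(slow_task, (i,)) for i in range(4)]
    pool.close()  # no more submissions; workers exit once the queue is empty
    pool.join()   # wait for the workers to exit
    return [r.get() for r in results]


if __name__ == "__main__":
    print("completed:", terminated_on_exit())  # completed: 0
    print(closed_and_joined())  # [0, 1, 2, 3], after four DONE! lines
```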
These tasks will not complete
<ForkProcess(ForkPoolWorker-44, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-44, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-43, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-42, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-45, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-44, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-45, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-44, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-43, started daemon)> DONE!
<ForkProcess(ForkPoolWorker-42, started daemon)> DONE!
But these will!
Guidelines
Here are a few guidelines from Python's official documentation that aim to improve your code and avoid bugs.
- Avoid shared state. Stick to queues or pipes rather than using the lower level synchronization primitives
- Prefer inheriting shared objects from an ancestor process over sending them through pipes or queues (pickle/unpickle)
- … and also be sure that your arguments are picklable (serializable)
- Protect proxies with a lock if they are used from more than one thread. They are NOT thread safe!
- Explicitly pass resources to child processes, instead of using global resources, for compatibility with the spawn method, which is the default on Windows
- Do not terminate processes abruptly if they use shared resources
- Join processes that use queues carefully. A process that has put items on a queue waits before terminating until all the buffered items are fed to the underlying pipe, so joining it before those items are consumed can cause a deadlock
Exercise
Follow-up of the previous section, Iterables and generators.
Now we want to process the records in the CSV file. Assume that you want to perform some very time-consuming operation on them and employ a Pool of processes to perform these operations.
Note that map will consume the whole generator and load all the records into memory, which conflicts with our requirements. For a moment, forget about it and use map_async.
As a second step, reintroduce the RAM constraint and use queues and apply_async. A few tips for this second step:
- Use Queues
- This is just a producer/consumer example
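A possible sketch of both parts. The CSV file and record generator of the previous section are not reproduced here, so `CSV_TEXT`, `read_records` and `process` are hypothetical stand-ins (a small in-memory CSV and a short sleep). Part 2 uses a manager queue because a plain multiprocessing.Queue cannot be passed as an argument to pool workers.

```python
import csv
import io
import time
from multiprocessing import Manager, Pool

# Hypothetical stand-in for the CSV file of the previous section: 10 records.
CSV_TEXT = "id,value\n" + "\n".join(f"{i},{i * 10}" for i in range(10))


def read_records(text=CSV_TEXT):
    """Generator yielding one record (a dict) at a time."""
    yield from csv.DictReader(io.StringIO(text))


def process(record):
    time.sleep(0.05)  # stand-in for the very time-consuming operation
    return int(record["value"]) * 2


# Part 1: map_async turns the generator into a list, so every record is in memory.
def part1(n_workers=4):
    with Pool(n_workers) as pool:
        return pool.map_async(process, read_records()).get()


# Part 2: producer/consumer; at most `max_pending` records wait in the queue.
def consumer(tasks):
    done = []
    while True:
        record = tasks.get()
        if record is None:  # sentinel: no more records
            return done
        done.append(process(record))


def part2(n_workers=4, max_pending=8):
    with Manager() as manager, Pool(n_workers) as pool:
        tasks = manager.Queue(maxsize=max_pending)
        consumers = [pool.apply_async(consumer, (tasks,)) for _ in range(n_workers)]
        for record in read_records():  # producer: reads records lazily
            tasks.put(record)  # blocks while the queue is full
        for _ in range(n_workers):
            tasks.put(None)  # one sentinel per consumer
        return sorted(x for c in consumers for x in c.get())


if __name__ == "__main__":
    print(part1())
    print(part2())
```

Each consumer occupies one pool worker for the whole run, which is why the number of consumers matches the pool size.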
Part 1: no memory constraints
Part 2: bring memory constraints back
6
7
8
9