Parallelism has many different paradigms, including:
If we start 50 processes and have each of them count a number down from 100 to 50, we will find that it is very slow! The inefficiency comes from process overhead:
Instead of endlessly starting new processes, design a process pool that reuses its worker processes; this improves efficiency by saving the time and memory spent creating processes and the time spent destroying them.
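The idea can be sketched with Python's built-in multiprocessing.Pool (the names square and run_pool below are illustrative, not from the text): a fixed set of workers is created once and reused for every task.

```python
from multiprocessing import Pool

def square(x):
    # toy CPU task; each call runs in one of the reused worker processes
    return x * x

def run_pool(n_workers=5, n_tasks=100):
    # a fixed pool of n_workers processes handles all n_tasks tasks,
    # so only n_workers processes are ever created and destroyed
    with Pool(processes=n_workers) as pool:
        return pool.map(square, range(n_tasks))

if __name__ == "__main__":
    print(run_pool()[:5])  # [0, 1, 4, 9, 16]
```

Here 100 tasks cost only 5 process start-ups instead of 100.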
Type | Definition | Application | Function
---|---|---|---
synchronous | execute a task only after the previous task is completed | safe for tasks with dependencies, but may incur long wait times | ipyparallel.map_sync()
asynchronous | execute a task regardless of the status of the previous task | unsafe for tasks with dependencies; fast for independent tasks | ipyparallel.map_async()
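The same sync/async contrast exists outside ipyparallel; a minimal sketch with the standard library's multiprocessing.Pool (function names here are illustrative):

```python
from multiprocessing import Pool

def double(x):
    return 2 * x

def sync_vs_async():
    with Pool(processes=2) as pool:
        # synchronous: map() blocks until every task has completed
        sync_res = pool.map(double, range(5))
        # asynchronous: map_async() returns a handle immediately,
        # so other work could happen here before collecting results
        handle = pool.map_async(double, range(5))
        async_res = handle.get()  # block only when the results are needed
    return sync_res, async_res

if __name__ == "__main__":
    print(sync_vs_async())  # ([0, 2, 4, 6, 8], [0, 2, 4, 6, 8])
```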
Python's multithreading cannot take advantage of multiple CPU cores, because the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time.
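Because of the GIL, only one thread runs Python bytecode at a time, so threads give little speed-up on CPU-bound work while separate processes do. A hedged sketch of that contrast (names are illustrative; exact timings vary by machine):

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def busy(n):
    # pure-Python CPU-bound loop; it holds the GIL while running
    total = 0
    for i in range(n):
        total += i
    return total

def timed(executor_cls, n_workers=4, n=1_000_000):
    # run the same workload on n_workers workers and time it
    start = time.perf_counter()
    with executor_cls(n_workers) as ex:
        results = list(ex.map(busy, [n] * n_workers))
    return time.perf_counter() - start, results

if __name__ == "__main__":
    t_threads, _ = timed(ThreadPoolExecutor)
    t_procs, _ = timed(ProcessPoolExecutor)
    # on a multi-core machine, threads usually show little or no
    # speed-up for this CPU-bound loop, while processes do
    print(f"threads: {t_threads:.2f}s  processes: {t_procs:.2f}s")
```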
Package | Pros | Cons
---|---|---
ipyparallel | |
multiprocessing | |
multithreading | |
Computational complexity can often be traded between time and memory:
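One classic instance of this trade-off is caching: spend memory to avoid recomputation. A minimal sketch with functools.lru_cache (the fib_* names are illustrative):

```python
from functools import lru_cache

def fib_slow(n):
    # exponential time, constant extra memory: recomputes subproblems
    return n if n < 2 else fib_slow(n - 1) + fib_slow(n - 2)

@lru_cache(maxsize=None)
def fib_fast(n):
    # linear time, linear memory: each subproblem is cached after first use
    return n if n < 2 else fib_fast(n - 1) + fib_fast(n - 2)

if __name__ == "__main__":
    print(fib_fast(30))  # 832040; fib_slow(30) gives the same answer far more slowly
```

Both functions compute the same values; the cached version buys its speed with O(n) stored results.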
import os, time
import numpy as np
from random import sample
# define function
def HelloWorld(x = None):
    if x is not None:
        return 'Hello '+ str(x)
    else:
        return 'Hello World'
%%time
res = HelloWorld()
CPU times: user 4 µs, sys: 14 µs, total: 18 µs Wall time: 7.15 µs
%%time
for i in np.arange(0,5):
    res = HelloWorld(i)
CPU times: user 181 µs, sys: 80 µs, total: 261 µs Wall time: 34.8 µs
# make dataset
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000, n_features=10,
                           n_informative=10, n_redundant=0,
                           random_state=0, shuffle=False)
# define function
from sklearn.ensemble import RandomForestClassifier
def clf_fit_predict(i, X, y):
    clf = RandomForestClassifier(max_depth=10, random_state=0)
    clf = clf.fit(X, y)
    return i, clf.predict(X)
%%time
res = clf_fit_predict(0, X, y)
# %%timeit 1.68 s ± 12.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
CPU times: user 1.45 s, sys: 7.09 ms, total: 1.46 s Wall time: 1.46 s
%%time
for i in np.arange(0, 15):
    res = clf_fit_predict(0, X, y)
# %%timeit 29.5 s ± 548 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
CPU times: user 21.5 s, sys: 69.5 ms, total: 21.6 s Wall time: 21.6 s
# installation
pip install notebook ipyparallel
ipcluster nbextension enable
ipython profile create --parallel --profile=myprofile
import ipyparallel as ipp
# configure a local cluster with 5 engines
cluster = ipp.Cluster(n=5)
# start the engines and connect a client (shows a progress bar)
rc = cluster.start_and_connect_sync()
# get a DirectView on the engines
view = rc.direct_view()
# submit task to cluster and execute
result = view.apply_async(os.getpid)
# wait for execution to finish, showing a progress bar
result.wait_interactive()
# collect result
result = result.get_dict()
rc.shutdown(hub=True)
print(f'\n result: {result} \n')
Using existing profile dir: '/Users/weiquanluo/.ipython/profile_default' Starting 5 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
result: {0: 30853, 1: 30854, 2: 30855, 3: 30856, 4: 30857}
The IPython architecture consists of four components: the Engines, the Hub, the Schedulers, and the Client.
Client: the class for connecting to a cluster. Parallel processing in ipyparallel walks through the following steps: set up the cluster, start the engines and connect a client, create a view, submit tasks, wait for execution, and collect the results.
Context managers are encouraged when the cluster is no longer needed after use. In ipyparallel, the Cluster and Client classes can be used as context managers, which shut down the engines and the controller automatically when the block exits:
# Cluster as Context manager
with ipp.Cluster(n=5) as rc:  # set up the cluster and start the engines (shows a progress bar)
    # get a DirectView on the engines
    dview = rc.direct_view()
    # submit task to cluster and execute
    result = dview.apply_async(os.getpid)
    # wait for execution to finish, showing a progress bar
    result.wait_interactive()
    # collect result
    result = result.get_dict()
# at this point, the cluster processes have been shut down
time.sleep(0.5)
print(f'\n result: {result} \n')
Using existing profile dir: '/Users/weiquanluo/.ipython/profile_default' engine set stopped 1660251579: {'engines': {'1': {'exit_code': 0, 'pid': 30854, 'identifier': '1'}, '4': {'exit_code': 0, 'pid': 30857, 'identifier': '4'}, '3': {'exit_code': 0, 'pid': 30856, 'identifier': '3'}, '0': {'exit_code': 0, 'pid': 30853, 'identifier': '0'}, '2': {'exit_code': 0, 'pid': 30855, 'identifier': '2'}}, 'exit_code': 0} Starting 5 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'> Controller stopped: {'exit_code': None, 'pid': 30841, 'identifier': 'ipcontroller-1660251578-we2o-30830'}
Stopping engine(s): 1660251586 engine set stopped 1660251586: {'engines': {'0': {'exit_code': 0, 'pid': 30897, 'identifier': '0'}, '1': {'exit_code': 0, 'pid': 30898, 'identifier': '1'}, '2': {'exit_code': 0, 'pid': 30899, 'identifier': '2'}, '3': {'exit_code': 0, 'pid': 30900, 'identifier': '3'}, '4': {'exit_code': 0, 'pid': 30901, 'identifier': '4'}}, 'exit_code': 0} Stopping controller Controller stopped: {'exit_code': 0, 'pid': 30885, 'identifier': 'ipcontroller-1660251585-rob1-30830'} result: {0: 30897, 1: 30898, 2: 30899, 3: 30900, 4: 30901}
Controller: a collection of processes that provides an interface for working with the engines. The two primary models for interacting with engines are:
Controller model | Definition | Use case
---|---|---
DirectView class | Engines are addressed explicitly | 1) import packages, 2) upload functions, 3) the number of tasks equals the number of engines
LoadBalancedView class | The Scheduler is entrusted with assigning work to appropriate engines in a destination-agnostic manner | 1) run functions, 2) the number of tasks exceeds the number of engines
ipp.Client().direct_view(): a Direct interface, where engines are addressed explicitly. dview = rc[:] also returns a DirectView using all engines.
with ipp.Cluster(n=5) as rc:
    # upload function
    dview = rc.direct_view()
    dview.push({'HelloWorld': HelloWorld})
    result = dview.apply_async(HelloWorld)
    result.wait_interactive()
    result = result.get()
time.sleep(0.5)
print(f'\n result: {result} \n')
Using existing profile dir: '/Users/weiquanluo/.ipython/profile_default' Starting 5 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Stopping engine(s): 1660251595 engine set stopped 1660251595: {'engines': {'0': {'exit_code': 0, 'pid': 30940, 'identifier': '0'}, '1': {'exit_code': 0, 'pid': 30941, 'identifier': '1'}, '2': {'exit_code': 0, 'pid': 30942, 'identifier': '2'}, '3': {'exit_code': 0, 'pid': 30943, 'identifier': '3'}, '4': {'exit_code': 0, 'pid': 30944, 'identifier': '4'}}, 'exit_code': 0} Stopping controller Controller stopped: {'exit_code': 0, 'pid': 30928, 'identifier': 'ipcontroller-1660251594-auc2-30830'} result: ['Hello World', 'Hello World', 'Hello World', 'Hello World', 'Hello World']
load_balanced_view(): a LoadBalanced interface, where the Scheduler is entrusted with assigning work to appropriate engines.
with ipp.Cluster(n=5) as rc:
    # upload function
    dview = rc.direct_view()
    dview.push({'HelloWorld': HelloWorld})
    lview = rc.load_balanced_view()
    result = lview.map_async(HelloWorld, np.arange(0,10))
    result.wait_interactive()
    result = result.get()
time.sleep(0.5)
print(f'\n result: {result} \n')
Using existing profile dir: '/Users/weiquanluo/.ipython/profile_default' Starting 5 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Stopping engine(s): 1660251604 engine set stopped 1660251604: {'engines': {'0': {'exit_code': 0, 'pid': 30983, 'identifier': '0'}, '1': {'exit_code': 0, 'pid': 30984, 'identifier': '1'}, '2': {'exit_code': 0, 'pid': 30985, 'identifier': '2'}, '3': {'exit_code': 0, 'pid': 30986, 'identifier': '3'}, '4': {'exit_code': 0, 'pid': 30987, 'identifier': '4'}}, 'exit_code': 0} Stopping controller Controller stopped: {'exit_code': 0, 'pid': 30971, 'identifier': 'ipcontroller-1660251603-x4xj-30830'} result: ['Hello 0', 'Hello 1', 'Hello 2', 'Hello 3', 'Hello 4', 'Hello 5', 'Hello 6', 'Hello 7', 'Hello 8', 'Hello 9']
The View class
- DirectView.execute(): import a package by passing code as a string
- DirectView.push(): upload a function with a dict
- View.apply()
- View.apply_async()
- View.apply_sync()
- View.map_async()
- View.map_sync()
with ipp.Cluster(n=5) as rc:
    dview = rc.direct_view()
    result1 = dview.apply_async(HelloWorld) # each engine runs the function without input arguments
    result2 = dview.apply_async(HelloWorld, np.arange(0,10)) # each engine runs the function with the entire array as its argument
    result3 = dview.map_async(HelloWorld, np.arange(0,10)) # the array is sliced into single elements, one per function call
    result1.wait_interactive()
    result2.wait_interactive()
    result3.wait_interactive()
    result1 = result1.get()
    result2 = result2.get()
    result3 = result3.get()
time.sleep(0.5)
print(f'\n result1: {result1} \n')
print(f'\n result2: {result2} \n')
print(f'\n result3: {result3} \n')
Using existing profile dir: '/Users/weiquanluo/.ipython/profile_default' Starting 5 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Stopping engine(s): 1660251612 engine set stopped 1660251612: {'engines': {'0': {'exit_code': 0, 'pid': 31027, 'identifier': '0'}, '1': {'exit_code': 0, 'pid': 31028, 'identifier': '1'}, '2': {'exit_code': 0, 'pid': 31029, 'identifier': '2'}, '3': {'exit_code': 0, 'pid': 31030, 'identifier': '3'}, '4': {'exit_code': 0, 'pid': 31031, 'identifier': '4'}}, 'exit_code': 0} Stopping controller Controller stopped: {'exit_code': 0, 'pid': 31015, 'identifier': 'ipcontroller-1660251611-dopl-30830'} result1: ['Hello World', 'Hello World', 'Hello World', 'Hello World', 'Hello World'] result2: ['Hello [0 1 2 3 4 5 6 7 8 9]', 'Hello [0 1 2 3 4 5 6 7 8 9]', 'Hello [0 1 2 3 4 5 6 7 8 9]', 'Hello [0 1 2 3 4 5 6 7 8 9]', 'Hello [0 1 2 3 4 5 6 7 8 9]'] result3: ['Hello 0', 'Hello 1', 'Hello 2', 'Hello 3', 'Hello 4', 'Hello 5', 'Hello 6', 'Hello 7', 'Hello 8', 'Hello 9']
%%time
with ipp.Cluster(n=15) as rc:
    dview = rc.direct_view()
    # import packages on the engines
    dview.execute("from sklearn.datasets import make_classification")
    dview.execute("from sklearn.ensemble import RandomForestClassifier")
    # upload function
    dview.push({'clf_fit_predict': clf_fit_predict})
    time.sleep(1) # give some time to upload
    # submit task and execute
    result = dview.apply_async(clf_fit_predict, i = 0, X = X, y = y)
    result.wait_interactive()
    result = result.get()
time.sleep(0.5)
print(f'\n result: {result} \n')
# %%timeit 14.6 s ± 250 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Using existing profile dir: '/Users/weiquanluo/.ipython/profile_default' Starting 15 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Stopping engine(s): 1660251621 engine set stopped 1660251621: {'engines': {'0': {'exit_code': 0, 'pid': 31070, 'identifier': '0'}, '1': {'exit_code': 0, 'pid': 31071, 'identifier': '1'}, '2': {'exit_code': 0, 'pid': 31072, 'identifier': '2'}, '3': {'exit_code': 0, 'pid': 31073, 'identifier': '3'}, '4': {'exit_code': 0, 'pid': 31074, 'identifier': '4'}, '5': {'exit_code': 0, 'pid': 31075, 'identifier': '5'}, '6': {'exit_code': 0, 'pid': 31079, 'identifier': '6'}, '7': {'exit_code': 0, 'pid': 31084, 'identifier': '7'}, '8': {'exit_code': 0, 'pid': 31087, 'identifier': '8'}, '9': {'exit_code': 0, 'pid': 31091, 'identifier': '9'}, '10': {'exit_code': 0, 'pid': 31097, 'identifier': '10'}, '11': {'exit_code': 0, 'pid': 31102, 'identifier': '11'}, '12': {'exit_code': 0, 'pid': 31106, 'identifier': '12'}, '13': {'exit_code': 0, 'pid': 31110, 'identifier': '13'}, '14': {'exit_code': 0, 'pid': 31116, 'identifier': '14'}}, 'exit_code': 0} Stopping controller Controller stopped: {'exit_code': 0, 'pid': 31057, 'identifier': 'ipcontroller-1660251620-t49m-30830'} result: [(0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1])), (0, array([0, 0, 0, ..., 1, 1, 1]))] CPU times: user 378 ms, sys: 344 ms, total: 722 ms Wall time: 14.3 s
%%time
from functools import partial
# bind X and y so each task takes only the index i
my_task = partial(clf_fit_predict, X = X, y = y)
def task(i):
    return my_task(i)
# launch cluster
with ipp.Cluster(n=5) as rc:
    dview = rc.direct_view()
    # import package
    dview.execute("from sklearn.ensemble import RandomForestClassifier")
    # upload functions
    dview.push({'clf_fit_predict': clf_fit_predict, # core func
                'my_task': my_task, # core func with partial arguments
                'task': task}) # extra wrapper on the core func with partial arguments
    time.sleep(1) # give some time to upload
    # submit tasks and execute
    lview = rc.load_balanced_view()
    result = lview.map_async(task, np.arange(0,15))
    result.wait_interactive()
    result = result.get()
time.sleep(0.5)
print(f'\n result: {result} \n')
Using existing profile dir: '/Users/weiquanluo/.ipython/profile_default' Starting 5 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Stopping engine(s): 1660251635 engine set stopped 1660251635: {'engines': {'0': {'exit_code': 0, 'pid': 31176, 'identifier': '0'}, '1': {'exit_code': 0, 'pid': 31177, 'identifier': '1'}, '2': {'exit_code': 0, 'pid': 31178, 'identifier': '2'}, '3': {'exit_code': 0, 'pid': 31179, 'identifier': '3'}, '4': {'exit_code': 0, 'pid': 31180, 'identifier': '4'}}, 'exit_code': 0} Stopping controller Controller stopped: {'exit_code': 0, 'pid': 31164, 'identifier': 'ipcontroller-1660251634-dgf5-30830'} result: [(0, array([0, 0, 0, ..., 1, 1, 1])), (1, array([0, 0, 0, ..., 1, 1, 1])), (2, array([0, 0, 0, ..., 1, 1, 1])), (3, array([0, 0, 0, ..., 1, 1, 1])), (4, array([0, 0, 0, ..., 1, 1, 1])), (5, array([0, 0, 0, ..., 1, 1, 1])), (6, array([0, 0, 0, ..., 1, 1, 1])), (7, array([0, 0, 0, ..., 1, 1, 1])), (8, array([0, 0, 0, ..., 1, 1, 1])), (9, array([0, 0, 0, ..., 1, 1, 1])), (10, array([0, 0, 0, ..., 1, 1, 1])), (11, array([0, 0, 0, ..., 1, 1, 1])), (12, array([0, 0, 0, ..., 1, 1, 1])), (13, array([0, 0, 0, ..., 1, 1, 1])), (14, array([0, 0, 0, ..., 1, 1, 1]))] CPU times: user 225 ms, sys: 128 ms, total: 354 ms Wall time: 14.5 s
!python3 -m jupyter nbconvert jupyter_ParallelComputing.ipynb --to html
[NbConvertApp] Converting notebook jupyter_ParallelComputing.ipynb to html [NbConvertApp] Writing 648039 bytes to jupyter_ParallelComputing.html