cython中循环的并行化
Parallelization of loop in cython
在这里的一些成员的帮助下,我确实建立了一个用Python运行的代码,并计算了一个函数,该函数以两个巨大的np.arrays
作为输入。
并行运行的矢量化版本仍然非常耗时,比用串行fortran编写的参考程序慢约50倍…
我想使用一个python循环,我可以使用OpenMP或MPI来并行化。c++中的想法是这样的:
#pragma omp parallel for
for (i=0;i<np1;i++){
for (i=0;i<np2;i++){
double dist = sph(coord1_particle1,coord1_particle2,coord2_particle1,coord2_particle2)
int bin=binning_function(dist)
hist_array[bin]++
}
}
欢迎提出任何意见。以下是Python版本:
#a is an array containing two coordinates of two objects
def dist_vec(a): # a like [[array1,array2,array2,array2],[],[]...]
return sph(a[0],a[1],a[2],a[3]) # sph operates on coordinates
def vec_chunk(array_ab, bins) :
dist = dist_vec(array_ab)
hist, _ = np.histogram(dist, bins=bins)
return hist
def mp_dist(array_a,array_b, d, bins): #d chunks AND processes
def worker(array_ab, out_q):
""" push result in queue """
outdict = vec_chunk(array_ab, bins)
out_q.put(outdict)
# Each process will get 'chunksize' nums and a queue to put his out
out_q = mp.Queue()
a = np.swapaxes(array_a, 0 ,1)
b = np.swapaxes(array_b, 0 ,1)
array_size_a=len(array_a)-(len(array_a)%d)
array_size_b=len(array_b)-(len(array_b)%d)
a_chunk = array_size_a / d
b_chunk = array_size_b / d
procs = []
'''prepare arrays for mp'''
array_ab = np.empty((4, a_chunk, b_chunk))
for j in xrange(d):
for k in xrange(d):
array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
p= mp.Process(target=worker, args=(array_ab, out_q))
p.start()
procs.append(p)
for pro in procs:
pro.join()
# Collect all results into a single result dict.
resultarray = np.empty(len(bins)-1)
for i in range(d):
resultarray+=out_q.get()
#resultdict.update(out_q.get())
return resultarray
bins = np.logspace(-3,1, num=25) #prepare x-axis for histogram
start_time = time()
hist_data = mp_dist(DATA,sim,10,bins)
print 'Total Time Elaspsed: ', time() - start_time
下面的代码比原来的代码快了6倍:它使用来自http://code.google.com/p/astrolibpy/source/browse/my_utils/quick_hist.py的更快的直方图编程代码(因为np。直方图对于均匀长度的箱子来说太慢了)新代码没有创建那么多进程,而是使用多进程。
其余的性能可以通过在cython中重写距离函数来获得。或者更好的是,用cython或scipy重写dist_vec()。Weave(参见quick_hist代码中的示例)
import numpy as np,multiprocessing as mp
from time import time
import quick_hist
def sph(a, b, c, d):
return numexpr.evaluate('log(((a - c)**2 + (b - d)**2)**.5)')
def dist_vec(a,b):
return sph(a[:,0][:, None], a[:,1][:, None], b[:,0][None, :], b[:,1][None, :])
def vec_chunk(a, b, bins) :
dist = dist_vec(a, b).flatten()
hist = quick_hist.quick_hist( (dist,), [(bins[0], bins[-1])], [len(bins)])
return hist
class si:
# singleton to share read-only data between processes
a = None
b = None
step = None
bins = None
def func(l1):
return vec_chunk(si.a[l1:l1+si.step,:], si.b, si.bins)
def mp_dist(array_a,array_b, d, bins): #d chunks
nproc = 8 # n processes
si.a = array_a
si.b = array_b
si.step = d
si.bins = bins
nx = array_a.shape[0]
lefts = np.arange(0, nx, d) #left edges of the chunks
pool = mp.Pool(nproc)
results = pool.map(func, lefts)
results = np.array(results).sum(axis=0)
pool.close()
pool.join()
return results
if __name__=='__main__':
bins = np.logspace(-3,1, num=25) #prepare x-axis for histogram
start_time = time()
n1 = 10000
n2 = 10000
DATA = np.random.uniform(size=(n1, 2))
sim = np.random.uniform(size=(n2, 2))
chunksize = 10
hist_data = mp_dist(DATA, sim, chunksize, bins)
print 'Total Time Elaspsed: ', time() - start_time
相关文章:
- 如何使用 OpenMP 正确并行化 for 循环?
- 嵌套循环 OpenMP 并行化、私有索引还是公共索引?
- 如何并行化增加循环的大小
- 在 C++ 中使用 OpenMP 并行化两个 for 循环不会提供更好的性能
- OpenMP C++:并行化 for 循环的负载不平衡
- 将 for 循环与嵌套的 while 循环并行化时出现 OpenMP 分段错误
- 如何在 OpenACC 中并行化内部具有"min"功能的循环
- 在 C 中并行化嵌套循环的几种方法之间的差异,C++使用 OpenMP
- OpenMP 不在 for 循环中的顺序函数的并行化
- "->"的循环承载依赖性阻止了并行化
- 如何并行化矩阵排序以进行循环
- 使用 c++17 算法并行化简单循环
- 是否可以使用CUDA并行化此嵌套进行循环
- 使用 CUDA 并行化四个或更多嵌套循环
- 如何将openMP的外循环与串行内循环并行化以添加数组
- 与串行相比,openMP 并行化 for 循环的速度更慢
- C++ 2011 : std::thread:并行化循环的简单示例
- OpenMP - 并行化嵌套循环
- 如何并行化循环
- 使用std::thread和良好实践并行化循环