简介
这篇文章是这系列文章的第二篇,我们的关注点在使用Numpy API为Python编写C扩展模块的过程。在第一部分中,我们建立了一个简单的N体模拟,并发现其瓶颈是计算体之间的相互作用力,这是一个复杂度为O(N^2)的操作。通过在C语言中实现一个时间演化函数,我们大概能以大约70倍来加速计算。
如果你还没有看过第一篇文章,你应该在继续看这篇文章之前先看一下。
在这篇文章中,我们将牺牲我们代码中的一些通用性来提升性能。
回顾
Wrold是存储N体状态的一个类。我们的模拟将演化一系列时间步长下的状态。
class World(object): """World is a structure that holds the state of N bodies and additional variables. threads : (int) The number of threads to use for multithreaded implementations. dt : (float) The time-step. STATE OF THE WORLD: N : (int) The number of bodies in the simulation. m : (1D ndarray) The mass of each body. r : (2D ndarray) The position of each body. v : (2D ndarray) The velocity of each body. F : (2D ndarray) The force on each body. TEMPORARY VARIABLES: Ft : (3D ndarray) A 2D force array for each thread's local storage. s : (2D ndarray) The vectors from one body to all others. s3 : (1D ndarray) The norm of each s vector. NOTE: Ft is used by parallel algorithms for thread-local storage. s and s3 are only used by the Python implementation. """ def __init__(self, N, threads=1, m_min=1, m_max=30.0, r_max=50.0, v_max=4.0, dt=1e-3): self.threads = threads self.N = N self.m = np.random.uniform(m_min, m_max, N) self.r = np.random.uniform(-r_max, r_max, (N, 2)) self.v = np.random.uniform(-v_max, v_max, (N, 2)) self.F = np.zeros_like(self.r) self.Ft = np.zeros((threads, N, 2)) self.s = np.zeros_like(self.r) self.s3 = np.zeros_like(self.m) self.dt = dt
|
在开始模拟时,N体被随机分配质量m,位置r和速度v。对于每个时间步长,接下来的计算有:
1. 合力F,每个体上的合力根据所有其他体的计算。
2. 速度v,由于力的作用每个体的速度被改变。
3. 位置R,由于速度每个体的位置被改变。
访问宏
我们在第一部分创建的扩展模块使用宏来获取C语言中NumPy数组的元素。下面是这些宏中的一些宏的形式:
#define r(x0, x1) (*(npy_float64*)((PyArray_DATA(py_r) + \ (x0) * PyArray_STRIDES(py_r)[0] + \ (x1) * PyArray_STRIDES(py_r)[1])))
|
像这样使用宏,我们能使用像r(i, j)这样的简单标记来访问py_r数组中的元素。不管数组已经以某种形式被重塑或切片,索引值将匹配你在Python中看到的形式。
对于通用的代码,这就是我们想要的。在我们模拟的情况下,我们知道我们的数组的特点:它们是连续的,并且从未被切片或重塑。我们可以利用这一点来简化和提升我们代码的性能。
简单的C扩展 2
在本节中,我们将看到一个修改版本的C扩展,它摈弃了访问宏和NumPy数组底层数据的直接操作。本节中的代码src/simple2.c可在github上获得。
为了进行比较,之前的实现也可在这里获得。
演化函数
从文件的底部开始,我们可以看到,evolve函数与之前的版本一样,以相同的方式解析Python参数,但现在我们利用PyArray_DATA宏来获得一个纸箱底层的内存。我们将这个指针命名为npy_float64,作为double的一个别名。
static PyObject *evolve(PyObject *self, PyObject *args) { // Declare variables. npy_int64 N, threads, steps, step, i, xi, yi; npy_float64 dt; PyArrayObject *py_m, *py_r, *py_v, *py_F; npy_float64 *m, *r, *v, *F; // Parse arguments. if (!PyArg_ParseTuple(args, "ldllO!O!O!O!", &threads, &dt, &steps, &N, &PyArray_Type, &py_m, &PyArray_Type, &py_r, &PyArray_Type, &py_v, &PyArray_Type, &py_F)) { return NULL; } // Get underlying arrays from numpy arrays. m = (npy_float64*)PyArray_DATA(py_m); r = (npy_float64*)PyArray_DATA(py_r); v = (npy_float64*)PyArray_DATA(py_v); F = (npy_float64*)PyArray_DATA(py_F); // Evolve the world. for(step = 0; step < steps; ++step) { compute_F(N, m, r, F); for(i = 0; i < N; ++i) { // Compute offsets into arrays. xi = 2 * i; yi = xi + 1; v[xi] += F[xi] * dt / m[i]; v[yi] += F[yi] * dt / m[i]; r[xi] += v[xi] * dt; r[yi] += v[yi] * dt; } } Py_RETURN_NONE; }
|
从我们以前使用宏的实现来看,唯一的变化是我们必须明确地计算数组索引。我们可以将底层数组描述为二维`npy_float64`矩阵,但这需要一定的前期成本和内存管理。
计算力
与之前的实现一样,力以相同的方式进行计算。唯一的不同在符号,以及在for-loops循环中显式索引的计算。
static inline void compute_F(npy_int64 N, npy_float64 *m, npy_float64 *r, npy_float64 *F) { npy_int64 i, j, xi, yi, xj, yj; npy_float64 sx, sy, Fx, Fy, s3, tmp; // Set all forces to zero. for(i = 0; i < N; ++i) { F[2*i] = F[2*i + 1] = 0; } // Compute forces between pairs of bodies. for(i = 0; i < N; ++i) { xi = 2*i; yi = xi + 1; for(j = i + 1; j < N; ++j) { xj = 2*j; yj = xj + 1; sx = r[xj] - r[xi]; sy = r[yj] - r[yi]; s3 = sqrt(sx*sx + sy*sy); s3 *= s3*s3; tmp = m[i] * m[j] / s3; Fx = tmp * sx; Fy = tmp * sy; F[xi] += Fx; F[yi] += Fy; F[xj] -= Fx; F[yj] -= Fy; } } }
|
性能
我很惊讶地发现,相比于我们原来的C扩展,现在这种实现性能提升了45%。在相同的i5台式机上,以前的版本每秒执行17972个时间步长,而现在每秒执行26108个时间步长。这代表101倍提升了原始Python和NumPy的实现。
使用SIMD指令
在上面的实现中,我们在x和y方向上明确地计算出矢量分量。如果我们愿意放弃一些可移植性,并希望加快速度,我们可以使用x86的SIMD(单指令多数据)指令来简化我们的代码。
本节中的代码src/simd1.c,可在github上获得。
x86内部
在下面的代码中,我们将利用矢量数据类型__m128d。数字128代表矢量中的字节数,而字母“d”表示矢量分量的数据类型。在这种情况下,分量的类型是double(64字节浮点)。其他矢量的宽度和类型可根据不同的架构获得。
多种内部函数都可以使用矢量数据类型。英特尔在其网站上提供了一个方便的参考。
可移植性
我的工作环境是使用GNU gcc编译器的Debian Wheezy系统。在这种环境下,定义可用的内部数据类型和函数的头文件是x86intrin.h。这可能不能跨平台移植。
演化函数
这里的`evolve`函数比之前的版本更加紧凑。二维数组转换为__mm128d指针,并利用矢量来排除显式计算矢量分量的需要。
static PyObject *evolve(PyObject *self, PyObject *args) { // Variable declarations. npy_int64 N, threads, steps, step, i; npy_float64 dt; PyArrayObject *py_m, *py_r, *py_v, *py_F; npy_float64 *m; __m128d *r, *v, *F; // Parse arguments. if (!PyArg_ParseTuple(args, "ldllO!O!O!O!", &threads, &dt, &steps, &N, &PyArray_Type, &py_m, &PyArray_Type, &py_r, &PyArray_Type, &py_v, &PyArray_Type, &py_F)) { return NULL; } // Get underlying arrays from numpy arrays. m = (npy_float64*)PyArray_DATA(py_m); r = (__m128d*)PyArray_DATA(py_r); v = (__m128d*)PyArray_DATA(py_v); F = (__m128d*)PyArray_DATA(py_F); // Evolve the world. for(step = 0; step < steps; ++step) { compute_F(N, m, r, F); for(i = 0; i < N; ++i) { v[i] += F[i] * dt / m[i]; r[i] += v[i] * dt; } } Py_RETURN_NONE; }
|
计算力
这里力的计算也比之前的实现更加紧凑。
static inline void compute_F(npy_int64 N, npy_float64 *m, __m128d *r, __m128d *F) { npy_int64 i, j; __m128d s, s2, tmp; npy_float64 s3; // Set all forces to zero. for(i = 0; i < N; ++i) { F[i] = _mm_set1_pd(0); } // Compute forces between pairs of bodies. for(i = 0; i < N; ++i) { for(j = i + 1; j < N; ++j) { s = r[j] - r[i]; s2 = s * s; s3 = sqrt(s2[0] + s2[1]); s3 *= s3 * s3; tmp = s * m[i] * m[j] / s3; F[i] += tmp; F[j] -= tmp; } } }
|
性能
虽然这个明确的向量化代码更清晰,并比以前的版本更加紧凑??,它的运行速度只提升了约1%,它每秒执行26427个时间步长。这可能是因为编译器能够使用相同的矢量指令优化之前的执行情况。
结论
如果全通用性是没有必要的,经常的性能提升,可以通过直接用C语言访问NumPy数组的底层内存来实现。
而在现在这个实例中,显式使用矢量内部没有提供多少好处,相对性能差异将在使用OpenMP的多芯设置中增大。
|