起名Linux内核同步机制之(4):spin lock【转】

转自:http://www.wowotech.net/kernel_synchronization/spinlock.html

Avoiding Branch Divergence

突发性,控制流依赖于thread索引。同二个warp中,3个尺码分支或者引致很差的属性。通过重新组织数量得到情势能够减小或防止warp
divergence(该难题的表明请查看warp解析篇)。

一、前言

The Parallel Reduction Problem

咱俩明天要总结二个数组N个要素的和。这一个进度用CPU编制程序很不难实现:

int sum = 0;
for (int i = 0; i < N; i++)
    sum += array[i];

那便是说只要Array的因素格外多呢?应用并行计算可以大大升高这几个历程的功用。鉴于加法的交流律等质量,那么些求和进度能够以成分的即兴顺序来进展:

  • 将输入数组切割成很多小的块。
  • 用thread来计算每种块的和。
  • 对那几个块的结果再求和得最后结出。

数组的切割宗旨是,用thread求数组中按自然规律配对的的三个成分和,然后将拥有结果组合成2个新的数组,然后再一次求配对两成分和,多次迭代,直到数组中只有2个结出。

比较直观的两种达成格局是:

  1. Neighbored
    pair:每一趟迭代都以隔壁四个成分求和。
  2. Interleaved
    pair:按自然跨度配对三个成分。

下图彰显了两种艺术的求解进度,对于有N个要素的数组,那几个进度要求N-三次求和,log(N)步。Interleaved
pair的跨度是半个数CEO度。

 起名 1

下边是用递归实现的interleaved pair代码(host):

int recursiveReduce(int *data, int const size) {
    // terminate check
    if (size == 1) return data[0];
        // renew the stride
       int const stride = size / 2;
       // in-place reduction
    for (int i = 0; i < stride; i++) {
        data[i] += data[i + stride];
    }
    // call recursively
    return recursiveReduce(data, stride);
}                

上述讲的那类难点术语叫reduction
problem。Parallel
reduction(并行规约)是指迭代减弱操作,是并行算法中国和北美洲常首要的1种操作。

在linux
kernel的兑现中,平常会赶上这么的现象:共享数据被搁浅上下文和经过上下文访问,该如何爱戴呢?借使唯有经过上下文的拜会,那么能够思虑选取semaphore也许mutex的锁机制,可是以往中断上下文也参和进来,那多少个可以引致睡眠的lock就不能够使用了,那时候,能够设想使用spin
lock。本文首要介绍了linux kernel中的spin
lock的规律以及代码完毕。由于spin lock是architecture
dependent代码,因而,大家在第伍章钻探了A福特ExplorerM3二和AOdysseyM6四上的落到实处细节。

Divergence in Parallel Reduction

那壹部分以neighbored pair为参报考学士究:

 起名 2

在这几个kernel里面,有几个global memory
array,3个用来存放数组全部数据,另1个用来存放部分和。全体block独立的执行求和操作。__syncthreads(至于联合,请看前文)用来确定保证每趟迭代,全体的求和操作都做完,然后进入下一步迭代。

__global__ void reduceNeighbored(int *g_idata, int *g_odata, unsigned int n) {
    // set thread ID
    unsigned int tid = threadIdx.x;
    // convert global data pointer to the local pointer of this block
    int *idata = g_idata + blockIdx.x * blockDim.x;
    // boundary check
    if (idx >= n) return;
        // in-place reduction in global memory
    for (int stride = 1; stride < blockDim.x; stride *= 2) {
        if ((tid % (2 * stride)) == 0) {
            idata[tid] += idata[tid + stride];
        }
        // synchronize within block
        __syncthreads();
    }
    // write result for this block to global mem
    if (tid == 0) g_odata[blockIdx.x] = idata[0];
}        

因为尚未艺术让抱有的block同步,所以最终将兼具block的结果送回host来举行串行计算,如下图所示:

 起名 3

main代码: 

起名 4起名 5

int main(int argc, char **argv) {
// set up device
int dev = 0;
cudaDeviceProp deviceProp;
cudaGetDeviceProperties(&deviceProp, dev);
printf("%s starting reduction at ", argv[0]);
printf("device %d: %s ", dev, deviceProp.name);
cudaSetDevice(dev);
bool bResult = false;
// initialization
int size = 1<<24; // total number of elements to reduce
printf(" with array size %d ", size);
// execution configuration
int blocksize = 512; // initial block size
if(argc > 1) {
blocksize = atoi(argv[1]); // block size from command line argument
}
dim3 block (blocksize,1);
dim3 grid ((size+block.x-1)/block.x,1);
printf("grid %d block %d\n",grid.x, block.x);
// allocate host memory
size_t bytes = size * sizeof(int);
int *h_idata = (int *) malloc(bytes);
int *h_odata = (int *) malloc(grid.x*sizeof(int));
int *tmp = (int *) malloc(bytes);
// initialize the array
for (int i = 0; i < size; i++) {
// mask off high 2 bytes to force max number to 255
h_idata[i] = (int)(rand() & 0xFF);
}
memcpy (tmp, h_idata, bytes);
size_t iStart,iElaps;
int gpu_sum = 0;
// allocate device memory
int *d_idata = NULL;
int *d_odata = NULL;
cudaMalloc((void **) &d_idata, bytes);
cudaMalloc((void **) &d_odata, grid.x*sizeof(int));
// cpu reduction
iStart = seconds ();
int cpu_sum = recursiveReduce(tmp, size);
iElaps = seconds () - iStart;
printf("cpu reduce elapsed %d ms cpu_sum: %d\n",iElaps,cpu_sum);
// kernel 1: reduceNeighbored
cudaMemcpy(d_idata, h_idata, bytes, cudaMemcpyHostToDevice);
cudaDeviceSynchronize();
iStart = seconds ();
warmup<<<grid, block>>>(d_idata, d_odata, size);
cudaDeviceSynchronize();
iElaps = seconds () - iStart;
cudaMemcpy(h_odata, d_odata, grid.x*sizeof(int), cudaMemcpyDeviceToHost);
gpu_sum = 0;
for (int i=0; i<grid.x; i++) gpu_sum += h_odata[i];
printf("gpu Warmup elapsed %d ms gpu_sum: %d <<<grid %d block %d>>>\n",
iElaps,gpu_sum,grid.x,block.x);
// kernel 1: reduceNeighbored
cudaMemcpy(d_idata, h_idata, bytes, cudaMemcpyHostToDevice);
cudaDeviceSynchronize();
iStart = seconds ();
reduceNeighbored<<<grid, block>>>(d_idata, d_odata, size);
cudaDeviceSynchronize();
iElaps = seconds () - iStart;
cudaMemcpy(h_odata, d_odata, grid.x*sizeof(int), cudaMemcpyDeviceToHost);
gpu_sum = 0;
for (int i=0; i<grid.x; i++) gpu_sum += h_odata[i];
printf("gpu Neighbored elapsed %d ms gpu_sum: %d <<<grid %d block %d>>>\n",
iElaps,gpu_sum,grid.x,block.x);
cudaDeviceSynchronize();
iElaps = seconds() - iStart;
cudaMemcpy(h_odata, d_odata, grid.x/8*sizeof(int), cudaMemcpyDeviceToHost);
gpu_sum = 0;
for (int i = 0; i < grid.x / 8; i++) gpu_sum += h_odata[i];
printf("gpu Cmptnroll elapsed %d ms gpu_sum: %d <<<grid %d block %d>>>\n",
iElaps,gpu_sum,grid.x/8,block.x);
/// free host memory
free(h_idata);
free(h_odata);
// free device memory
cudaFree(d_idata);
cudaFree(d_odata);
// reset device
cudaDeviceReset();
// check the results
bResult = (gpu_sum == cpu_sum);
if(!bResult) printf("Test failed!\n");
return EXIT_SUCCESS;
}

View Code

开始化数组,使其涵盖16M成分:

int size = 1<<24;

kernel配置为1D grid和1D block:

dim3 block (blocksize, 1);
dim3 block ((siize + block.x – 1) / block.x, 1);

编译:

$ nvcc -O3 -arch=sm_20 reduceInteger.cu -o reduceInteger

运行:

$ ./reduceInteger starting reduction at device 0: Tesla M2070
with array size 16777216 grid 32768 block 512
cpu reduce elapsed 29 ms cpu_sum: 2139353471
gpu Neighbored elapsed 11 ms gpu_sum: 2139353471 <<<grid 32768 block 512>>>
Improving Divergence in Parallel Reduction

思虑上节if评定尺度:

if ((tid % (2 * stride)) == 0)

因为那表明式只对偶数ID的线程为true,所以其造成很高的divergent
warps。第一遍迭代唯有偶数ID的线程执行了命令,可是全体线程都要被调度;第二遍迭代,唯有四分之的thread是active的,可是富有thread照旧要被调度。大家能够再度组织种种线程对应的数组索引来强制ID相邻的thread来拍卖求和操作。如下图所示(注意途中的Thread
ID与上三个图的差异):

 起名 6

新的代码:

__global__ void reduceNeighboredLess (int *g_idata, int *g_odata, unsigned int n) {
    // set thread ID
    unsigned int tid = threadIdx.x;
    unsigned int idx = blockIdx.x * blockDim.x + threadIdx.x;
    // convert global data pointer to the local pointer of this block
    int *idata = g_idata + blockIdx.x*blockDim.x;
    // boundary check
    if(idx >= n) return;
    // in-place reduction in global memory
    for (int stride = 1; stride < blockDim.x; stride *= 2) {
        // convert tid into local array index
        int index = 2 * stride * tid;
        if (index < blockDim.x) {
            idata[index] += idata[index + stride];
        }    
        // synchronize within threadblock
        __syncthreads();
    }
    // write result for this block to global mem
    if (tid == 0) g_odata[blockIdx.x] = idata[0];
}                                

留神那行代码:

int index = 2 * stride * tid;

因为步调乘以了2,下边包车型大巴讲话使用block的前半有的thread来执行求和:

if (index < blockDim.x)

对此二个有511个thread的block来说,前多个warp执行第二轮reduction,剩下四个warp什么也不干;第一轮,前七个warp执行,剩下拾三个怎样也不干。因而,就彻底不设有divergence了(重申,divergence只产生于同三个warp)。最终的伍轮依旧会促成divergence,因为今年须要实践threads已经凑不够3个warp了。

// kernel 2: reduceNeighbored with less divergence
cudaMemcpy(d_idata, h_idata, bytes, cudaMemcpyHostToDevice);
cudaDeviceSynchronize();
iStart = seconds();
reduceNeighboredLess<<<grid, block>>>(d_idata, d_odata, size);
cudaDeviceSynchronize();
iElaps = seconds() - iStart;
cudaMemcpy(h_odata, d_odata, grid.x*sizeof(int), cudaMemcpyDeviceToHost);
gpu_sum = 0;
for (int i=0; i<grid.x; i++) gpu_sum += h_odata[i];
printf("gpu Neighbored2 elapsed %d ms gpu_sum: %d <<<grid %d block %d>>>\n",iElaps,gpu_sum,grid.x,block.x);

运行结果:

$ ./reduceInteger Starting reduction at device 0: Tesla M2070
vector size 16777216 grid 32768 block 512
cpu reduce elapsed 0.029138 sec cpu_sum: 2139353471
gpu Neighbored elapsed 0.011722 sec gpu_sum: 2139353471 <<<grid 32768 block 512>>>
gpu NeighboredL elapsed 0.009321 sec gpu_sum: 2139353471 <<<grid 32768 block 512>>>

新的兑现比原先的快了壹.贰陆。大家也能够利用nvprof的inst_per_warp参数来查阅各种warp上推行的授命数指标平均值。

$ nvprof --metrics inst_per_warp ./reduceInteger

输出,原来的是新的kernel的两倍还多,因为原本的有过多不要求的操作也实施了:

Neighbored Instructions per warp 295.562500
NeighboredLess Instructions per warp 115.312500

再查看throughput:

$ nvprof --metrics gld_throughput ./reduceInteger

出口,新的kernel拥有更大的throughput,因为就算I/O操作数目相同,可是其耗费时间短:

Neighbored Global Load Throughput 67.663GB/s
NeighboredL Global Load Throughput 80.144GB/s
Reducing with Interleaved Pairs

 Interleaved
Pair情势的发轫步调是block大小的八分之四,每种thread处理像个半个block的多个数据求和。和以前的图示相比较,工作的thread数目未有转变,可是,各类thread的load/store
global memory的岗位是例外的。

Interleaved Pair的kernel实现:

/// Interleaved Pair Implementation with less divergence
__global__ void reduceInterleaved (int *g_idata, int *g_odata, unsigned int n) {
// set thread ID
unsigned int tid = threadIdx.x;
unsigned int idx = blockIdx.x * blockDim.x + threadIdx.x;
// convert global data pointer to the local pointer of this block
int *idata = g_idata + blockIdx.x * blockDim.x;
// boundary check
if(idx >= n) return;
// in-place reduction in global memory
for (int stride = blockDim.x / 2; stride > 0; stride >>= 1) {
if (tid < stride) {
idata[tid] += idata[tid + stride];
}
__syncthreads();
}
// write result for this block to global mem
if (tid == 0) g_odata[blockIdx.x] = idata[0];
}

 起名 7

留意下边包车型客车话语,步调被开端化为block大小的五成:

for (int stride = blockDim.x / 2; stride
> 0; stride >>= 1) {

上面包车型大巴言语使得第贰次迭代时,block的前半片段thread执行相加操作,第三遍是前四分一,以此类推:

if (tid < stride)

上边是投入main的代码:

cudaMemcpy(d_idata, h_idata, bytes, cudaMemcpyHostToDevice);
cudaDeviceSynchronize();
iStart = seconds();
reduceInterleaved <<< grid, block >>> (d_idata, d_odata, size);
cudaDeviceSynchronize();
iElaps = seconds() - iStart;
cudaMemcpy(h_odata, d_odata, grid.x*sizeof(int), cudaMemcpyDeviceToHost);
gpu_sum = 0;
for (int i = 0; i < grid.x; i++) gpu_sum += h_odata[i];
printf("gpu Interleaved elapsed %f sec gpu_sum: %d <<<grid %d block %d>>>\n",iElaps,gpu_sum,grid.x,block.x);

运维输出:

$ ./reduce starting reduction at device 0: Tesla M2070
with array size 16777216 grid 32768 block 512
cpu reduce elapsed 0.029138 sec cpu_sum: 2139353471
gpu Warmup elapsed 0.011745 sec gpu_sum: 2139353471 <<<grid 32768 block 512>>>
gpu Neighbored elapsed 0.011722 sec gpu_sum: 2139353471 <<<grid 32768 block 512>>>
gpu NeighboredL elapsed 0.009321 sec gpu_sum: 2139353471 <<<grid 32768 block 512>>>
gpu Interleaved elapsed 0.006967 sec gpu_sum: 2139353471 <<<grid 32768 block 512>>>

这一次相对第一个kernel又快了一.6九,比第一个也快了1.3肆。那几个效应首要由global
memory的load/store形式导致的(那有的学问将在后续博文介绍)。

注:本文供给经过和间断处理的基本知识作为援助。

UNrolling Loops

loop unrolling
是用来优化循环收缩分支的章程,该方法简单说正是把本应在屡次loop中成就的操作,尽量压缩到壹回loop。循环体展开程度称为loop
unrolling factor(循环展开因子),loop
unrolling对各样数组的巡回操作品质有一点都不小影响,思量如下代码:

for (int i = 0; i < 100; i++) {
    a[i] = b[i] + c[i];
}

1般来说重复一次循环体操作,迭代数目将压缩八分之四:

for (int i = 0; i < 100; i += 2) {
    a[i] = b[i] + c[i];
    a[i+1] = b[i+1] + c[i+1];
}    

从高级语言层面是不能看出质量升高的案由的,要求从low-level
instruction层面去分析,第3段代码循环次数收缩了5/十,而循环体两句语句的读写操作的实施在CPU上是足以同时推行互相独立的,所以相对第3段,第三段质量要好。

Unrolling
在CUDA编制程序中意思更重。大家的对象还是是经过缩短指令执行消耗,扩张愈来愈多的独立指令来增加质量。那样就会大增越多的并行操作从而发生更加高的通令和内部存款和储蓄器带宽(bandwidth)。也就提供了愈多的eligible
warps来赞助hide instruction/memory latency 。

 

Reducing with Unrolling

在前文的reduceInterleaved中,每一个block处理局地数额,大家给这数据起名data
block。上面包车型大巴代码是reduceInterleaved的校正版本,每种block,都以以八个data
block作为源数据进行操作,(前文中,每一种block处理贰个data
block)。那是1种cyclic partitioning:各种thread效能于八个data
block,并且从各样data block中取出叁个成分处理。

__global__ void reduceUnrolling2 (int *g_idata, int *g_odata, unsigned int n) {
    // set thread ID
    unsigned int tid = threadIdx.x;
    unsigned int idx = blockIdx.x * blockDim.x * 2 + threadIdx.x;

    // convert global data pointer to the local pointer of this block
    int *idata = g_idata + blockIdx.x * blockDim.x * 2;

    // unrolling 2 data blocks
    if (idx + blockDim.x < n) g_idata[idx] += g_idata[idx + blockDim.x];
    __syncthreads();

    // in-place reduction in global memory
    for (int stride = blockDim.x / 2; stride > 0; stride >>= 1) {
        if (tid < stride) {
            idata[tid] += idata[tid + stride];
        }
        // synchronize within threadblock
        __syncthreads();
    }

    // write result for this block to global mem
    if (tid == 0) g_odata[blockIdx.x] = idata[0];
}                                                

小心上边包车型大巴言语,每一种thread从隔壁的data
block中取多少,这一步实际上就是将五个data block规约成一个。

if (idx + blockDim.x < n)
g_idata[idx] += g_idata[idx+blockDim.x];

global array
index也要相应的调动,因为,相对此前的版本,同样的数目,大家只要求原来八分之四的thread就能消除难点。要小心的是,那样做也会下滑warp或block的并行性(因为thread少啊):

 起名 8

main扩展下边代码:

cudaMemcpy(d_idata, h_idata, bytes, cudaMemcpyHostToDevice);
cudaDeviceSynchronize();
iStart = seconds();
reduceUnrolling2 <<< grid.x/2, block >>> (d_idata, d_odata, size);
cudaDeviceSynchronize();
iElaps = seconds() - iStart;
cudaMemcpy(h_odata, d_odata, grid.x/2*sizeof(int), cudaMemcpyDeviceToHost);
gpu_sum = 0;
for (int i = 0; i < grid.x / 2; i++) gpu_sum += h_odata[i];
printf("gpu Unrolling2 elapsed %f sec gpu_sum: %d <<<grid %d block %d>>>\n",iElaps,gpu_sum,grid.x/2,block.x);

出于各样block处理七个data block,所以须要调整grid的安顿:

reduceUnrolling2<<<grid.x / 2,
block>>>(d_idata, d_odata, size);

运维输出:

gpu Unrolling2 elapsed 0.003430 sec gpu_sum: 2139353471 <<<grid 16384 block 512>>>

诸如此类二遍不难的操作就比原先的回落了三.4二。我们在摸索各类block处理四个和几个data
block的情形:

reduceUnrolling4 : each threadblock
handles 4 data blocks

reduceUnrolling8 : each threadblock
handles 8 data blocks

累加那三个的出口是:

gpu Unrolling2 elapsed 0.003430 sec gpu_sum: 2139353471 <<<grid 16384 block 512>>>
gpu Unrolling4 elapsed 0.001829 sec gpu_sum: 2139353471 <<<grid 8192 block 512>>>
gpu Unrolling8 elapsed 0.001422 sec gpu_sum: 2139353471 <<<grid 4096 block 512>>>

能够见到,同三个thread中一经能有更加多的单身的load/store操作,会时有发生越来越好的质量,因为如此做memory
latency能够更加好的被隐形。大家得以选取nvprof的dram_read_throughput来验证:

$ nvprof --metrics dram_read_throughput ./reduceInteger

下边是出口结果,大家得以汲取那样的下结论,device read
throughtput和unrolling程度是正比的:

Unrolling2 Device Memory Read Throughput 26.295GB/s
Unrolling4 Device Memory Read Throughput 49.546GB/s
Unrolling8 Device Memory Read Throughput 62.764GB/s
Reducinng with Unrolled Warps

__syncthreads是用来同步block内部thread的(请看warp解析篇)。在reduction
kernel中,他被用来在每一趟循环中年尤其保证全数thread的写global
memory的操作都已做到,那样才能实行下1阶段的计量。

那便是说,当kernel实行到只需求简单或等3三个thread(也正是二个warp)呢?由于大家是接纳的SIMT形式,warp内的thread
是有1个隐式的同步进程的。最终四次迭代能够用下边包车型客车语句展开:

if (tid < 32) {
    volatile int *vmem = idata;
    vmem[tid] += vmem[tid + 32];
    vmem[tid] += vmem[tid + 16];
    vmem[tid] += vmem[tid + 8];
    vmem[tid] += vmem[tid + 4];
    vmem[tid] += vmem[tid + 2];
    vmem[tid] += vmem[tid + 1];
}                

warp unrolling避免了__syncthreads同步操作,因为这一步本人就没须求。

此间注意下volatile修饰符,他告知编写翻译器每便执行赋值时必须将vmem[tid]的值store回global
memory。即便不这么做的话,编写翻译器或cache或许会优化我们读写global/shared
memory。有了这么些修饰符,编译器就会觉得这几个值会被别的thread修改,从而使得每一趟读写都直接去memory而不是去cache也许register。

__global__ void reduceUnrollWarps8 (int *g_idata, int *g_odata, unsigned int n) {
    // set thread ID
    unsigned int tid = threadIdx.x;
    unsigned int idx = blockIdx.x*blockDim.x*8 + threadIdx.x;

    // convert global data pointer to the local pointer of this block
    int *idata = g_idata + blockIdx.x*blockDim.x*8;

    // unrolling 8
    if (idx + 7*blockDim.x < n) {
        int a1 = g_idata[idx];
        int a2 = g_idata[idx+blockDim.x];
        int a3 = g_idata[idx+2*blockDim.x];
        int a4 = g_idata[idx+3*blockDim.x];
        int b1 = g_idata[idx+4*blockDim.x];
        int b2 = g_idata[idx+5*blockDim.x];
        int b3 = g_idata[idx+6*blockDim.x];
        int b4 = g_idata[idx+7*blockDim.x];
        g_idata[idx] = a1+a2+a3+a4+b1+b2+b3+b4;
    }
    __syncthreads();

    // in-place reduction in global memory
    for (int stride = blockDim.x / 2; stride > 32; stride >>= 1) {

        if (tid < stride) {
            idata[tid] += idata[tid + stride];
        }

        // synchronize within threadblock
        __syncthreads();
    }

    // unrolling warp
    if (tid < 32) {
        volatile int *vmem = idata;
        vmem[tid] += vmem[tid + 32];
        vmem[tid] += vmem[tid + 16];
        vmem[tid] += vmem[tid + 8];
        vmem[tid] += vmem[tid + 4];
        vmem[tid] += vmem[tid + 2];
        vmem[tid] += vmem[tid + 1];
    }

    // write result for this block to global mem
    if (tid == 0) g_odata[blockIdx.x] = idata[0];
}                                                                                                            

因为拍卖的data block变为八个,kernel调用变为;

reduceUnrollWarps8<<<grid.x / 8,
block>>> (d_idata, d_odata, size);

本次实施结果比reduceUnnrolling八快一.0伍,比reduceNeighboured快捌,六五:

gpu UnrollWarp8 elapsed 0.001355 sec gpu_sum: 2139353471 <<<grid 4096 block 512>>>

nvprof的stall_sync能够用来验证由于__syncthreads导致越来越少的warp阻塞了:

$ nvprof --metrics stall_sync ./reduce
Unrolling8 Issue Stall Reasons 58.37%
UnrollWarps8 Issue Stall Reasons 30.60%
Reducing with Complete Unrolling

即使在编写翻译时已知了迭代次数,就能够完全把循环展开。Fermi和Kepler每种block的最大thread数目都以十二四,博文中的kernel的迭代次数都是基于blockDim的,所以完全展开循环是行之有效的。

__global__ void reduceCompleteUnrollWarps8 (int *g_idata, int *g_odata,
unsigned int n) {
    // set thread ID
    unsigned int tid = threadIdx.x;
    unsigned int idx = blockIdx.x * blockDim.x * 8 + threadIdx.x;

    // convert global data pointer to the local pointer of this block
    int *idata = g_idata + blockIdx.x * blockDim.x * 8;

    // unrolling 8
    if (idx + 7*blockDim.x < n) {
        int a1 = g_idata[idx];
        int a2 = g_idata[idx + blockDim.x];
        int a3 = g_idata[idx + 2 * blockDim.x];
        int a4 = g_idata[idx + 3 * blockDim.x];
        int b1 = g_idata[idx + 4 * blockDim.x];
        int b2 = g_idata[idx + 5 * blockDim.x];
        int b3 = g_idata[idx + 6 * blockDim.x];
        int b4 = g_idata[idx + 7 * blockDim.x];
        g_idata[idx] = a1 + a2 + a3 + a4 + b1 + b2 + b3 + b4;
    }
    __syncthreads();

    // in-place reduction and complete unroll
    if (blockDim.x>=1024 && tid < 512) idata[tid] += idata[tid + 512];
    __syncthreads();

    if (blockDim.x>=512 && tid < 256) idata[tid] += idata[tid + 256];
    __syncthreads();

    if (blockDim.x>=256 && tid < 128) idata[tid] += idata[tid + 128];
    __syncthreads();

    if (blockDim.x>=128 && tid < 64) idata[tid] += idata[tid + 64];
    __syncthreads();

    // unrolling warp
    if (tid < 32) {
        volatile int *vsmem = idata;
        vsmem[tid] += vsmem[tid + 32];
        vsmem[tid] += vsmem[tid + 16];
        vsmem[tid] += vsmem[tid + 8];
        vsmem[tid] += vsmem[tid + 4];
        vsmem[tid] += vsmem[tid + 2];
        vsmem[tid] += vsmem[tid + 1];
    }

    // write result for this block to global mem
    if (tid == 0) g_odata[blockIdx.x] = idata[0];
}                

main中调用:

reduceCompleteUnrollWarps8<<<grid.x /
8, block>>>(d_idata, d_odata, size);

速度再度提高:

gpu CmptUnroll8 elapsed 0.001280 sec gpu_sum: 2139353471 <<<grid 4096 block 512>>>

二、工作规律

Reducing with Templete Functions

CUDA代码匡助模板,大家能够如下设置block大小:

template <unsigned int iBlockSize>
__global__ void reduceCompleteUnroll(int *g_idata, int *g_odata, unsigned int n) {
// set thread ID
unsigned int tid = threadIdx.x;
unsigned int idx = blockIdx.x * blockDim.x * 8 + threadIdx.x;

// convert global data pointer to the local pointer of this block
int *idata = g_idata + blockIdx.x * blockDim.x * 8;

// unrolling 8
if (idx + 7*blockDim.x < n) {
int a1 = g_idata[idx];
int a2 = g_idata[idx + blockDim.x];
int a3 = g_idata[idx + 2 * blockDim.x];
int a4 = g_idata[idx + 3 * blockDim.x];
int b1 = g_idata[idx + 4 * blockDim.x];
int b2 = g_idata[idx + 5 * blockDim.x];
int b3 = g_idata[idx + 6 * blockDim.x];
int b4 = g_idata[idx + 7 * blockDim.x];
g_idata[idx] = a1+a2+a3+a4+b1+b2+b3+b4;
}
__syncthreads();

// in-place reduction and complete unroll
if (iBlockSize>=1024 && tid < 512) idata[tid] += idata[tid + 512];
__syncthreads();

if (iBlockSize>=512 && tid < 256) idata[tid] += idata[tid + 256];
__syncthreads();

if (iBlockSize>=256 && tid < 128) idata[tid] += idata[tid + 128];
__syncthreads();

if (iBlockSize>=128 && tid < 64) idata[tid] += idata[tid + 64];
__syncthreads();

// unrolling warp
if (tid < 32) {
volatile int *vsmem = idata;
vsmem[tid] += vsmem[tid + 32];
vsmem[tid] += vsmem[tid + 16];
vsmem[tid] += vsmem[tid + 8];
vsmem[tid] += vsmem[tid + 4];
vsmem[tid] += vsmem[tid + 2];
vsmem[tid] += vsmem[tid + 1];
}

// write result for this block to global mem
if (tid == 0) g_odata[blockIdx.x] = idata[0];
}

对此if的规则,如若值为false,那么在编写翻译时就会去掉该语句,那样功用更加好。例如,借使调用kernel时的blocksize是25六,那么,下面包车型地铁语句将永生永世为false,编写翻译器会将他移除不予执行:

IBlockSize>=1024 && tid < 512

以此kernel必须以三个switch-case来调用:

switch (blocksize) {
    case 1024:
        reduceCompleteUnroll<1024><<<grid.x/8, block>>>(d_idata, d_odata, size);
        break;
    case 512:
        reduceCompleteUnroll<512><<<grid.x/8, block>>>(d_idata, d_odata, size);
        break;
    case 256:
        reduceCompleteUnroll<256><<<grid.x/8, block>>>(d_idata, d_odata, size);
        break;
    case 128:
        reduceCompleteUnroll<128><<<grid.x/8, block>>>(d_idata, d_odata, size);
        break;
    case 64:
        reduceCompleteUnroll<64><<<grid.x/8, block>>>(d_idata, d_odata, size);
        break;
}            

各个状态下,执行后的结果为:

 起名 9

$nvprof –metrics gld_efficiency,gst_efficiency ./reduceInteger

起名 10

1、spin lock的特点

咱俩得以总括spin lock的特点如下:

(一)spin
lock是一种死等的锁机制。当暴发访问能源争辩的时候,能够有七个挑选:一个是死等,2个是挂起近日历程,调度别的进度执行。spin
lock是一种死等的体制,当前的履行thread会不断的再次尝试直到获取锁进入临界区。

(二)只允许二个thread进入。semaphore能够允许三个thread进入,spin
lock不行,一次只可以有贰个thread获取锁并进入临界区,别的的thread都以在门口不断的品味。

(三)执行时间短。由于spin
lock死等那种特点,因而它应用在那个代码不是非凡复杂的临界区(当然也不能太不难,不然使用原子操作照旧此外适用简易场景的一路机制就OK了),如若临界区履行时间太长,那么不断在临界区门口“死等”的那一个thread是多么的浪费CPU啊(当然,现代CPU的设计都会思虑共同原语的实现,例如A福特ExplorerM提供了WFE和SEV那样的好像指令,幸免CPU进入busy
loop的无助境地)

(4)可以在刹车上下文执行。由于不睡觉,由此spin
lock能够在暂停上下文中适用。

2、 场景分析

对于spin lock,其有限帮衬的财富可财富于八个CPU
CORE上的进度上下文和间断上下文的中的访问,个中,进度上下文包罗:用户进度经过系统调用访问,内核线程直接待上访问,来自workqueue中work
function的走访(本质上也是内核线程)。中断上下文包涵:HW interrupt
context(中断handler)、软中断上下文(soft
irq,当然是因为各样缘由,该softirq被推移到softirqd的基石线程中推行的时候就不属于那几个情景了,属于进度上下文那些分类了)、timer的callback函数(本质上也是softirq)、tasklet(本质上也是softirq)。

先看最简便易行的单CPU上的经过上下文的拜会。如若3个大局的财富被五个进度上下文访问,那时候,内核如何交错执行呢?对于那么些并未有打开preemptive选项的水源,全部的体系调用都以串行化执行的,因而不设有财富掠夺的标题。假诺基本线程也拜会这一个大局财富呢?本质上基本线程也是过程,类似普通进度,只然则普通进度时而在用户态运转、时而通过系统调用陷入内核执行,而根本线程永远都以在内核态运营,不过,结果是千篇一律的,对于non-preemptive的linux
kernel,只要在内核态,就不会生出进程调度,因而,那种景色下,共享数据根本不必要爱慕(未有出现,谈何敬重吗)。如若时光停留在这边该多么好,单纯而美好,在继承发展在此之前,让我们先享受这一刻。

当打开premptive选项后,事情变得复杂了,大家着想下边包车型地铁情景:

(一)进度A在有些系统调用进程中做客了共享能源路虎极光

(二)进度B在有些系统调用进度中也访问了共享财富Sportage

会不会促成冲突呢?若是在A访问共享财富哈弗的进度中生出了刹车,中断唤醒了沉睡中的,优先级越来越高的B,在暂停重临现场的时候,产生经过切换,B启动推行,并透过系统调用访问了LX570,假使未有锁珍爱,则会冒出四个thread进入临界区,导致程序执行不正确。OK,我们添加spin
lock看看哪些:A在进入临界区以前得到了spin
lock,同样的,在A访问共享财富大切诺基的长河中生出了中断,中断唤醒了沉睡中的,优先级越来越高的B,B在拜访临界区前边仍然会试图拿走spin
lock,那时候由于A进程具有spin
lock而致使B进度进入了永恒的spin……怎么破?linux的kernel相当的粗略,在A进度取得spin
lock的时候,禁止本CPU上的侵占(上边的万古spin的场所仅仅在本CPU的进度抢占本CPU的脚下经过那样的光景中生出)。借使A和B运转在分歧的CPU上,那么意况会简单一些:A进度就算全数spin
lock而招致B进度进入spin状态,不过出于运营在分化的CPU上,A进度会没完没了举行并会相当的慢释放spin
lock,解除B进程的spin状态。

多CPU
core的场景和单核CPU打开preemptive选项的职能是相同的,那里不再赘述。

咱俩继续前行分析,将来要加盟中断上下文那一个成分。访问共享能源的thread包涵:

(一)运营在CPU0上的历程A在某些系统调用进度中访问了共享财富Odyssey

(2)运维在CPU一上的历程B在某些系统调用进程中也走访了共享能源昂Cora

(三)外设P的中止handler中也会访问共享能源帕杰罗

在那样的景观下,使用spin
lock能够保险访问共享能源LX570的临界区吗?大家假诺CPU0上的长河A持有spin
lock进入临界区,这时候,外设P产生了中断事件,并且调度到了CPU1上实施,看起来未有何难点,执行在CPU一上的handler会稍微等待1会CPU0上的进度A,等它立即临界区就会释放spin
lock的,可是,即使外设P的暂停事件被调度到了CPU0上实施会怎么着?CPU0上的过程A在颇具spin
lock的状态下被中断上下文抢占,而抢占它的CPU0上的handler在进入临界区后边依然会总结拿走spin
lock,正剧发生了,CPU0上的P外设的暂停handler永远的进去spin状态,那时候,CPU一上的进度B也不可防止在总计持有spin
lock的时候失败而招致进入spin状态。为了化解那样的题材,linux
kernel采纳了那般的办法:假使提到到中断上下文的走访,spin
lock须求和禁止本CPU上的中断联合利用。

linux kernel中提供了增进的bottom
half的编写制定,就算同属中断上下文,然而照旧稍有两样。我们得以把上边的气象不难修改一下:外设P不是刹车handler中访问共享财富宝马X3,而是在的bottom
half中走访。使用spin
lock+禁止本地中断当然是能够高达维护共享能源的机能,可是选择牛刀来杀鸡就像有点少见多怪,那时候disable
bottom half就OK了。

末段,我们探究一下中断上下文之间的竞争。同1种中断handler之间在uni
core和multi core上都不会并行执行,那是linux
kernel的性子。假使差别中断handler须要运用spin
lock爱抚共享能源,对于新的根本(不区分fast handler和slow
handler),全体handler都以关门中断的,因而使用spin
lock不须求关闭中断的合营。bottom
half又分为softirq和tasklet,同1种softirq会在分化的CPU上冒出执行,由此壹旦某些驱动中的sofirq的handler中会访问有个别全局变量,对该全局变量是急需采纳spin
lock珍视的,不用协作disable CPU中断或然bottom
half。tasklet更简约,因为同样种tasklet不会五个CPU下边世,具体作者就不分析了,我们自行思索吧。

 

3、通用代码完成

壹、文件整理

和系统布局非亲非故的代码如下:

(1)include/linux/spinlock_types.h。这些头文件定义了通用spin
lock的核心的数据结构(例如spinlock_t)和什么初叶化的接口(DEFINE_SPINLOCK)。那里的“通用”是指甭管SMP还是UP都通用的那个定义。

(2)include/linux/spinlock_types_up.h。这些头文件不应当直接include,在include/linux/spinlock_types.h文件会基于系统的布署(是还是不是SMP)include相关的头文件,要是UP则会include该头文件。那么些头文定义UP系统卯月spin
lock的着力的数据结构和怎么开首化的接口。当然,对于non-debug版本而言,大多数struct都以empty的。

(叁)include/linux/spinlock.h。那么些头文件定义了通用spin
lock的接口函数注解,例如spin_lock、spin_unlock等,使用spin
lock模块接口API的驱动模块也许其余内核模块都亟需include那些头文件。

(4)include/linux/spinlock_up.h。这几个头文件不应当一贯include,在include/linux/spinlock.h文件会依据系统的配备(是不是SMP)include相关的头文件。那几个头文件是debug版本的spin
lock须要的。

(5)include/linux/spinlock_api_up.h。同上,只但是这一个头文件是non-debug版本的spin
lock要求的

(6)linux/spinlock_api_smp.h。SMP上的spin lock模块的接口注明

(7)kernel/locking/spinlock.c。SMP上的spin lock实现。

头文件某些混乱,大家对UP和SMP上spin lock头文件进行整治:

UP需要的头文件 SMP需要的头文件

linux/spinlock_type_up.h:
linux/spinlock_types.h:
linux/spinlock_up.h:
linux/spinlock_api_up.h:
linux/spinlock.h

asm/spinlock_types.h
linux/spinlock_types.h:
asm/spinlock.h
linux/spinlock_api_smp.h:
linux/spinlock.h

2、数据结构

遵照第2章的辨析,大家得以着力能够想见出spin
lock的贯彻。首先定义3个spinlock_t的数据类型,其本质上是四个整数值(对该数值的操作须求确定保障原子性),该数值表示spin
lock是或不是可用。初始化的时候被设定为一。当thread想要持有锁的时候调用spin_lock函数,该函数将spin
lock那四个整数值减去一,然后进行判断,如若等于0,表示能够博得spin
lock,若是是负数,则注明别的thread的享有该锁,本thread必要spin。

基础中的spinlock_t的数据类型定义如下:

typedef struct spinlock {
        struct raw_spinlock rlock; 
} spinlock_t;

typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
} raw_spinlock_t;

出于各样缘由(各类锁的debug、锁的validate机制,多平台扶助什么的),spinlock_t的概念尚无那么直观,为了让工作大约1些,大家去掉那多少个繁琐的积极分子。struct
spinlock中定义了一个struct
raw_spinlock的成员,为啥会这么吗?好吧,大家又需求回到kernel历史教材中去了。在旧的根本中(比如本人精晓的linux
二.陆.23基本),spin lock的授命规则是如此:

通用(适用于各类arch)的spin lock使用spinlock_t那样的type
name,各个arch定义本人的struct
raw_spinlock。听起来不错的主意和命有名的模特式,直到linux realtime
tree(PREEMPT_XC90T)提议对spinlock的挑衅。real time
linux是二个准备将linux kernel增添健康时品质的一个支行(你精晓的,linux
kernel mainline只是永葆soft realtime),多年来,很多起点realtime
branch的性状被merge到了mainline上,例如:高精度timer、中断线程化等等。realtime
tree希望得以对现存的spinlock举办归类:壹种是在realtime
kernel中得以安息的spinlock,此外1种正是在其它意况下都无法睡觉的spinlock。分类很明亮可是怎么样起名字?起名字相对是个技术活,起得好了经济,可保证性好,什么文书档案啊、注释啊都素那浮云,阅读代码正是享受,心旷神怡。起得倒霉,注定被后人唾弃,只怕拖出来吊打(那让自己回忆给小编外甥起名字的那段悲壮的岁月……)。最后,spin
lock的命名规范定义如下:

(1)spinlock,在rt
linux(配置了PREEMPT_PRADOT)的时候可能会被并吞(实际底层可能是运用协理PI(优先级翻转)的mutext)。

(2)raw_spinlock,即就是布署了PREEMPT_OdysseyT也要坚强的spin

(3)arch_spinlock,spin
lock是和architecture相关的,arch_spinlock是architecture相关的贯彻

对于UP平台,所有的arch_spinlock_t都以1律的,定义如下:

typedef struct { } arch_spinlock_t;

怎么着都未曾,1切都以空啊。当然,那也契合前边的辨析,对于UP,即正是开拓的preempt选项,所谓的spin
lock也然而就是disable preempt而已,不需定义什么spin lock的变量。

对于SMP平台,那和arch相关,我们在下一节讲述。

3、spin lock接口API

我们整理spin lock相关的接口API如下:

接口API的类型 spinlock中的定义 raw_spinlock的定义
定义spin lock并初始化 DEFINE_SPINLOCK DEFINE_RAW_SPINLOCK
动态初始化spin lock spin_lock_init raw_spin_lock_init
获取指定的spin lock spin_lock raw_spin_lock
获取指定的spin lock同时disable本CPU中断 spin_lock_irq raw_spin_lock_irq
保存本CPU当前的irq状态,disable本CPU中断并获取指定的spin lock spin_lock_irqsave raw_spin_lock_irqsave
获取指定的spin lock同时disable本CPU的bottom half spin_lock_bh raw_spin_lock_bh
释放指定的spin lock spin_unlock raw_spin_unlock
释放指定的spin lock同时enable本CPU中断 spin_unlock_irq raw_spin_unock_irq
释放指定的spin lock同时恢复本CPU的中断状态 spin_unlock_irqstore raw_spin_unlock_irqstore
获取指定的spin lock同时enable本CPU的bottom half spin_unlock_bh raw_spin_unlock_bh
尝试去获取spin lock,如果失败,不会spin,而是返回非零值 spin_trylock raw_spin_trylock
判断spin lock是否是locked,如果其他的thread已经获取了该lock,那么返回非零值,否则返回0 spin_is_locked raw_spin_is_locked
     

在具体的兑现面,我们不容许把每二个接口函数的代码都表现出来,大家选拔最基础的spin_lock为例子,其余的读者能够团结阅读代码来明白。

spin_lock的代码如下:

static inline void spin_lock(spinlock_t *lock)
{
    raw_spin_lock(&lock->rlock);
}

当然,在linux
mainline代码中,spin_lock和raw_spin_lock是同等的,在realtime linux
patch中,spin_lock应该被换来能够sleep的版本,当然具体哪些完成自己并未有去看(大概一直采取了Mutex,究竟它提供了优先级继承性格来消除了优先级翻转的题材),有趣味的读者能够活动阅读,我们那里根本看看(本文也重点focus这些核心)真正的,不睡觉的spin
lock,相当于是raw_spin_lock,代码如下:

#define raw_spin_lock(lock)    _raw_spin_lock(lock)

UP中的落成:

#define _raw_spin_lock(lock)            __LOCK(lock)

#define __LOCK(lock) \
  do { preempt_disable(); ___LOCK(lock); } while (0)

SMP的实现:

void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
    __raw_spin_lock(lock);
}

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock,
do_raw_spin_lock);
}

UP中很简短,本质上就是二个preempt_disable而已,和我们在其次章中剖析的等同。SMP中稍显复杂,preempt_disable当然也是必须的,spin_acquire能够略过,那是和周转时检查锁的有用有关的,假诺未有定义CONFIG_LOCKDEP其实正是空函数。假设未有定义CONFIG_LOCK_STAT(和锁的总结音信有关),LOCK_CONTENDED正是调用do_raw_spin_lock而已,假诺没有定义CONFIG_DEBUG_SPINLOCK,它的代码如下:

static inline void do_raw_spin_lock(raw_spinlock_t *lock)
__acquires(lock)
{
    __acquire(lock);
    arch_spin_lock(&lock->raw_lock);
}

__acquire和静态代码检查不毫不相关系,忽略之,最后实际的获得spin
lock依旧要靠arch相关的代码完成。

 

4、A福特ExplorerM平台的底细

代码位于arch/arm/include/asm/spinlock.h和spinlock_type.h,和通用代码类似,spinlock_type.h定义A帕杰罗M相关的spin
lock定义以及初阶化相关的宏;spinlock.h中总结了种种现实的兑现。

1、纪念过去

在条分缕析新的spin
lock代码以前,让大家先回到贰.陆.二三版本的基石中,看看ARubiconM平台怎么样兑现spin
lock的。和arm平台相关spin
lock数据结构的定义如下(那时候依然选拔raw_spinlock_t而不是arch_spinlock_t):

typedef struct {
    volatile unsigned int lock;
} raw_spinlock_t;

多少个整数就OK了,0表示unlocked,1表示locked。配套的API包括__raw_spin_lock和__raw_spin_unlock。__raw_spin_lock会持续判断lock的值是还是不是等于0,倘使不等于0(locked)那么其余thread已经持有该锁,本thread就不止的spin,判断lock的数值,一贯等到该值等于0甘休,1旦探测到lock等于0,那么就设定该值为一,表示本thread持有该锁了,当然,这几个操作要确定保障原子性,细节和exclusive版本的ldr和str(即ldrex和strexeq)相关,那里略过。立即临界区后,持锁thread会调用__raw_spin_unlock函数是否spin
lock,其实正是把0那些数值赋给lock。

本条本子的spin
lock的落到实处自然能够完毕效益,而且在尚未争执的时候表现出科学的属性,但是存在一个题材:有失公正。也等于具备的thread都以在冬季的抢夺spin
lock,什么人先抢到何人先得,不管thread等了很久照旧刚刚起初spin。在顶牛相比较少的情状下,有失公正不会反映的专门了解,可是,随着硬件的上扬,多核处理器的数量进一步多,多核之间的争论进一步猛烈,冬天竞争的spinlock带来的performance
issue终于表露出来,依照Nick Piggin的叙说:

On an 8 core (2 socket) Opteron, spinlock unfairness is extremely
noticable, with a userspace test having a difference of up to 2x
runtime per thread, and some threads are starved or “unfairly” granted
the lock up to 1 000 000 (!) times.

万般的有失公允,有个别非凡的thread须要饥饿的守候一千000次。本质上冬辰竞争从可能率论的角度看应该是均匀分布的,但是出于硬件特性导致这么严重的有失公正,大家来看1看硬件block:

起名 11

lock本质上是保留在main
memory中的,由于cache的留存,当然不须求每一趟都有访问main
memory。在多核架构下,各样CPU都有谈得来的L一cache,保存了lock的数量。假使CPU0获取了spin
lock,那么执行完临界区,在释放锁的时候会调用smp_mb
invalide其余忙等待的CPU的L一 cache,这样后果正是自由spin
lock的百般cpu可以更加快的访问L1cache,操作lock数据,从而大大扩展的下贰回得到该spin
lock的空子。

二、回到将来:arch_spinlock_t

AENVISIONM平斯科学普及里的arch_spinlock_t定义如下(little endian):

typedef struct {
    union {
        u32 slock;
        struct __raw_tickets {
            u16 owner;
            u16 next;
        } tickets;
    };
} arch_spinlock_t;

自然觉得二个简约的平头类型的变量就解决的spin
lock看起来未有那么粗略,要精晓那些数据结构,要求理解部分ticket-based
spin
lock的定义。假使你有时机去九毛玖去排队用餐(注脚:不是九毛九的饭托,仅仅是爱好面食而常去吃而已)就会精晓ticket-based
spin
lock。大概是因为便宜,每一回去玖毛九一而再不能够所向披靡,门口的快意的玉女会给一个ticket,上边写着壹五号,同时会报告你,当前情景是拾号已经就位,1一号在守候。

回到arch_spinlock_t,那里的owner正是当前一度就位的不行号码,next记录的是下多个要分发的号子。下边的叙说使用普通的电脑语言和在九毛玖就餐(借使玖毛八头有一张餐桌)的事例来展开描述,估摸能够让吃货更有趣味阅读下去。最早先的时候,slock被赋值为0,也正是说owner和next都以0,owner和next相等,表示unlocked。当第3一律thread调用spin_lock来报名lock(第二私家就餐)的时候,owner和next相等,表示unlocked,那时候该thread持有该spin
lock(能够具有九毛玖的唯1的卓殊餐桌),并且实施next++,也正是将next设定为一(再来人就分配壹这么些号码让她等待就餐)。恐怕该thread执行高效(吃饭吃的快),未有其余thread来竞争就调用spin_unlock了(无人等待就餐,生意惨淡啊),那时候执行owner++,也便是将owner设定为1(表示最近享有一以此号码牌的人方可进食)。姗姗来迟的一号获得了直接开饭的空子,next++之后等于二。一号这么些东西吃饭巨慢,那是不文明现象(thread无法抱有spin
lock太久),可是存在。又来一个人用餐,分配当前next值的号码二,当然也会履行next++,以便下1人依旧叁的号码牌。持续来人就会分配叁、四、5、6这一个号码牌,next值不断的充实,可是owner一点儿也不动,直到欠扁的一号吃饭完毕(调用spin_unlock),释放饭桌那几个唯1财富,owner++之后等于2,表示拥有贰十分号码牌的人方可进去就餐了。 

三、接口完毕

同壹的,那里也只是挑选贰个非凡的API来分析,别的的我们能够自动学习。我们挑选的是arch_spin_lock,其A昂CoraM3二的代码如下:

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
    unsigned long tmp;
    u32 newval;
    arch_spinlock_t lockval;

   
prefetchw(&lock->slock);------------------------(1)
    __asm__ __volatile__(
“1:    ldrex    %0,
[%3]\n”-------------------------(2)
”    add    %1, %0, %4\n”
”    strex    %2, %1,
[%3]\n”------------------------(3)
”    teq    %2,
#0\n”----------------------------(4)
”    bne    1b”
    : “=&r” (lockval), “=&r” (newval), “=&r” (tmp)
    : “r” (&lock->slock), “I” (1 << TICKET_SHIFT)
    : “cc”);

    while (lockval.tickets.next != lockval.tickets.owner)
{------------(5)
       
wfe();-------------------------------(6)
        lockval.tickets.owner =
ACCESS_ONCE(lock->tickets.owner);------(7)
    }

   
smp_mb();------------------------------(8)
}

(1)和preloading cache相关的操作,主若是为着质量怀想

(2)将slock的值保存在lockval那一个一时半刻变量中

(3)将spin lock中的next加一

(四)判断是或不是有别的的thread插入。更实际的细节参考Linux内核同步机制之(1):原子操作中的描述

(5)判断当前spin lock的情况,假若是unlocked,那么直接获取到该锁

(陆)若是当前spin
lock的境况是locked,那么调用wfe进入等待情形。更实际的底细请参考ARM
WFI和WFE指令
中的描述。

(七)别的的CPU唤醒了本cpu的实施,表达owner发生了转移,该新的own赋给lockval,然后继续判断spin
lock的情形,也正是回去step 伍。

(八)memory barrier的操作,具体能够参见memory
barrier
中的描述。

  arch_spin_lock函数ARM64的代码(来自4.1.10内核)如下:

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
    unsigned int tmp;
    arch_spinlock_t lockval, newval;

    asm volatile(
    /* Atomically increment the next ticket. */
”    prfm    pstl1strm, %3\n”
“1:    ldaxr    %w0,
%3\n”-----(A)-----------lockval = lock
”    add    %w1, %w0, %w5\n”-------------newval =
lockval + (1 << 16),相当于next++
”    stxr    %w2, %w1, %3\n”--------------lock =
newval
”    cbnz    %w2,
1b\n”--------------是不是有此外PE的推行流插入?有的话,重来。
    /* Did we get the lock? */
”    eor    %w1, %w0, %w0, ror
#16\n”--lockval中的next域正是温馨的号码牌,判断是不是等于owner
”    cbz    %w1,
3f\n”----------------假若等于,持锁进入临界区
    /*
     * No: spin on the owner. Send a local event to avoid missing
an
     * unlock before the exclusive load.
     */
”    sevl\n”
“2:    wfe\n”--------------------不然进入spin
”    ldaxrh    %w2,
%4\n”----(A)---------其余cpu唤醒本cpu,获取当前owner值
”    eor    %w1, %w2, %w0, lsr
#16\n”---------本身的号码牌是不是等于owner?
”    cbnz    %w1,
2b\n”----------就算等于,持锁进入临界区,否者回到2,即接二连三spin
    /* We got the lock. Critical section starts here. */
“3:”
    : “=&r” (lockval), “=&r” (newval), “=&r” (tmp), “+Q” (*lock)
    : “Q” (lock->owner), “I” (1 << TICKET_SHIFT)
    : “memory”);
}

着力的代码逻辑的叙说都曾经停放代码中,这里须求特地表达的有两个知识点:

(一)Load-Acquire/Store-Release指令的使用。Load-Acquire/Store-Release指令是A奥迪Q3Mv8的特征,在推行load和store操作的时候顺便执行了memory
barrier相关的操作,在spinlock这一个情景,使用Load-Acquire/Store-Release指令代替dmb指令能够节省一条指令。上面代码中的(A)就标识了采取Load-Acquire指令的地方。Store-Release指令在何地呢?在arch_spin_unlock中,那里就不贴代码了。Load-Acquire/Store-Release指令的功能如下:

      
-Load-Acquire能够确认保证系统中具备的observer看到的都以该指令先实施,然后是该指令之后的下令(program
order)再进行

      
-Store-Release指令可以确认保障系统中存有的observer看到的都是该指令以前的吩咐(program
order)先实施,Store-Release指令随后履行

(二)第三个知识点是关于在arch_spin_unlock代码中为什么并未有SEV指令?关于那一个标题能够参考A路虎极光M
ARAV肆M文书档案中的Figure B二-伍,那么些图是PE(n)的global
monitor的气象迁移图。当PE(n)对x地址发起了exclusive操作的时候,PE(n)的global
monitor从open access迁移到exclusive
access状态,来自其余PE上针对x(该地点已经被mark for
PE(n))的store操作会招致PE(n)的global monitor从exclusive
access迁移到open access状态,那时候,PE(n)的伊芙nt
register会被写入event,就好象生成1个event,将该PE唤醒,从而得以简不难单2个SEV的授命。

注: 

(1)+表示在放置的汇编指令中,该操作数会被命令读取(也正是说是输入参数)也会被汇编指令写入(也正是说是出口参数)。
(2)=表示在停放的汇编指令中,该操作数会是write
only的,约等于说只做输出参数。
(3)I表示操作数是及时数

 

原创小说,转载请评释出处。蜗窝科学和技术

 

Change log:

一、二〇一六/11/五,到场A猎豹CS陆M64的代码完成部分的辨析

二、二〇一四/11/一七,扩大AWranglerM6四代码中的五个知识点的叙述

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图