spinlock是什么
Linux kernel中的spinlock是一种用于同步访问共享资源的机制。它是一种轻量级自旋锁,用于在多核系统中实现互斥。自旋锁不会引起调用者睡眠,而是通过在循环中不断地检查锁状态来等待锁释放。
当一个线程或进程需要访问被spinlock保护的共享资源时,它会尝试获取该自旋锁。如果自旋锁当前没有被任何其他线程持有,则该线程可以立即获得锁并继续执行。如果自旋锁已经被其他线程持有,那么请求锁的线程将进入一个忙等待的循环,不断检查锁状态,直到锁被释放为止。
使用spinlock的好处是它避免了线程切换的开销,因为自旋锁不会导致调用者阻塞或睡眠。这对于需要快速互斥访问的临界区域非常有用,特别是在短时间内锁被释放的情况下。
然而,自旋锁也存在一些缺点。如果一个线程长时间持有自旋锁而不释放,那些等待获取锁的线程将会长时间忙等待,浪费CPU资源。因此,自旋锁适用于临界区域的保护时间较短的情况,而长时间持有锁的情况可能会导致性能下降。
总之,spinlock是Linux kernel中一种用于同步访问共享资源的机制,通过自旋等待来确保互斥性,避免了线程切换的开销。
Linux 中的spinlock有三种实现方式
CAS
基本思想
在spinlock的实现中,常用的一种机制是使用CAS(Compare-and-Swap)指令来确保原子操作。CAS指令允许我们比较某个内存位置的值与期望值,并且只有在匹配时才会更新该内存位置的值。
一般步骤 :
- 定义一个整数变量作为spinlock,初始值为0表示未锁定状态。
- 当一个线程希望获取spinlock时,它会通过CAS指令尝试将spinlock的值从0修改为1。如果成功修改,则表示该线程获得了spinlock,可以继续执行。如果修改失败,说明其他线程已经持有了spinlock,当前线程需要继续循环等待,直到成功获取为止。
- 当一个线程完成对共享资源的操作后,它会释放spinlock,将其值修改回0,表示解锁状态。
需要注意的是,CAS操作需要保证原子性,因此通常需要借助底层的原子指令或者使用特定的库函数来实现。在不同的体系架构和编程语言中,CAS的具体实现方式可能会有所不同,但其核心思想是相似的。
实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
while (!__sl_cas(&lock->lock, 1, 0));
}
/*
@param *p 指向spinlock变量所在的内存位置
@param old 存储的试图获取spinlock的本地cpu期望的值
*/
static inline unsigned __sl_cas(volatile unsigned *p, unsigned old, unsigned new)
{
__asm__ __volatile__("cas.l %1,%0,@r0" // CAS指令,尝试将值%0(new)与@r0(p指向的内存位置)进行比较和交换
: "+r"(new) // 输出操作数:"+r"(new)表示将new作为一个输入输出操作数,并且在操作后更新它的值
: "r"(old), "z"(p) // 输入操作数:"r"(old)表示将old作为只读操作数,"z"(p)表示将p作为只读操作数,并使用零寄存器
: "t", "memory" ); // clobber列表(影响因素):"t"表示寄存器t被修改,"memory"表示内存可能被修改
// 解释:
// 该代码段使用了嵌入汇编语言来执行CAS操作。
// cas.l是一个特定体系结构上的CAS指令(compare-and-swap)
// 它会将new的值与p指向的内存位置进行比较,如果相等则用new的值替换该内存位置的值。
// 这个CAS操作是原子的,意味着在执行期间不会被其他线程干扰。
// 通过使用"+r"约束符号,将new作为输入输出操作数,可以在操作后更新new的值。
// old和p都被认为是只读操作数,不会被修改。
// "t"和"memory"被列在clobber列表中,表示CAS操作可能会修改寄存器t的内容,并且可能会影响内存。
return new;
}
|
缺点 - 不公平
一旦spinlock被释放,第一个能够成功执行CAS操作的CPU将成为新的owner,没有办法确保在该spinlock上等待时间最长的那个CPU优先获得锁,这将带来延迟不能确定的问题。
Ticket Spinlock
基本思想
采用一种排队的形式,解决序竞争导致的不公平现象
一个形象的比喻,就像银行取号办业务,叫到那号了,再去窗口办理 (来自一位大神的博客中的比喻,非常形象)
实现
表示一个spinlock的数据结构由"head"和"tail"两个队列的索引组成。
1
2
3
4
|
typedef struct _spinlock {
uint32_t head;//指向等锁的链表头部
uint32_t tail;//指向等锁的链表尾部
} spinlock_t;
|
解锁
spinlock 被 拥有者释放时,会将head中的值加一(inc :原子操作)。类比银行办理业务,就是叫号操作。
1
2
3
4
5
6
7
8
9
10
11
|
asm volatile (" lock incl %[head]\n" // 使用lock前缀进行原子操作,将lock->head的值加1
:
: [head] "m" (lock->head) // 输入输出操作数,[head]表示对应的占位符,"m"表示使用内存约束,将lock->head作为输入输出操作数
: "cc", "memory"); // clobber列表,"cc"表示影响条件码寄存器,"memory"表示可能会修改内存
// 解释:
// 这段代码使用了嵌入汇编语言来执行原子操作。
// lock incl是一个特殊的指令序列,结合了lock前缀和inc指令,用于将指定的内存位置的值增加1,并且保证该操作是原子的。
// %[head]是占位符,表示将在后面的冒号中定义输入输出操作数的具体值。
// [head] "m" (lock->head)表示将lock->head作为输入输出操作数,并使用内存约束,确保对应的内存位置被正确访问。
// "cc"和"memory"被列在clobber列表中,表示该指令可能会影响条件码寄存器和内存。
|
这里的lock可以参考之前关于原子操作的文章
加锁
其他CPU在试图获取这个spinlock时,会通过"add"指令将该spinlock的"tail"值加1,然而将加1后的"tail"值保存在自己的eax寄存器中。类比银行叫号,就是取号操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
static inline void spinlock_obtain(spinlock_t *lock)
{
asm volatile (" movl $0x1,%%eax\n" // 将立即数1赋值给寄存器eax
" lock xaddl %%eax,%[tail]\n" // 使用lock前缀执行xadd指令,将tail的值加1,同时将结果保存到eax中
" cmpl %%eax,%[head]\n" // 比较eax和head的值
" jz 1f\n" // 如果相等,跳转到标号1处,表示获得了锁
"2: pause\n" // 如果不相等,执行暂停指令pause,继续比较
" cmpl %%eax,%[head]\n" // 再次比较eax和head的值
" jnz 2b\n" // 如果不相等,跳转到标号2处,继续进行比较
"1:\n" // 标号1,表示成功获得锁
:
:
[head] "m"(lock->head), // 输入输出操作数,[head]表示对应的占位符,"m"表示使用内存约束,将lock->head作为输入输出操作数
[tail] "m"(lock->tail) // 同上,将lock->tail作为输入输出操作数
: "cc", "memory", "eax"); // clobber列表,"cc"表示影响条件码寄存器,"memory"表示可能会修改内存,"eax"表示寄存器eax可能被修改
}
// 解释:
// 这段代码使用了嵌入汇编语言来实现自旋锁的获取操作。
// 自旋锁是一种同步机制,在多线程环境中用于控制对共享资源的访问。
// 该代码通过嵌入汇编的方式实现了自旋锁的获取过程。
// movl指令将立即数1赋值给寄存器eax;
// lock xaddl指令使用lock前缀执行xadd指令,将tail的值加1,并将结果保存到eax中;
// cmpl指令比较eax和head的值;
// jz指令根据比较结果进行跳转,如果相等则跳转到标号1处,表示获得了锁,否则继续执行;
// pause指令是一个空操作,用于在自旋时减少功耗;
// 最后两条cmpl指令和jnz指令组成一个循环,用于不断地进行比较,直到成功获得锁为止。
// 输入输出操作数使用了内存约束,并将lock->head和lock->tail作为输入输出操作数。
// clobber列表中声明了影响的因素,包括条件码寄存器、内存和寄存器eax。
|
接下来就是不断的循环比较,判断该spinlock当前的"head"值,是否和自己存储在eax寄存器中的"tail"值相等,相等时则获得该spinlock,成为新的owner。
优缺点
优点:先到先得,有序竞争,避免不公平竞争
缺点:根据硬件维护的cache一致性协议,如果spinlock的值没有更改,那么在busy wait时,试图获取spinlock的CPU,只需要不断地读取自己包含这个spinlock变量的cache line上的值就可以了,不需要从spinlock变量所在的内存位置读取。
但是,当spinlock的值被更改时,所有试图获取spinlock的CPU对应的cache line都会被invalidate,因为这些CPU会不停地读取这个spinlock的值,所以"invalidate"状态意味着此时,它们必须重新从内存读取新的spinlock的值到自己的cache line中。
而事实上,其中只会有一个CPU,也就是队列中最先达到的那个CPU,接下来可以获得spinlock,也只有它的cache line被invalidate才是有意义的,对于其他的CPU来说,这就是做无用功。内存比cache慢那么多,开销可不小。
MCS
基本思想
在ticket spinlock 的基础上,想要实现不让所有等待的任务无效等待解锁,MCS Lock 将mcs_spinlock设计为per-CPU的变量,那么只需要查询自己对应的这个变量所在的cache line ,仅在这个变量发生变化的时候,才需要读取内存和刷新这条cache line。
1
2
3
4
|
struct mcs_spinlock{
struct mcs_spinlock *next;
int locked ;
};
|
实现
加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node){
// 初始化node
node->locked = 0;
node->next = NULL;
// 找队列末尾的那个mcs lock
struct mcs_spinlock *prev = xchg(lock, node);//*prev = lock , *lock = node ;
// 队列为空,立即获得锁
if (likely(prev == NULL)) {
return; //没有将locked赋值为1是因为此时队列为空,无论值是什么都无所谓
}
// 队列不为空,把自己加到队列的末尾
WRITE_ONCE(prev->next, node);
// 等待lock的持有者把lock传给自己
arch_mcs_spin_lock_contended(&node->locked);
}
#define arch_mcs_spin_lock_contended(l)
do {
smp_cond_load_acquire(l, VAL); //一直等待
} while (0)
|
解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
// 找到等待队列中的下一个节点
struct mcs_spinlock *next = READ_ONCE(node->next);
// 当前没有其他CPU试图获得锁
if (likely(!next)) {
// 直接释放锁
if (likely(cmpxchg_release(lock, node, NULL) == node))
return;
// 等待新的node添加成功while (!(next = READ_ONCE(node->next)))
cpu_relax();
}
// 将锁传给等待队列中的下一个node
arch_mcs_spin_unlock_contended(&next->locked);
}
|
优缺点
优点:每一次释放锁时,只有等待队列的下一个node去获取锁,在实现公平的基础上,减少了对内存的无效访问
缺点:占用空间,因为相比起Linux中只占4个字节的ticket spinlock,MCS lock多了一个指针,要多占4(或者8)个字节,消耗的存储空间是原来的2-3倍。
qspinlock
基本思想
实现
等待队列由一个qspinlock和若干mcs lock 节点组成,mcs lock 的实现方法不变
qspinlock的结构如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
typedef struct qspinlock {
union {
atomic_t val;
#ifdef __LITTLE_ENDIAN
struct {
u8 locked;
u8 pending;
};
struct {
u16 locked_pending;
u16 tail;
};
#else
...
}
}
|
以little endian为例,整个qspinlock结构体如下:
locked byte
(1字节) :替代 MCS Lock 结构体中 locker
(int 类型,占4字节)
tail cpu
(14bit):qspinlock使用的是一个per-CPU的数组来表示每个MCS node(用qnode结构体表示),通过CPU编号作为数组的index,就可以获得对应 MCS node的内存位置,因而qspinlock使用的是末尾 MCS node的CPU编号加1,即"tail cpu",来记录等待队列tail的位置
tail index
(2bit): Linux中一共有4种context,分别是task, softirq, hardirq和nmi,而一个CPU在一种context下,至多持有一个spinlock,因而一个CPU至多同时持有4个spinlock,“tail index"就是用来标识context的编号的
pending
(1bit):第一顺位标志,当前锁的拥有者释放锁之后,该位被置位的任务第一个获得锁
加锁
1
2
3
4
5
6
7
8
|
#define _Q_LOCKED_OFFSET 0#define _Q_LOCKED_VAL (1U << _Q_LOCKED_OFFSET)
void queued_spin_lock(struct qspinlock *lock){
//和当前lock的"val"值进行比较,看是否为0,如果为0,那么获得spinlock,同时将lock的值设为1
u32 val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);
if (likely(val == 0))
return;
queued_spin_lock_slowpath(lock, val);//没有成功获取锁,进入这个流程
}
|
进入slowpath流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#define _Q_LOCKED_OFFSET 0
#define _Q_LOCKED_BITS 8
#define _Q_LOCKED_MASK ((1U << _Q_LOCKED_BITS) - 1) << _Q_LOCKED__OFFSET // (*,*,1)// "val"是在queued_spin_lock中获取的当前lock的值
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
// ......
//尝试获取pengding位
val = queued_fetch_set_pending_acquire(lock);
// 等待第一个CPU移交
if (val & _Q_LOCKED_MASK)
atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));
clear_pending_set_locked(lock);
return;
// ......
}
|
如果第二个任务还未执行,但是第三个任务已经来了 ,有两种情况,①locked标志位已经为0,但是pending标志为1,此时为锁拥有者已经释放锁,但是第一顺位者还没有成功获取锁的一个中间态,②pending位为1,locked也是1,大部分情况是这样的,此时旧进入queue流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
#define _Q_PENDING_OFFSET 8
#define _Q_PENDING_VAL (1U << _Q_PENDING_OFFSET) // (0,1,0)
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
//移交的过渡状态
if (val == _Q_PENDING_VAL) {
// 等待移交完成
int cnt = 1
val = atomic_cond_read_relaxed(&lock->val, (VAL != _Q_PENDING_VAL)|| !cnt--);
}
// 已经有2个及以上的CPU持有或试图获取spinlock
if (val & ~_Q_LOCKED_MASK)
goto queue; // 进入MCS node队列
//....
struct mcs_spinlock *node = this_cpu_ptr(&qnodes[0].mcs);
int idx = node->count++;node += idx;
node->locked = 0;
node->next = NULL;
u32 tail = encode_tail(smp_processor_id(), idx);//更新自旋锁的尾部指针
u32 old = xchg_tail(lock, tail);
//....
}
|
解锁
1
2
3
4
|
void queued_spin_unlock(struct qspinlock *lock)
{
smp_store_release(&lock->locked , 0 );
}
|
只要将qspinlock中的“locked byte”设置为0就可以了
优点
如果只有1个或2个CPU试图获取锁,那么只需要一个4字节的qspinlock就可以了,其所占内存的大小和ticket spinlock一样。当第二个CPU试图获取锁的时候,直接将pending位置1即可,不需要创建MCS node 。 这样的设计已经足够应对很多情况了。
当有3个以上的CPU试图获取锁,需要一个qspinlock加上(N-2)个MCS node。
试图加锁的CPU数目超过3个是小概率事件,但一旦发生,使用ticket spinlock机制就会造成多个CPU的cache line无谓刷新的问题,而qspinlock可以利用MCS node队列来解决这个问题。
所以在Linux中,如果某个处理器架构没有明确定义,qspinlock将是默认的spinlock的实现方式。
[1] Linux 中的 spinlock 机制 [一] - CAS 和 ticket spinlock
[2] Linux 中的 spinlock 机制 [二] - MCS Lock
[3] Linux 中的 spinlock 机制 [三] - qspinlock
[4] Linux 中的 spinlock 机制 [四] - API 的使用
[5] Linux 中的 spinlock 机制 [五] - 死锁问题
[6] AWei’s Kernel Tour | 【同步机制】原子操作 (cnjslw.github.io)