主页
文章
分类
系列
标签
简历
【同步机制】SpinLock
发布于: 2022-3-19   更新于: 2022-3-19   收录于: Linux Kernel
文章字数: 860   阅读时间: 5 分钟   阅读量:

spinlock是什么

Linux kernel中的spinlock是一种用于同步访问共享资源的机制。它是一种轻量级自旋锁,用于在多核系统中实现互斥。自旋锁不会引起调用者睡眠,而是通过在循环中不断地检查锁状态来等待锁释放。

当一个线程或进程需要访问被spinlock保护的共享资源时,它会尝试获取该自旋锁。如果自旋锁当前没有被任何其他线程持有,则该线程可以立即获得锁并继续执行。如果自旋锁已经被其他线程持有,那么请求锁的线程将进入一个忙等待的循环,不断检查锁状态,直到锁被释放为止。

使用spinlock的好处是它避免了线程切换的开销,因为自旋锁不会导致调用者阻塞或睡眠。这对于需要快速互斥访问的临界区域非常有用,特别是在短时间内锁被释放的情况下。

然而,自旋锁也存在一些缺点。如果一个线程长时间持有自旋锁而不释放,那些等待获取锁的线程将会长时间忙等待,浪费CPU资源。因此,自旋锁适用于临界区域的保护时间较短的情况,而长时间持有锁的情况可能会导致性能下降。

总之,spinlock是Linux kernel中一种用于同步访问共享资源的机制,通过自旋等待来确保互斥性,避免了线程切换的开销。

Linux 中的spinlock有三种实现方式

CAS

基本思想

在spinlock的实现中,常用的一种机制是使用CAS(Compare-and-Swap)指令来确保原子操作。CAS指令允许我们比较某个内存位置的值与期望值,并且只有在匹配时才会更新该内存位置的值。 一般步骤 :

  1. 定义一个整数变量作为spinlock,初始值为0表示未锁定状态。
  2. 当一个线程希望获取spinlock时,它会通过CAS指令尝试将spinlock的值从0修改为1。如果成功修改,则表示该线程获得了spinlock,可以继续执行。如果修改失败,说明其他线程已经持有了spinlock,当前线程需要继续循环等待,直到成功获取为止。
  3. 当一个线程完成对共享资源的操作后,它会释放spinlock,将其值修改回0,表示解锁状态。

需要注意的是,CAS操作需要保证原子性,因此通常需要借助底层的原子指令或者使用特定的库函数来实现。在不同的体系架构和编程语言中,CAS的具体实现方式可能会有所不同,但其核心思想是相似的。

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
	while (!__sl_cas(&lock->lock, 1, 0));
}
/*
@param *p 指向spinlock变量所在的内存位置
@param old 存储的试图获取spinlock的本地cpu期望的值
*/
static inline unsigned __sl_cas(volatile unsigned *p, unsigned old, unsigned new)
{
	__asm__ __volatile__("cas.l %1,%0,@r0"   // CAS指令,尝试将值%0(new)与@r0(p指向的内存位置)进行比较和交换
	        : "+r"(new)                   // 输出操作数:"+r"(new)表示将new作为一个输入输出操作数,并且在操作后更新它的值
	        : "r"(old), "z"(p)            // 输入操作数:"r"(old)表示将old作为只读操作数,"z"(p)表示将p作为只读操作数,并使用零寄存器
	        : "t", "memory" );            // clobber列表(影响因素):"t"表示寄存器t被修改,"memory"表示内存可能被修改
	
	// 解释:
	// 该代码段使用了嵌入汇编语言来执行CAS操作。
	// cas.l是一个特定体系结构上的CAS指令(compare-and-swap)
	// 它会将new的值与p指向的内存位置进行比较,如果相等则用new的值替换该内存位置的值。
	// 这个CAS操作是原子的,意味着在执行期间不会被其他线程干扰。
	// 通过使用"+r"约束符号,将new作为输入输出操作数,可以在操作后更新new的值。
	// old和p都被认为是只读操作数,不会被修改。
	// "t"和"memory"被列在clobber列表中,表示CAS操作可能会修改寄存器t的内容,并且可能会影响内存。

	return new;
}

缺点 - 不公平

一旦spinlock被释放,第一个能够成功执行CAS操作的CPU将成为新的owner,没有办法确保在该spinlock上等待时间最长的那个CPU优先获得锁,这将带来延迟不能确定的问题。

Ticket Spinlock

基本思想

采用一种排队的形式,解决序竞争导致的不公平现象

一个形象的比喻,就像银行取号办业务,叫到那号了,再去窗口办理 (来自一位大神的博客中的比喻,非常形象)

实现

表示一个spinlock的数据结构由"head"和"tail"两个队列的索引组成。

1
2
3
4
typedef struct _spinlock {
	uint32_t head;//指向等锁的链表头部
	uint32_t tail;//指向等锁的链表尾部
} spinlock_t;

解锁

spinlock 被 拥有者释放时,会将head中的值加一(inc :原子操作)。类比银行办理业务,就是叫号操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
asm volatile ("   lock incl %[head]\n"   // 使用lock前缀进行原子操作,将lock->head的值加1
      :
      : [head] "m" (lock->head)       // 输入输出操作数,[head]表示对应的占位符,"m"表示使用内存约束,将lock->head作为输入输出操作数
      : "cc", "memory");              // clobber列表,"cc"表示影响条件码寄存器,"memory"表示可能会修改内存

// 解释:
// 这段代码使用了嵌入汇编语言来执行原子操作。
// lock incl是一个特殊的指令序列,结合了lock前缀和inc指令,用于将指定的内存位置的值增加1,并且保证该操作是原子的。
// %[head]是占位符,表示将在后面的冒号中定义输入输出操作数的具体值。
// [head] "m" (lock->head)表示将lock->head作为输入输出操作数,并使用内存约束,确保对应的内存位置被正确访问。
// "cc"和"memory"被列在clobber列表中,表示该指令可能会影响条件码寄存器和内存。

这里的lock可以参考之前关于原子操作的文章

加锁

其他CPU在试图获取这个spinlock时,会通过"add"指令将该spinlock的"tail"值加1,然而将加1后的"tail"值保存在自己的eax寄存器中。类比银行叫号,就是取号操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static inline void spinlock_obtain(spinlock_t *lock)
{   
    asm volatile ("   movl $0x1,%%eax\n"  // 将立即数1赋值给寄存器eax
                  "   lock xaddl %%eax,%[tail]\n"  // 使用lock前缀执行xadd指令,将tail的值加1,同时将结果保存到eax中
                  "   cmpl %%eax,%[head]\n"  // 比较eax和head的值
                  "   jz 1f\n"  // 如果相等,跳转到标号1处,表示获得了锁
                  "2: pause\n"  // 如果不相等,执行暂停指令pause,继续比较
                  "   cmpl %%eax,%[head]\n"  // 再次比较eax和head的值
                  "   jnz 2b\n"  // 如果不相等,跳转到标号2处,继续进行比较
                  "1:\n"  // 标号1,表示成功获得锁
                  :
                  :
                  [head] "m"(lock->head),  // 输入输出操作数,[head]表示对应的占位符,"m"表示使用内存约束,将lock->head作为输入输出操作数
                  [tail] "m"(lock->tail)  // 同上,将lock->tail作为输入输出操作数
                  : "cc", "memory", "eax");  // clobber列表,"cc"表示影响条件码寄存器,"memory"表示可能会修改内存,"eax"表示寄存器eax可能被修改
}

// 解释:
// 这段代码使用了嵌入汇编语言来实现自旋锁的获取操作。
// 自旋锁是一种同步机制,在多线程环境中用于控制对共享资源的访问。
// 该代码通过嵌入汇编的方式实现了自旋锁的获取过程。
// movl指令将立即数1赋值给寄存器eax;
// lock xaddl指令使用lock前缀执行xadd指令,将tail的值加1,并将结果保存到eax中;
// cmpl指令比较eax和head的值;
// jz指令根据比较结果进行跳转,如果相等则跳转到标号1处,表示获得了锁,否则继续执行;
// pause指令是一个空操作,用于在自旋时减少功耗;
// 最后两条cmpl指令和jnz指令组成一个循环,用于不断地进行比较,直到成功获得锁为止。
// 输入输出操作数使用了内存约束,并将lock->head和lock->tail作为输入输出操作数。
// clobber列表中声明了影响的因素,包括条件码寄存器、内存和寄存器eax。

接下来就是不断的循环比较,判断该spinlock当前的"head"值,是否和自己存储在eax寄存器中的"tail"值相等,相等时则获得该spinlock,成为新的owner。

优缺点

优点:先到先得,有序竞争,避免不公平竞争

缺点:根据硬件维护的cache一致性协议,如果spinlock的值没有更改,那么在busy wait时,试图获取spinlock的CPU,只需要不断地读取自己包含这个spinlock变量的cache line上的值就可以了,不需要从spinlock变量所在的内存位置读取。 但是,当spinlock的值被更改时,所有试图获取spinlock的CPU对应的cache line都会被invalidate,因为这些CPU会不停地读取这个spinlock的值,所以"invalidate"状态意味着此时,它们必须重新从内存读取新的spinlock的值到自己的cache line中。 而事实上,其中只会有一个CPU,也就是队列中最先达到的那个CPU,接下来可以获得spinlock,也只有它的cache line被invalidate才是有意义的,对于其他的CPU来说,这就是做无用功。内存比cache慢那么多,开销可不小。

MCS

基本思想

在ticket spinlock 的基础上,想要实现不让所有等待的任务无效等待解锁,MCS Lock 将mcs_spinlock设计为per-CPU的变量,那么只需要查询自己对应的这个变量所在的cache line ,仅在这个变量发生变化的时候,才需要读取内存和刷新这条cache line。

1
2
3
4
struct mcs_spinlock{
	struct mcs_spinlock *next;
	int locked ; 
};

Pasted image 20231001162555|800

实现

加锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node){  
	// 初始化node  
	node->locked = 0;  
	node->next = NULL;  
	  
	// 找队列末尾的那个mcs lock  
	struct mcs_spinlock *prev = xchg(lock, node);//*prev = lock , *lock = node ;
	// 队列为空,立即获得锁  
	if (likely(prev == NULL)) {  
		return;  //没有将locked赋值为1是因为此时队列为空,无论值是什么都无所谓
	}  
	  
	// 队列不为空,把自己加到队列的末尾  
	WRITE_ONCE(prev->next, node);  
	  
	// 等待lock的持有者把lock传给自己  
	arch_mcs_spin_lock_contended(&node->locked);
}


#define arch_mcs_spin_lock_contended(l)
do {  
smp_cond_load_acquire(l, VAL);  //一直等待
} while (0)

解锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{  
	// 找到等待队列中的下一个节点  
	struct mcs_spinlock *next = READ_ONCE(node->next);  
	  
	// 当前没有其他CPU试图获得锁  
	if (likely(!next)) {  
		// 直接释放锁  
		if (likely(cmpxchg_release(lock, node, NULL) == node))  
			return;  
		// 等待新的node添加成功while (!(next = READ_ONCE(node->next)))  
		cpu_relax();  
	}  
	  
	// 将锁传给等待队列中的下一个node  
	arch_mcs_spin_unlock_contended(&next->locked);
}

优缺点

优点:每一次释放锁时,只有等待队列的下一个node去获取锁,在实现公平的基础上,减少了对内存的无效访问 缺点:占用空间,因为相比起Linux中只占4个字节的ticket spinlock,MCS lock多了一个指针,要多占4(或者8)个字节,消耗的存储空间是原来的2-3倍。

qspinlock

基本思想

实现

等待队列由一个qspinlock和若干mcs lock 节点组成,mcs lock 的实现方法不变

Pasted image 20231001192443|800

qspinlock的结构如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
typedef struct qspinlock {
    union {
        atomic_t val;
#ifdef __LITTLE_ENDIAN
        struct {
            u8	locked;
            u8	pending;
        };
        struct {
            u16	locked_pending;
            u16	tail;
        };
#else
    ...
    }
}

以little endian为例,整个qspinlock结构体如下:

Pasted image 20231001193544|800

  • locked byte(1字节) :替代 MCS Lock 结构体中 locker (int 类型,占4字节)
  • tail cpu(14bit):qspinlock使用的是一个per-CPU的数组来表示每个MCS node(用qnode结构体表示),通过CPU编号作为数组的index,就可以获得对应 MCS node的内存位置,因而qspinlock使用的是末尾 MCS node的CPU编号加1,即"tail cpu",来记录等待队列tail的位置
  • tail index (2bit): Linux中一共有4种context,分别是task, softirq, hardirq和nmi,而一个CPU在一种context下,至多持有一个spinlock,因而一个CPU至多同时持有4个spinlock,“tail index"就是用来标识context的编号的
  • pending(1bit):第一顺位标志,当前锁的拥有者释放锁之后,该位被置位的任务第一个获得锁

加锁

1
2
3
4
5
6
7
8
#define _Q_LOCKED_OFFSET	0#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)  
void queued_spin_lock(struct qspinlock *lock){    
	//和当前lock的"val"值进行比较,看是否为0,如果为0,那么获得spinlock,同时将lock的值设为1
	u32 val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);    
	if (likely(val == 0))        
		return;    
	queued_spin_lock_slowpath(lock, val);//没有成功获取锁,进入这个流程
}

进入slowpath流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
#define _Q_LOCKED_OFFSET     0
#define _Q_LOCKED_BITS	     8
#define _Q_LOCKED_MASK       ((1U << _Q_LOCKED_BITS) - 1) << _Q_LOCKED__OFFSET // (*,*,1)// "val"是在queued_spin_lock中获取的当前lock的值
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{ 
	// ......
	//尝试获取pengding位
	val = queued_fetch_set_pending_acquire(lock);  
	// 等待第一个CPU移交  
	if (val & _Q_LOCKED_MASK)  
	atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));  
	  
	clear_pending_set_locked(lock);  
	return;
	// ......
}

如果第二个任务还未执行,但是第三个任务已经来了 ,有两种情况,①locked标志位已经为0,但是pending标志为1,此时为锁拥有者已经释放锁,但是第一顺位者还没有成功获取锁的一个中间态,②pending位为1,locked也是1,大部分情况是这样的,此时旧进入queue流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#define _Q_PENDING_OFFSET    8
#define _Q_PENDING_VAL       (1U << _Q_PENDING_OFFSET) // (0,1,0)
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{ 
    //移交的过渡状态
    if (val == _Q_PENDING_VAL) {
        // 等待移交完成
        int cnt = 1
        val = atomic_cond_read_relaxed(&lock->val, (VAL != _Q_PENDING_VAL)|| !cnt--);
    }


    // 已经有2个及以上的CPU持有或试图获取spinlock
    if (val & ~_Q_LOCKED_MASK)
        goto queue;  // 进入MCS node队列

	//....
	struct mcs_spinlock *node = this_cpu_ptr(&qnodes[0].mcs);  
	int idx = node->count++;node += idx;  
	node->locked = 0;  
	node->next = NULL;
	u32 tail = encode_tail(smp_processor_id(), idx);//更新自旋锁的尾部指针
	u32 old = xchg_tail(lock, tail);
	//....

}

解锁

1
2
3
4
void queued_spin_unlock(struct qspinlock *lock)
{
	smp_store_release(&lock->locked , 0 );
}

只要将qspinlock中的“locked byte”设置为0就可以了

优点

如果只有1个或2个CPU试图获取锁,那么只需要一个4字节的qspinlock就可以了,其所占内存的大小和ticket spinlock一样。当第二个CPU试图获取锁的时候,直接将pending位置1即可,不需要创建MCS node 。 这样的设计已经足够应对很多情况了。 当有3个以上的CPU试图获取锁,需要一个qspinlock加上(N-2)个MCS node。 试图加锁的CPU数目超过3个是小概率事件,但一旦发生,使用ticket spinlock机制就会造成多个CPU的cache line无谓刷新的问题,而qspinlock可以利用MCS node队列来解决这个问题。 所以在Linux中,如果某个处理器架构没有明确定义,qspinlock将是默认的spinlock的实现方式。


[1] Linux 中的 spinlock 机制 [一] - CAS 和 ticket spinlock

[2] Linux 中的 spinlock 机制 [二] - MCS Lock

[3] Linux 中的 spinlock 机制 [三] - qspinlock

[4] Linux 中的 spinlock 机制 [四] - API 的使用

[5] Linux 中的 spinlock 机制 [五] - 死锁问题

[6] AWei’s Kernel Tour | 【同步机制】原子操作 (cnjslw.github.io)