自旋锁是一种同步机制,用于在多处理器环境中保护共享资源。当一个线程尝试获取一个已经被其他线程持有的自旋锁时,它会在一个循环中不断地检查锁是否已经被释放,这就是所谓的"自旋"。这种机制的优点是避免了线程切换的开销,但是如果锁被持有的时间过长,会导致其他等待获取锁的线程浪费 CPU 时间。

spinlock 使用示例

使用自旋锁的一般步骤如下:

  1. 初始化自旋锁。可以使用__spin_initlock函数来初始化一个自旋锁。
struct spinlock lock;
__spin_initlock(&lock, "my_lock");
  1. 在访问共享资源之前,使用spin_lock函数来获取自旋锁。如果锁已经被其他线程持有,那么spin_lock函数会一直等待,直到锁变为可用。
spin_lock(&lock);
  1. 访问共享资源。在这个阶段,其他试图获取锁的线程会被阻塞,直到你释放锁。
// access shared resources
  1. 在访问完共享资源之后,使用spin_unlock函数来释放自旋锁。这会使得其他等待获取锁的线程可以继续执行。
spin_unlock(&lock);

这就是自旋锁的基本使用方法。需要注意的是,自旋锁不应该被持有的时间过长,否则会导致其他等待获取锁的线程浪费 CPU 时间。

spinlock 结构体设计

spinlock结构体就是一个自旋锁的实现。它有一个locked字段,用于表示锁是否被持有。当locked的值为 0 时,表示锁是可用的;当locked的值为 1 时,表示锁已经被某个线程持有。

这段代码定义了一个名为spinlock的结构体,它用于实现自旋锁的功能。

// Mutual exclusion lock.
struct spinlock {
	unsigned locked;       // Is the lock held?

#ifdef DEBUG_SPINLOCK
	// For debugging:
	char *name;            // Name of lock.
	struct CpuInfo *cpu;   // The CPU holding the lock.
	uintptr_t pcs[10];     // The call stack (an array of program counters)
	                       // that locked the lock.
#endif
};

在这个结构体中:

  • unsigned locked:这是一个无符号整数,用于表示锁是否被持有。如果locked的值为 0,那么表示锁是可用的;如果locked的值为 1,那么表示锁已经被某个线程持有。

DEBUG_SPINLOCK被定义的情况下,spinlock结构体还包含以下字段,这些字段主要用于调试:

  • char *name:这是一个指向字符的指针,用于存储锁的名称。

  • struct CpuInfo *cpu:这是一个指向CpuInfo结构体的指针,用于表示当前持有锁的 CPU。

  • uintptr_t pcs[10]:这是一个uintptr_t类型的数组,用于存储调用栈。当锁被锁定时,会记录下当前的程序计数器(Program Counter,PC)的值,也就是锁定锁的那个函数的地址。这对于调试是非常有用的,因为它可以让我们知道是哪个函数锁定了锁。

如何获取锁?

这段代码是实现自旋锁的spin_lock函数。即如何获取 lock 的实现细节。

// Acquire the lock.
// Loops (spins) until the lock is acquired.
// Holding a lock for a long time may cause
// other CPUs to waste time spinning to acquire it.
void
spin_lock(struct spinlock *lk)
{
#ifdef DEBUG_SPINLOCK
	if (holding(lk))
		panic("CPU %d cannot acquire %s: already holding", cpunum(), lk->name);
#endif

	// The xchg is atomic.
	// It also serializes, so that reads after acquire are not
	// reordered before it.
	while (xchg(&lk->locked, 1) != 0)
		asm volatile ("pause");

	// Record info about lock acquisition for debugging.
#ifdef DEBUG_SPINLOCK
	lk->cpu = thiscpu;
	get_caller_pcs(lk->pcs);
#endif
}

在这段代码中:

  1. if (holding(lk)) panic("CPU %d cannot acquire %s: already holding", cpunum(), lk->name);:这行代码检查当前 CPU 是否已经持有了这个锁。如果已经持有,那么就会触发一个 panic,因为这是一个错误的使用情况。一个 CPU 不应该尝试获取它已经持有的锁。

  2. while (xchg(&lk->locked, 1) != 0) asm volatile ("pause");:这行代码是获取锁的主要部分。xchg 指令是一种原子交换指令,它可以在多处理器环境中实现同步,它将lk->locked的值设置为 1,并返回原来的值。如果原来的值为 0,那么表示锁是可用的,这个 CPU 就成功地获取了锁。如果原来的值为 1,那么表示锁已经被其他 CPU 持有,这个 CPU 就需要等待,直到锁变为可用。在等待的过程中,CPU 执行了一个pause指令,这可以避免 CPU 的忙等待,减少资源的浪费。

  3. lk->cpu = thiscpu; get_caller_pcs(lk->pcs);:这两行代码记录了一些关于锁获取的调试信息,包括当前持有锁的 CPU 和获取锁时的调用栈。

总的来说,这段代码的目的是获取一个自旋锁。如果锁已经被其他 CPU 持有,那么当前 CPU 会等待,直到锁变为可用。这是一种简单但有效的同步机制,可以保护共享资源在多处理器环境中的并发访问。

如何判断当前 CPU 是否已经持有了这个锁?

在 DEBUG 模式下可以通过下面的函数判断是否持有锁。原因是 struct spinlock 中保留了当前 cpu 的 id ,可以直接同当前 cpu 比较来判断是否持有。

// Check whether this CPU is holding the lock.
static int
holding(struct spinlock *lock)
{
	return lock->locked && lock->cpu == thiscpu;
}

在这段代码中:

  • struct spinlock *lock:这是一个指向spinlock结构体的指针,表示要检查的自旋锁。

  • return lock->locked && lock->cpu == thiscpu;:这行代码返回一个布尔值,表示当前的 CPU 是否持有给定的自旋锁。如果lock->locked的值为 1,并且lock->cpu的值等于当前的 CPU,那么表示当前的 CPU 持有这个自旋锁,函数返回true;否则,函数返回false

总的来说,这段代码的目的是检查当前的 CPU 是否持有给定的自旋锁。这在多处理器环境中是非常有用的,因为我们需要确保在任何时候,只有一个 CPU 可以持有一个给定的自旋锁,以保护共享资源的并发访问。

为什么 PAUSE 指令可以避免 CPU 忙等

在多处理器环境中,当一个线程尝试获取一个已经被其他线程持有的自旋锁时,它会在一个循环中不断地检查锁是否已经被释放,这就是所谓的"自旋"。这种情况下,CPU 处于忙等待状态,即它在等待锁释放的过程中,CPU 并没有做其他的有用工作,而是在消耗 CPU 时间。

pause指令是 Intel 提供的一种优化忙等待的手段。当 CPU 执行pause指令时,它会暂时停止执行,让出 CPU 给其他的线程或进程,从而减少资源的浪费。这是因为,如果 CPU 一直在忙等待,那么它就会占用大量的 CPU 时间,而这些 CPU 时间本可以用来执行其他的有用工作。

另外,pause指令还可以提高多线程程序的性能。因为在多线程环境中,线程之间的同步是非常重要的。如果一个线程在等待锁释放的过程中一直占用 CPU,那么其他需要运行的线程就无法获取 CPU,这会导致程序的性能下降。而pause指令可以让出 CPU,使得其他线程有机会运行,从而提高程序的整体性能。

总的来说,pause指令的目的是优化忙等待,减少资源的浪费,并提高多线程程序的性能。

如何释放锁?

下面这段代码用于释放一个自旋锁。

// Release the lock.
void
spin_unlock(struct spinlock *lk)
{
#ifdef DEBUG_SPINLOCK
	if (!holding(lk)) {
		int i;
		uint32_t pcs[10];
		// Nab the acquiring EIP chain before it gets released
		memmove(pcs, lk->pcs, sizeof pcs);
		cprintf("CPU %d cannot release %s: held by CPU %d\nAcquired at:",
			cpunum(), lk->name, lk->cpu->cpu_id);
		for (i = 0; i < 10 && pcs[i]; i++) {
			struct Eipdebuginfo info;
			if (debuginfo_eip(pcs[i], &info) >= 0)
				cprintf("  %08x %s:%d: %.*s+%x\n", pcs[i],
					info.eip_file, info.eip_line,
					info.eip_fn_namelen, info.eip_fn_name,
					pcs[i] - info.eip_fn_addr);
			else
				cprintf("  %08x\n", pcs[i]);
		}
		panic("spin_unlock");
	}

	lk->pcs[0] = 0;
	lk->cpu = 0;
#endif

	// The xchg instruction is atomic (i.e. uses the "lock" prefix) with
	// respect to any other instruction which references the same memory.
	// x86 CPUs will not reorder loads/stores across locked instructions
	// (vol 3, 8.2.2). Because xchg() is implemented using asm volatile,
	// gcc will not reorder C statements across the xchg.
	xchg(&lk->locked, 0);
}

在这段代码中 xchg(&lk->locked, 0);:这行代码将lk->locked的值设置为 0,表示锁已经被释放。xchg是一个原子操作,它可以确保在多处理器环境中,锁的释放操作是原子的,不会被其他的 CPU 打断。

总的来说,这段代码的目的是释放一个自旋锁。如果当前的 CPU 不持有这个锁,那么就会触发一个 panic,表示发生了一个严重的错误。如果当前的 CPU 持有这个锁,那么就会将锁的状态设置为已释放,然后返回。

总结

自旋锁是一种同步机制,用于在多处理器环境中保护共享资源。当一个线程尝试获取一个已被其他线程持有的自旋锁时,它会在一个循环中不断检查锁是否已被释放,避免线程切换的开销。优点是避免了线程切换,但如果锁被持有时间过长,会导致其他等待线程浪费 CPU 时间。

自旋锁的基本使用包括初始化、获取、访问共享资源和释放。spinlock结构体包含locked字段表示锁状态,以及调试信息字段。获取锁的实现使用原子交换指令和pause指令,避免 CPU 忙等。判断当前 CPU 是否持有锁通过比较 CPU ID。释放锁通过原子操作将锁状态置为 0 实现。

释放锁的代码还包含调试信息输出,用于追踪锁的获取和释放情况。pause指令的作用是减少忙等待的资源浪费,提高多线程程序性能。