1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
| /**
* queued_spin_lock_slowpath - acquire the queued spinlock
* @lock: Pointer to queued spinlock structure
* @val: Current value of the queued spinlock 32-bit word
*
* val: (queue tail(bit16-31 x), pending bit(bit8 y), lock value(bit0-7 z))
*
* fast : slow : unlock
* : :
* uncontended (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
* : | ^--------.------. / :
* : v \ \ | :
* pending : (0,1,1) +--> (0,1,0) \ | :
* : | ^--' | | :
* : v | | :
* uncontended : (n,x,y) +--> (n,0,0) --' | :
* queue : | ^--' | :
* : v | :
* contended : (*,x,y) +--> (*,0,0) ---> (*,0,1) -' :
* queue : ^--' :
*/
/*
queued_spin_lock 传参(lock, lock->val)
*/
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
struct mcs_spinlock *prev, *next, *node;
u32 old, tail;
int idx;
/* 32 >= 1<<14 恒不成立 */
BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
/* 默认为 false,恒不执行 */
if (pv_enabled())
goto pv_queue;
/* 默认为 false,恒不执行 */
if (virt_spin_lock(lock))
return;
/*
* Wait for in-progress pending->locked hand-overs with a bounded
* number of spins so that we guarantee forward progress. 0,1,0 -> 0,0,1
*/
/*
* 进入慢速路径时有任务在等待 spin lock,执行这段代码时间内它可能已经获取到 spin lock 了,
* 这里更新下从 lock->val 读取到的值.
*/
if (val == _Q_PENDING_VAL) { //1<<8
int cnt = _Q_PENDING_LOOPS; //1
/*
* 函数作用:死循环读取 lock->val 的值,直到 arg2 为真才返回读取的结果。
* VAL 为读取的 lock->val 的值.
* 含义为死循环读取 lock->val 的值,直到 lock->val != _Q_PENDING_VAL(即 1<<8),
* 或尝试读取的次数达到 cnt 次,这里 cnt 初始化为 1,其实就只是读取一次。
*/
val = atomic_cond_read_relaxed(&lock->val, (VAL != _Q_PENDING_VAL) || !cnt--);
}
/*
* If we observe any contention; queue.
*
* 除了 lock value(即 z)之外,还有其它位段不为 0,说明已经有任务处于 pending 等锁的状态了,
* 那么就直接 queue,在自己的 mcs 锁上自旋,不要在 lock->val 上自旋了。
*/
if (val & ~_Q_LOCKED_MASK) // val & ~0xff
goto queue;
/*
* trylock || pending
* 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
*
* 代码走到这里,说明没有任务处于 pending 状态,那么设置锁的 pending 标志,标识本任务
* 要 pending 了。
* 函数等效于 ret = lock->val; lock->val |= _Q_PENDING_VAL(1<<8); val = ret;
*/
val = queued_fetch_set_pending_acquire(lock);
/*
* If we observe contention, there is a concurrent locker.
*
* Undo and queue; our setting of PENDING might have made the
* n,0,0 -> 0,0,0 transition fail and it will now be waiting
* on @next to become !NULL.
*
* 在执行这段代码期间,若是有任务进入了 pending 状态(设置了 pending 标识或
* 在 msc lock 上 spin 了),那么也是要直接 queue 的,在自己的 msc lock 上 spin,
* 不要在 lock->val 上 spin 了。
* 若情况是其它任务在 mcs lock 上自旋了,就清除自己或上的 pending 标志。
*/
if (unlikely(val & ~_Q_LOCKED_MASK)) { //~0xff
/* Undo PENDING if we set it. */
if (!(val & _Q_PENDING_MASK)) //_Q_PENDING_MASK = 0xff<<8
/* 直接 lock->pending=0, 将 pending 位段清 0,看来 pending 位段只使用了 bit8 */
clear_pending(lock);
goto queue;
}
/*
* We're pending, wait for the owner to go away.
*
* 0,1,1 -> 0,1,0
*
* this wait loop must be a load-acquire such that we match the
* store-release that clears the locked bit and create lock
* sequentiality; this is because not all
* clear_pending_set_locked() implementations imply full
* barriers.
*
* 代码走到这里,说明目前只有一个任务正持有锁(也可能已经释放了),且没有任务等待。
* 那么就在 lock->val 上自旋等待,只有锁的 owner 退出临界区。
* 上面的这个 pending 位段为 1 还是上面本任务设置的。
*/
if (val & _Q_LOCKED_MASK) //0xff
atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK)); //【1. 在 lock->val 上 spin】
/*
* take ownership and clear the pending bit.
* 0,1,0 -> 0,0,1
*
* owner 退出临界区释放锁了,我们就获取锁,清除我们设置的 pending 标志位。
* 通过将 bit0-15 设置为 1 来实现的。
*/
clear_pending_set_locked(lock);
/* 默认不使能 CONFIG_LOCK_EVENT_COUNTS,为空 */
lockevent_inc(lock_pending);
/* 这个路径下,我们就是锁的第一继承人,获取到锁后就退出了 */
return;
/*
* End of pending bit optimistic spinning and beginning of MCS queuing.
*
* 下面就是非第一位继承人,要在自己的 msc node 节点上自旋了。
*/
queue:
/* 原生有记录进入执行慢速路径的次数 */
lockevent_inc(lock_slowpath);
pv_queue:
/* 获取当前 cpu 的 spin lock 嵌套深度 */
node = this_cpu_ptr(&qnodes[0].mcs);
idx = node->count++; //先赋值,再嵌套计数加 1
//对 tail(即 x)的 tail cpu 和 tail idx 位段进行编码
tail = encode_tail(smp_processor_id(), idx);
/*
* 4 nodes are allocated based on the assumption that there will
* not be nested NMIs taking spinlocks. That may not be true in
* some architectures even though the chance of needing more than
* 4 nodes will still be extremely unlikely. When that happens,
* we fall back to spinning on the lock directly without using
* any MCS node. This is not the most elegant solution, but is
* simple enough.
* 翻译:基于不会有嵌套 NMI 采用自旋锁的假设分配了 4 个节点。这在某
* 些架构中可能并非如此,即使需要超过 4 的可能性仍然极小。发生这种情
* 况时,我们会退回到直接在锁上自旋而不使用任何 MCS 节点。这不是最优
* 雅的解决方案,但足够简单。
* 判断了等于,说明最大只允许嵌套 3 层。这里有个统计,若为真,说明出现
* 了 nmi 中断中持 spin lock 锁的嵌套。
*/
if (unlikely(idx >= MAX_NODES)) {
lockevent_inc(lock_no_node);
/* 退回到在 lock->val 上 spin 的状态,几乎不可能进到上面 if 语句中来 */
while (!queued_spin_trylock(lock))
cpu_relax();
goto release;
}
/* 返回当前 cpu 的 qnodes[idx].mcs 节点的地址 */
node = grab_mcs_node(node, idx);
/* Keep counts of non-zero index values: */
lockevent_cond_inc(lock_use_node2 + idx - 1, idx);
/*
* Ensure that we increment the head node->count before initialising
* the actual node. If the compiler is kind enough to reorder these
* stores, then an IRQ could overwrite our assignments.
*/
barrier();
/*不是继承者 locked 为 0, 最尾部节点指向 NULL */
node->locked = 0;
node->next = NULL;
/* 默认没有定义_GEN_PV_LOCK_SLOWPATH,是空函数 */
pv_init_node(node);
/*
* We touched a (possibly) cold cacheline in the per-cpu queue node;
* attempt the trylock once more in the hope someone let go while we
* weren't watching.
* 尝试获取锁,成功返回 1,失败返回 0
*/
if (queued_spin_trylock(lock))
goto release;
/*
* Ensure that the initialisation of @node is complete before we
* publish the updated tail via xchg_tail() and potentially link
* @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
*/
smp_wmb();
/*
* Publish the updated tail. We have already touched the queueing cacheline;
* don't bother with pending stuff.
* p,*,* -> n,*,*
*/
/*
* 等效于:ret=lock->tail; lock->tail=tail>>_Q_TAIL_OFFSET; old = ret<<_Q_TAIL_OFFSET
* 也即是将 tail 值赋值给 lock->val 的 bit16-31.
* 也就是说 lock->tail 恒指向最后一个 pending 锁的任务对应的 msc node 节点上 ###########
*/
old = xchg_tail(lock, tail);
next = NULL;
/*
* if there was a previous node; link it and wait until reaching the head of the
* waitqueue.
* 为真,说明之前已经有 mcs node 等待节点存在了。
*/
if (old & _Q_TAIL_MASK) { //bit16-bit31, 也即是 tail 的掩码
/* 获取的是 cpu 的嵌套深度 idx 为下标的节点 */
prev = decode_tail(old); //return per_cpu_ptr(&qnodes[idx].mcs, cpu)
/* Link @node into the waitqueue. 构建 msc 链表 */
WRITE_ONCE(prev->next, node);
pv_wait_node(node, prev); //空函数
/* 死循环,spin 等待,直到 node->locked 不为 0 才退出 spin */
arch_mcs_spin_lock_contended(&node->locked); //【2. 在 mcs node->locked 上 spin】
/*
* While waiting for the MCS lock, the next pointer may have
* been set by another lock waiter. We optimistically load
* the next pointer & prefetch the cacheline for writing
* to reduce latency in the upcoming MCS unlock operation.
* 翻译:在等待 MCS 锁时,下一个指针可能已被另一个锁的 waiter 设置(在另一个 cpu 上,
* 若被设置 next 就不为 NULL)。我们乐观地加载下一个指针并为写入预取缓存线,以减少
* 即将到来的 MCS 解锁操作的延迟。
*/
next = READ_ONCE(node->next);
if (next)
prefetchw(next); //cache line 预取
}
/*
* we're at the head of the waitqueue, wait for the owner & pending to
* go away.
*
* *,x,y -> *,0,0
*
* this wait loop must use a load-acquire such that we match the
* store-release that clears the locked bit and create lock
* sequentiality; this is because the set_locked() function below
* does not imply a full barrier.
*
* The PV pv_wait_head_or_lock function, if active, will acquire
* the lock and return a non-zero value. So we have to skip the
* atomic_cond_read_acquire() call. As the next PV queue head hasn't
* been designated yet, there is no way for the locked value to become
* _Q_SLOW_VAL. So both the set_locked() and the
* atomic_cmpxchg_relaxed() calls will be safe.
*
* If PV isn't active, 0 will be returned instead.
*
*/
/* 没有使能,函数直接返回 0 */
if ((val = pv_wait_head_or_lock(lock, node)))
goto locked;
/* 代码走到这里,当前任务就是锁的第一继承人了 */
/*
* 死循环等待,直到 lock->val 的 y(pending bit)和 z(locked byte)都是 0, 并返回 lock->val
* spin_unlock 时直接将 lock->val 赋值为 0, 判断 pending bit 是为还没进入 msc spin 的尝试
* 持锁的任务让路的应该。
*/
val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK)); //【3. 在 lock->val 上 spin】
locked:
/*
* claim the lock:
*
* n,0,0 -> 0,0,1 : lock, uncontended
* *,*,0 -> *,*,1 : lock, contended
*
* If the queue head is the only one in the queue (lock value == tail)
* and nobody is pending, clear the tail code and grab the lock.
* Otherwise, we only need to grab the lock.
*/
/*
* In the PV case we might already have _Q_LOCKED_VAL set, because
* of lock stealing; therefore we must also allow:
*
* n,0,1 -> 0,0,1
*
* Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
* above wait condition, therefore any concurrent setting of
* PENDING will make the uncontended transition fail.
*/
/*
* 走到这里,val 的 lock 和 pending 是为 0 的,但是 tail 中可能还保存着其它等待者的信息。
* lock->tail 指向最后一个等待获取锁的任务的 mcs node 节点,若和自己相等,说明自己
* 是唯一一个 mcs node 节点。
*/
if ((val & _Q_TAIL_MASK) == tail) {
/*
* if (lock->val == val) {lock->val = _Q_LOCKED_VAL; return true;} 持锁退出
* if (lock->val != val) {val = lock->val; return false;} //没持锁
*/
if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
goto release; /* No contention */
}
/* 下面就是还有新的任务等待了,有新 msc node 串联在链表上了 */
/*
* Either somebody is queued behind us or _Q_PENDING_VAL got set
* which will then detect the remaining tail and queue behind us
* ensuring we'll see a @next.
*/
/* 直接赋值 lock->locked = _Q_LOCKED_VAL,即对 bit0-7 locked byte 进行赋值,已经持锁了 */
set_locked(lock);
/*
* contended path; wait for next if not observed yet, release.
* 等待已经更新 lock->tail 的新的 waiter 挂在 msc 链表上
*/
if (!next)
/* 死循环等待,直到 node->next 不为 NULL,通过不会等待很多次 */
next = smp_cond_load_relaxed(&node->next, (VAL)); //【4. 在 mcs node->next 上 spin】
/*
* 直接 next->locked = 1, 将下一个 mcs 节点设为第一继承人,
* 其就会退出在 node->locked 上的自旋,改为在 lock->val 上的自旋
*/
arch_mcs_spin_unlock_contended(&next->locked);
/*没有使能,为空 */
pv_kick_node(lock, next);
/* 下面就是获取到 spin_lock 锁后的逻辑了 */
release:
/* release the node. qnodes[0].mcs.count - 1 应该是和前面的加 1 相对应的 */
__this_cpu_dec(qnodes[0].mcs.count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);
|