Uthread: switching between threads 思路 xv6 已经实现了进程的切换机制,本实验要求参考进程的切换,实现一个用户态线程的切换。
要实现线程切换,必然涉及上下文,即寄存器的保存和恢复,那么需要保存哪些寄存器?实际上,只需要保存被调用者保存寄存器(callee-saved registers),而实现调用者保存寄存器(caller-saved registers)的保存与恢复的代码由编译器自动生成。关于调用者保存与被调用者保存寄存器有哪些可以参照下述 RISC-V 的 calling convention:
另外,根据 user/uthread_switch.S 的注释,thread_switch 最后通过 ret 指令将当前程序计数器的值切换为 ra 寄存器中存储的地址,实现进程的“切换”,因此 struct thread 中还需要保存每个线程对应程序的起始地址(即函数指针)。
在了解需要保存哪些寄存器之后以及如何进行线程切换之后,还有一个细节需要考虑,即栈指针寄存器(sp)的初始化。线程栈的存储位置为 struct thread 中的 stack 数组,那么 sp 应该指向 stack 的位置,但由于栈的地址从大到小增长,因此 sp 应该初始化为 (uint64)t->stack + STACK_SIZE.
代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 @@ -12,6 +12,20 @@ struct thread { + /* 0 */ uint64 ra; + /* 8 */ uint64 sp; + /* 16 */ uint64 s0; + /* 24 */ uint64 s1; + /* 32 */ uint64 s2; + /* 40 */ uint64 s3; + /* 48 */ uint64 s4; + /* 56 */ uint64 s5; + /* 64 */ uint64 s6; + /* 72 */ uint64 s7; + /* 80 */ uint64 s8; + /* 88 */ uint64 s9; + /* 96 */ uint64 s10; + /* 104 */ uint64 s11; char stack[STACK_SIZE]; /* the thread's stack */ int state; /* FREE, RUNNING, RUNNABLE */ }; @@ -62,6 +76,7 @@ thread_schedule(void) * Invoke thread_switch to switch from t to next_thread: * thread_switch(??, ??); */ + thread_switch((uint64)t, (uint64)current_thread); } else next_thread = 0; } @@ -76,6 +91,8 @@ thread_create(void (*func)()) } t->state = RUNNABLE; // YOUR CODE HERE + t->ra = (uint64)func; + t->sp = (uint64)t->stack + STACK_SIZE; } void @@ -7,5 +7,34 @@ .globl thread_switch thread_switch: /* YOUR CODE HERE */ + sd ra, 0(a0) + sd sp, 8(a0) + sd s0, 16(a0) + sd s1, 24(a0) + sd s2, 32(a0) + sd s3, 40(a0) + sd s4, 48(a0) + sd s5, 56(a0) + sd s6, 64(a0) + sd s7, 72(a0) + sd s8, 80(a0) + sd s9, 88(a0) + sd s10, 96(a0) + sd s11, 104(a0) + + ld ra, 0(a1) + ld sp, 8(a1) + ld s0, 16(a1) + ld s1, 24(a1) + ld s2, 32(a1) + ld s3, 40(a1) + ld s4, 48(a1) + ld s5, 56(a1) + ld s6, 64(a1) + ld s7, 72(a1) + ld s8, 80(a1) + ld s9, 88(a1) + ld s10, 96(a1) + ld s11, 104(a1) ret /* return to ra */
Using threads 思路 后两个实验与 xv6 无关,而是练习使用 POSIX 线程库在实际的 Linux 平台进行并发编程。
本实验要求使用锁机制,实现一个支持并发的哈希表。首先需要确定的是:哪部分的操作会出现竞态(race condition)?根据观察不难得知 put() 操作可能存在下面这种情况:
线程 1 和线程 2 本次 put() 映射到一个桶中(i 相同),都执行完 line 46 ~ 49 的循环之后,e 都为 0,随后先后执行 insert(),都创建一个新的 entry,并先后更新 table[i] 的值,导致先插入的键被覆盖。 像这样,在一次插入操作未完成的情况下,另一次插入也开始进行且映射到一个桶中,就会导致丢键(keys missing)的情况发生。
首先最简单无脑的办法就是给整个 put() 函数加一把大锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 pthread_mutex_lock(&lock); for (e = table[i]; e != 0 ; e = e->next) { if (e->key == key) break ; } if (e){ e->value = value; } else { insert(key, value, &table[i], table[i]); } pthread_mutex_unlock(&lock);
可以看到,keys missing 的问题已经被解决,但是大锁带来的就是更低的性能,实际上根据上图可知,该实现在双核情况下的运行速度甚至慢于单核。
实际上,对 table 数组的遍历并不会导致竞态,因此将加锁的操作延迟到遍历结束后:
1 2 3 4 5 6 7 8 9 10 11 12 13 for (e = table[i]; e != 0 ; e = e->next) { if (e->key == key) break ; } pthread_mutex_lock(&lock); if (e){ e->value = value; } else { insert(key, value, &table[i], table[i]); } pthread_mutex_unlock(&lock);
做了上述修改后,仍然没有出现 key missing 的情况,同时效率提升了一倍以上。
最后,更细化一些,只有当两个 put() 映射到同一个桶时才会发生竞态,因此可以为每个桶分别设置一把锁,以进一步提高并发性:
1 2 3 4 5 6 7 8 9 10 11 12 13 for (e = table[i]; e != 0 ; e = e->next) { if (e->key == key) break ; } pthread_mutex_lock(&locks[i]); if (e){ e->value = value; } else { insert(key, value, &table[i], table[i]); } pthread_mutex_unlock(&locks[i]);
可见,效率又有进一步提升。
代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @@ -17,6 +17,7 @@ struct entry *table[NBUCKET]; int keys[NKEYS]; int nthread = 1; +pthread_mutex_t locks[NBUCKET]; double now() @@ -47,6 +48,7 @@ void put(int key, int value) if (e->key == key) break; } + pthread_mutex_lock(&locks[i]); if(e){ // update the existing key. e->value = value; @@ -54,7 +56,7 @@ void put(int key, int value) // the new is new. insert(key, value, &table[i], table[i]); } - + pthread_mutex_unlock(&locks[i]); } static struct entry* @@ -118,6 +120,10 @@ main(int argc, char *argv[]) keys[i] = random(); } + + for (int i = 0; i < NBUCKET; ++i) { + pthread_mutex_init(&locks[i], NULL); + } // // first the puts //
Barrier 思路 最后一个实验主要是熟悉 POSIX 线程库中条件变量(conditional variable)的使用,实现的思路比较简单:前 nthread - 1 个线程在条件变量上休眠,最后一个线程将休眠的所有进程进行唤醒。有关条件变量的用法可以参考 OSTEP:OSTEP: Condition Variables .
代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @@ -30,7 +30,18 @@ barrier() // Block until all threads have called barrier() and // then increment bstate.round. // + + pthread_mutex_lock(&bstate.barrier_mutex); + ++bstate.nthread; + if (bstate.nthread == nthread) { + pthread_cond_broadcast(&bstate.barrier_cond); + ++bstate.round; + bstate.nthread = 0; + } + else { + pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex); + } + pthread_mutex_unlock(&bstate.barrier_mutex); }