0%

时间序列相似度度量标准实现

预备知识:样本相似度度量标准

基础类型

距离型

以差值为基础的相似度计算方法,通常值越小越相似。

例子1: 欧式距离

假设时间序列 $X={x_1,x_2……x_n},Y={y_1,y_2……y_n}$,则可以定义它们之间的欧式距离:

$d_{L^p}=(\sum\limits_{k=1}^{n}\left\vert x_k-y_k \right\vert^p)^{\frac 1p}$

当 $p=1$ 时,距离为 $d_{L^1}=(\sum\limits_{k=1}^{n}\left\vert x_k-y_k \right\vert)$,也称为曼哈顿距离。

当 $p=2$ 时,距离为 $d_{L^2}=(\sum\limits_{k=1}^{n}\left\vert x_k-y_k \right\vert^2)^{\frac 12}$,也称为欧几里得距离

当 $p\rightarrow \infin$ 时,距离为 $d_{L^\infin}=\max\limits_{1\le k \le n}\left\vert x_k-y_k \right\vert$

例子2:马氏距离(可以看成欧式距离使用协方差矩阵达成的标准化形式)

两个向量的马氏距离为:

$$d(\vec x,\vec y)=\sqrt{(\vec x-\vec y)^{T}\sum^{-1}(\vec x-\vec y)}$$

其中,$\sum$ 为 $\vec x$ 和 $\vec y$ 的协方差矩阵。

为什么要用马氏距离?欧式距离不行吗?考虑下面这个例子:

image-20200602210843484.png

上图中,红圆圈是数据点的均值,绿色的×和红色的×到均值的欧式距离是相等的,但我们可以明显地看到,绿色的×应该属于这个分布,红色的×不属于。这是因为

  1. 不同维度(x轴,y轴)之间存在相关性(在这里表现为x轴与y轴存在正相关)。

  2. 不同方向上变化幅度不同,把图顺时针旋转45度,可以看到横向上变化较大,竖向上变化较小(有些资料把这一点称为量纲不同和方差不同)。

用马氏距离就可以消除这种差别,它相当于把欧式距离进行了标准化。

夹角型

以乘积为基础的相似度计算方法,通常值越大越相似。可以通过取相反数达成值越小越相似。

例子1:夹角余弦

假设时间序列 $X={x_1,x_2……x_n},Y={y_1,y_2……y_n}$,则可以定义它们之间的夹角 $\theta$ 满足:

$cos\theta=\frac{\sum\limits_{k=1}^{n}x_ky_k}{\sqrt{\sum\limits_{k=1}^{n}x_k^2}\sqrt{\sum\limits_{k=1}^{n}y_k^2}}$

例子2:相关系数(其实也可以看成夹角余弦通过每个特征减均值达成的标准化形式)

假设时间序列 $X={x_1,x_2……x_n},Y={y_1,y_2……y_n}$,则可以定义它们之间的Pearson相关系数 $cor(X,Y)$ 满足:

$cor(X,Y)=\frac{\sum\limits_{k=1}^{n}(x_k-\bar x)(y_k-\bar y)}{\sqrt{\sum\limits_{k=1}^{n}(x_k-\bar x)^2}\sqrt{\sum\limits_{k=1}^{n}(x_k-\bar x)^2}}$

针对特殊结构数据

时间序列

核心需要:对齐

什么是对齐:样本特征的互相配对。

为什么要对齐:样本的相似度是根据样本特征的相似度累加/平均的,结构化数据的特征直接对齐了,但时间序列的每一时刻值可能没有天然强对齐关系。

得到样本相似度度量的实际处理:不同对齐策略下的相似度度量最值

不同对齐策略指对对齐的搜索空间的不同约束,搜索空间的大小会导致不同的复杂度。

时间序列对齐策略的潜在需求:

保序性(单调性):例如T1:a, b, c; T2:x, y, z,无论怎么配对,T1配对出的匹配序列T2‘顺序满足y不会在x前面,z不会再x,y前面。

保端点型对齐策略

可以一对多,需要全部点(时间序列各个时刻的值)都找到配对(从而开始点一定配对开始点,结束点一定配对结束点,也满足连续性)

比较适合的场景:语音相似度识别

典型例子:DTW([Dynamic Time Warping,动态时间规整):保端点型对齐策略下的距离型度量

img

上图就是DTW对齐策略的一个例子。

最基本的DTW可以用动态规划(dp)来实现。具体的,假设时间序列 $X={x_1,x_2……x_m},Y={y_1,y_2……y_n}$ ,用 $dp[i][j] ,i\le m,j\le n$ 来表示 $X$ 的前 $i$ 个点和 $Y$ 的前 $j$ 个点能取得的最小距离,定义状态转移方程为:

​ $$dp[i][j]=\begin{cases} dist(1,1),i=1,j=1 \ dp[i-1][j]+dist(i,1),i>1,j=1 \ dp[i][j-1]+dist(1,j),i=1,j>1\min(dp[i-1][j-1],dp[i][j-1],dp[i-1][j-1])+dist(i,j),i>1,j>1 \end{cases}$$

其中 $dist(i,j)=(X[i]-Y[j])^2$ 。

为了将结果可视化,可以将距离矩阵(即 dist )绘制出来,如 $X=[3,6,8],Y=[2,4,8,7]$ 时,距离矩阵如下所示:

image-20200604221146644.png

要达到一个最小距离,相当于从左上角走到右下角,且每次只允许往右、往下或者往右下走,使得走过的数字的和最小。显然在这个图里应该这样走:

image-20200604221539941.png

DTW的优点

DTW能处理长度不同的时间序列,这是它比传统的距离算法强的地方。而且对很多时间序列,用DTW进行距离计算明显更合理。

DTW的缺点

用距离来度量相似度的算法的时间复杂度是 $O(n)$ ,空间复杂度是 $O(1)$ 的 ,而DTW算法在时间和空间上都是 $O(nm)$ 的。对于较长的时间序列,直接进行计算显然开销太大,于是要引进一些优化的方法。

以下的变种是基于缩减搜索空间得到的变种。

DTW变种1:greedy-DTW

来自论文《Time2Graph: Revisiting Time Series Modeling with Dynamic Shapelets》的附录A.1的Algorithm 5

这种算法是用贪心的思想,即每次选择所有能走的点里代价最小的点。为了避免出现沿着边缘走的情况,设置一个窗口 $w$ ,规定走到的位置 $x,y$ 满足 $|x-y|\le w$ 。比如说,对时间序列 $X=[2,3,5,7,1],Y=[4,2,8,6,4]$,dist矩阵和走过的路线是这样子的:

image-20200605171339497.png

greedy-DTW的优点

它最大的优点是速度快,只需要 $O(n+m)$ 的时间复杂度就可以完成计算。

greedy-DTW的缺点

缺点也很明显,贪心算法无法保证取得全局最优解,容易”误入歧途”。

DTW变种2:Fast-DTW

来自论文《FastDTW: Toward Accurate Dynamic Time Warping in Linear Time and Space》

可参考https://www.cnblogs.com/kemaswill/archive/2013/04/18/3029078.html

这种方法的基本策略是递归。FastDTW 算法如下:

对于两个时间序列 $X=[1,2,3,4],Y=[3,4,5,6]$,首先将它们的长度缩短为一半,变成 $X’=[1.5,3.5],Y’=[3.5,5.5]$ ,相对应的,矩阵从 4×4 变成了 2×2,如下图所示:

image-20200606011018301.png

然后,对压缩后的时间序列递归地应用 FastDTW找到一条路径(由于这里的时间序列长度只有2,就可以直接找到不用递归),如下所示:

image-20200606011233288.png

在原来的矩阵中给这条路径经过的块打上标记,如下所示:

image-20200606011749696.png

为了增大搜索范围,对路径以 $r$ 为半径拓展,即将与路径的距离小于等于 $r$ 的路径打上标记。$r=1$ 时,如下所示:

image-20200606012002821.png

这样,就得到了我们的搜索路径,然后只用在这条路径里找就可以了。看起来,搜索路径并不比原来的矩阵小多少,但当时间序列长度足够大时,搜索路径会很小,因为它是 $O(n*r)$ 的,证明略。

Fast-DTW的优点

最大的优点显然是速度快,如下图所示(顺便吐槽一句,04年的时候执行一个 $n=1000$ 的 $O(n^2)$ 算法竟然要0.92秒,而今天的计算机可能不用0.01秒,看来计算机领域的发展还是很快的):

image-20200606012512919.png

image-20200606012619988.png

对于 $N=1000$ 左右的时间序列,用 $r=20$ 的 Fast-DTW 会比 DTW 快了 4-5倍。当 $N$ 不断增大时,加速效果还会更明显。

Fast-DTW的缺点

由于这种方法得到的是较优解,因此我们有必要关注它的误差。定义误差如下所示:

image-20200606013024726.png

经过试验得到误差大小:

image-20200606013005315.png

可以看到,当 $r=20$ 时,平均误差已经小于 $1%$ 。虽然误差仍然存在,但已经基本可以忽略。

还有一个缺点是编程难度较DTW大。

DTW变种3:constrained DTW

来自论文《Searching and mining trillions of time series subsequences under dynamic time warping》

这篇文章对几种已有的剪枝策略进行了讨论,对它们的下界和时间复杂度进行了比较进行了比较,如下图所示:

image-20200606141816713.png

一般认为在虚线下方的剪枝策略是没用的,因为至少存在一种下界更高且时间复杂度更低的策略。

最终作者决定用这种策略:先用 $LB_{Kim}FL$ ,如果不能大于 $Best_so_far$ ,则再用 $LB_{Keogh}EQ$ ,还是不行就用 $LB_{Keogh}EC$,再不行就用 $Early_abandoning_DTW$ 。

至于cDTW,似乎这篇论文并没有提到,查了其他的资料,大概就是把DTW的路径限制在对角线旁边的样子。

平移型对齐策略

只可一对一,允许两边边界的点(端点及端点附近点)找不到配对,满足连续性。

比较适合的场景:时钟不共享而有一定偏差的两条时间序列

典型例子:cross-correlation:平移型对齐策略下的夹角型度量

可参考论文<<k-Shape: Efficient and Accurate Clustering of Time Series>>的3.1节的介绍

cross-correlation的定义与卷积类似,对于函数 $f(x)$ 和 $g(x)$ ,它们的 cross-correlation 定义为

$$h(x)=\int_{-\infin}^{+\infin}f(t)g(x+t)dt$$。事实上,把 + 号改为 - 号就是卷积了。两个已经标准化了的时间序列 $\vec x=(x_1,…,x_m)$ 和 $\vec y=(y_1,…,y_m)$ 的 cross-correlation 可以这样定义:$CC(\vec x,\vec y)=(c_1,c_2,…,c_{2*m-1})$

其中:$c_{k+m}=\begin{cases}\sum\limits_{l=1}^{m-k}x_{l+k}y_l,k>0\ \vec x · \vec y ,k=0 \ \sum\limits_{l=1-k}^{m}x_{l+k}y_l,k<0 \end{cases}$

然后在 $CC(\vec x,\vec y)$ 中找到一个最大的,记为 $CC_w(\vec x,\vec y)$,定义 $NCC_c(\vec x,\vec y)=\frac{CC_w(\vec x,\vec y)}{\sqrt{R_0(\vec x,\vec x)·R_0(\vec y,\vec y)}}$。这样就把距离归化到 $[-1,1]$。再定义 $SBD(\vec x,\vec y)=1-NCC_c(\vec x,\vec y)$ ,则得到 向量 $\vec x$ 和向量 $\vec y$ 的 SBD(Shape-based distance)距离,对于形状接近的时间序列,这个值接近于0,否则接近于2。

值得注意的是,直接计算两个长为 $m$ 的时间序列的SBD距离是 $O(m^2)$ 的,但我们知道两个序列的卷积可以 $O(mlogm)$ 地计算,而)卷积只用把 $\vec x$ 翻转一下就变成了 $CC(\vec x,\vec y)$,因此,计算 SBD 距离也是 $O(mlogm)$ 的。这会比 $O(m^2)$ 的 DTW 算法快很多。

以下的变种是基于不同标准化思路得到的变种。

<<k-Shape: Efficient and Accurate Clustering of Time Series>>的3.1节的公式8的3个变种

定义 $NCC_b(\vec x,\vec y)=\frac{CC_w(\vec x,\vec y)}{m}$ 这样的结果并不会归一化到一个区间,有啥规律我也看不出来…

定义 $NCC_u(\vec x,\vec y)=\frac{CC_w(\vec x,\vec y)}{w-|w-m|}$,这样的结果是上面有多少项相乘,则结果就除以多少。

并行与分布式计算作业3

题目要求

​ 利用LLVM (C、C++)或者Soot (Java)等工具检测多线程程序中潜在的数据竞争以及是否存在不可重入函数,给出案例程序并提交分析报告。

概述

​ 用pthreads编写一个简单的并行程序,用附带了ThreadSanitizer的clang对代码进行编译,在终端运行,查看数据竞争情况。再根据ThreadSanitizer的输出结果对代码进行修改,通过忙等待、互斥量和信号量等方法对临界区加锁,消除数据竞争。

​ 编写一个检测程序中是否存在不可重入函数的程序,它读取一个IR文件,判断其中的每一个函数是否是不可重入的。

检测数据竞争

​ 根据
$$\pi=4(\sum_{n=1}^{\infty}(-1)^{n-1}\frac{1}{2n-1})$$

​ 编写一个计算$\pi$值的并行程序,代码见code1.c:

​ 通过命令行参数得到线程数量和要计算的范围。在第33行创建多个线程,每个线程负责计算一部分内容,把结果累加到全局变量sum上。由于多个线程是并行执行的,且会对sum进行修改,所以会出现问题。不用ThreadSanitizer时,程序运行结果如下:

image-20200424185518501.png

​ 可以看到,每次运行结果都不一样,有时偏大有时偏小,说明数据竞争现象确实存在。

​ 用ThreadSanitizer检测,结果如下:

image-20200424190726679.png

​ 可以看到,运行结果显示存在data race,在第21行,刚好就是:

1
sum+=factor/(2*i+1);

​ 一个很自然的想法是,给每个线程创建一个局部变量,把要计算的值加到这个局部变量上,最后再对所有线程求一次和。只需将Thread_sum函数修改为这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void* Thread_sum(void* rank){
long my_rank=(long)rank;
double my_sum=0;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
my_sum+=factor/(2*i+1);
}
sum+=my_sum;
return NULL;
}

​ 这样看似正常,但运行后发现,第24行仍然是多个线程同时修改一个全局变量,仍然存在数据竞争现象:

image-20200424194846838.png

解决数据竞争

​ 尝试用忙等待、互斥量和信号量来解决数据竞争问题。

忙等待

​ 忙等待就是设置一个标记变量,用于指明临界区可以被哪个线程执行,该执行完之后修改这个标记变量,而不能执行临界区的线程必须一直处于忙等待状态。

​ 这只用定义一个flag,再对上面的第24行修改为:

1
2
3
while(flag!=my_rank);
sum+=my_sum;
flag=(flag+1)%thread_count;

​ 完整代码保存在code2.c

​ ThreadSanitizer似乎不支持用忙等待的方法对临界区加锁:

image-20200424221456398.png

互斥量

​ 互斥量能保证同一时间只有一个线程在执行临界区的代码,其他线程只能等待。等当前线程执行完之后,会从等待的线程中随机选一个进入临界区。

​ 这只用定义一个互斥量mutex,再将临界区代码修改为:

1
2
3
pthread_mutex_lock(&mutex);
sum+=my_sum;
pthread_mutex_unlock(&mutex);

​ 完整代码保存在code3.c

​ 用ThreadSanitizer测试,结果正常,没有warning:
image-20200424220329505.png

信号量

​ 信号量(semaphore)可以认为是一种特殊类型的unsigned int,它的类型是sem_t。信号量可以赋值为0,1,……。大多数情况下,我们只用0和1两个值。这种只有0和1值的信号量也称为二元信号量。0对应上了锁的互斥量,1对应没上锁的互斥量。

​ 如果我们要用信号量来解决临界区问题,可以先创建一个全局信号量,初始值为1。在临界区前调用sem_wait()函数,这个函数的意义是:如果信号量为0则阻塞;如果是非0值则减1然后进入临界区。临界区执行完之后,调用sem_post()函数将信号量置为1。好像跟互斥量没啥区别

​ 这也只用定义一个信号量my_semaphore,在main函数里初始化,再将临界区代码修改为:

1
2
3
sem_wait(&my_semaphore);
sum+=my_sum;
sem_post(&my_semaphore);

​ 完整代码见code4.c

​ 测试一下,也很正常:

image-20200424221059782.png

检测不可重入函数

​ 利用clang可以生成c语言程序对应的中间表示,下面这个函数:

1
2
3
4
void C(int a){
B(1);
val=1;
}

​ 会被转化为

1
2
3
4
5
6
7
define void @C(i32) #0 {
%2 = alloca i32, align 4
store i32 %0, i32* %2, align 4
call void @B(i32 1)
store i32 1, i32* @val, align 4
ret void
}

​ 我们不关心其他的内容,只需要知道:

  1. 函数定义以“define”开始,这一行的第一个 ‘@’ 和 ‘(‘ 之间的字符串是函数的名字。

  2. 调用一个其他函数会以 “call” 开始, 这一行的第一个 ‘@’ 和 ‘(‘ 之间的字符串是被调用函数的名字。

  3. 访问一个全局变量时会用到 ‘@’ ,访问其他变量会用到 ‘%’ 。

    此外,被其他函数调用但没有在程序中定义的函数,像printf()函数等,都默认为不可重入函数。

​ 知道了这些,就可以写一个程序来检测程序中的不可重入函数了。做法很简单,把函数理解为一个点,把调用关系理解为边,A调用了B就连一条B->A的边。然后将访问了全局变量的函数和在程序中没有给出定义的函数标记为不可重入函数,从这些 “原生的” 不可重入函数开始进行深度优先搜索,凡是访问到的点都是不可重入函数。深度优先搜索的示例代码如下:

1
2
3
4
5
6
7
8
9
10
void dfs(int u){
for(int i=head[u];i;i=e[i].next){//用链式前向星的方法存边。这里是遍历u节点的每一条出边。
int v=e[i].to;
if(!vis[v]){//如果这个节点没有被访问过
vis[v]=true;
if(bkcr[v]!=-1)bkcr[v]=1;//-1代表的是"原生的" 不可重入函数,1是由于调用了不可重入函数而成为不可重入函数的函数
dfs(v);
}
}
}

​ 完整代码见code5.cpp

测试

​ 编写一个test.c,里面有几个函数调用来调用去,如下图所示:

image-20200504124549947.png

​ 用clang生成中间代码:

1
clang -O0 -S -emit-llvm ./test/test.c -o input.ll

​ 进行检测,输出如下:

image-20200504124714169.png

​ 结果符合预期。

总结

​ 用ThreadSanitizer可以方便地检测多线程中存在的数据竞争问题,检测到临界区后,可以用互斥量和信号量对它进行加锁。利用llvm可以生成程序的中间代码,然后可以检测其中是否有不可重入函数。

​ 这次实验完成的比较简单,主要是因为之前学过一点pthread,还写了篇学习笔记,于是直接把代码和博客的内容拿过来用了(抄自己的博客不算抄吧)。检测不可重入函数稍微有点复杂,不过只要对llvm的中间代码有了基本的了解,学过一些图论算法,解决起来也不算困难。

​ 一个比较大的收获是学会了使用ThreadSanitizer来检查数据竞争,大大方便了以后编写多线程程序。

操作系统实验三_操作系统内核

[toc]

实验目的

1、加深理解操作系统内核概念
2、了解操作系统开发方法
3、掌握汇编语言与高级语言混合编程的方法
4、掌握独立内核的设计与加载方法
5、加强磁盘空间管理工作

实验要求

1、知道独立内核设计的需求
2、掌握一种x86汇编语言与一种C高级语言混合编程的规定和要求
3、设计一个程序,以汇编程序为主入口模块,调用一个C语言编写的函数处理汇编模块定义的数据,然后再由汇编模块完成屏幕输出数据,将程序生成COM格式程序,在DOS或虚拟环境运行。
4、汇编语言与高级语言混合编程的方法,重写和扩展实验二的的监控程序,从引导程序分离独立,生成一个COM格式程序的独立内核。
5、再设计新的引导程序,实现独立内核的加载引导,确保内核功能不比实验二的监控程序弱,展示原有功能或加强功能可以工作。
6、编写实验报告,描述实验工作的过程和必要的细节,如截屏或录屏,以证实实验工作的真实性

实验环境

  • Windows 10
  • WSL (Windows Subsystem for Linux) [Ubuntu 18.04.2 LTS]:WSL 是以软件的形式运行在 Windows 下的 Linux 子系统,是近些年微软推出来的新工具,可以在 Windows 系统上原生运行 Linux。
  • gcc version 7.5.0:C 语言程序编译器,Ubuntu 自带。
  • ld version 2.3.0: 链接器,Ubuntu自带
  • NASM version 2.13.02:汇编程序编译器,通过sudo apt install nasm安装在 WSL 上。
  • Oracle VM VirtualBox :轻量开源的虚拟机软件,安装在Windows下。
  • VSCode - Insiders v1.33.0:好用的文本编辑器,有丰富的插件,可以用它来打开WSL中的文件夹,用它自带的终端执行make命令。
  • GNU Make 4.1:安装在 Ubuntu 下,一键编译并连接代码,生成最终的文件。
  • Bochs 2.1.1:安装在Windows下,用于调试代码。

自制工具

​ 由于我的虚拟机和Bochs都安装在Windows下,所以需要将WSL中生成的文件写入至Windows的磁盘,这可以用我编写的工具 do 来解决,只需要执行

1
./do 文件名 写入的扇区

​ 就可以了。

实验内容

概述

​ 首先了解c语言程序与汇编语言程序混合编译的方法,再开发操作系统内核,它具有两个主要功能:

  • 提供加载用户程序的方法,用户可以将程序写入磁盘,然后让操作系统执行这些用户程序。

  • 控制键盘输入和屏幕输出,使得用户可以与操作系统交互。

​ 再将上一次实验的4个程序放进磁盘,让操作系统执行它们,查看执行结果是否正确。

c与汇编混合编译

​ 由于操作系统内核非常复杂,只用汇编语言是很难完成的,因此需要使用c语言和汇编语言的混合编译,生成可执行的二进制文件。

​ 可以用gcc将一个c语言程序编译生成汇编语言文件,如对下面这个test.c:

1
2
3
4
5
6
int f(int val){
return val+1;
}
int main(){
f(3);
}

​ 执行

1
gcc -march=i386 -m16 -ffreestanding -fno-PIE -masm=intel -S test.c -o test.asm

​ 会生成一个x86格式的汇编语言文件test.asm。它的内容很复杂,但我们只用关注一些关键的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
main:
.LFB1:
.cfi_startproc
push ebp;函数开始时,总要push ebp,保护这个寄存器
.cfi_def_cfa_offset 8
.cfi_offset 5, -8
mov ebp, esp
.cfi_def_cfa_register 5
push 3 ;传递参数用push,把参数压到栈里,被调用的函数就能发现
call f ;这里调用了f这个函数,用的是call指令
add esp, 4 ;由于前面的ret和pop指令,现在esp指向的值就是3,也就是刚才传的参数,所以把esp+=3,会让栈恢复到函数调用之前的状态
nop
leave
.cfi_restore 5
.cfi_def_cfa 4, 4
ret ;返回只用一条ret指令就可以了。由于我的test.c文件没有return 0,所以gcc帮我补上了一条ret指令
.cfi_endproc

​ f 函数是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
f:
.LFB0:
.cfi_startproc
push ebp ;这里也是要push ebp
.cfi_def_cfa_offset 8
.cfi_offset 5, -8
mov ebp, esp
.cfi_def_cfa_register 5
mov eax, DWORD PTR [ebp+8] ;main函数用了call指令,esp要减4;前面push 了 ebp ,esp又要减4,所以esp+8才是我们传递给f函数的参数。
inc eax ;返回一个值,可以把它放在eax里
pop ebp ;这里把ebp pop 掉,esp+=4
.cfi_restore 5
.cfi_def_cfa 4, 4
ret ;这里返回,esp+=4
.cfi_endproc

​ 看来c语言生成的汇编程序也不算神秘,除了一些奇奇怪怪的指令,跟我们写的汇编程序也没有很大差别。通过上述分析,我们对函数调用和传递参数过程有了更深入的了解。

​ 接着,我们编写一个汇编程序(msg.asm)和c程序(count.c)混合编程实例。汇编模块中定义一个字符串(为了方便,假设它以’\n’结尾),调用C语言的函数,统计其中某个字符出现的次数,汇编模块显示统计结果。

​ c程序如下:

1
2
3
4
5
6
7
8
9
10
int count(char* str){
int i=0;
for(;*str!='\n';str++){
if(*str=='e')i++;
}
return i;
}
int main(void){
count(0);
}

​ 其中,汇编程序调用c程序的函数的过程如下:

1
2
3
4
push 0
push string; string 是一个标号,占2个字节,但c语言的指针是4个字节,于是要把前两个字节置为0,就在上面多push一个0
push 0;call会压栈2个字节,但c语言中默认的是压栈4个字节,如果我们不压栈4个字节,c程序中栈的位置会错乱
call count

​ 函数返回会把返回值放在eax寄存器,然后汇编程序可以调用 10h 号中断把它显示出来。

​ 编写makefile文件(makefile2)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kernel:msg.o count.o my_mbr
ld -m elf_i386 -N --oformat binary -Ttext 0x7e00 msg.o count.o -o kernel
#ld 用于将汇编程序生成的二进制文件和c程序生成的二进制文件链接起来, -Ttest用于指定程序的起始位置为0x7e00处
./do kernel 1 # 将kernel文件写入磁盘的第一个扇区
./do my_mbr 0 # my_mbr是引导扇区程序,它会将kernel文件加载到0x7e00处
msg.o:msg.asm
nasm -felf msg.asm -o msg.o # 如果不加 -felf 参数好像就不能调用c程序中的函数
nm msg.o > tem.txt #用 nm 命令来分析符号表,输出重定向到tem.txt文件
count.o:count.asm
gcc -march=i386 -m16 -mpreferred-stack-boundary=2 -ffreestanding -fno-PIE -masm=intel -c count.c -o count.o
#这里跟上文的编译方式基本一样,不同的是 -c 参数指定生成 .o 文件
nm count.o >> tem.txt #用 nm 命令来分析符号表,输出重定向到tem.txt文件的末端
my_mbr:my_mbr.asm
nasm my_mbr.asm -o my_mbr

​ 在VSCode自带终端中输入:

1
make -f makefile2

​ 就完成了编译、链接和写入磁盘的工作,非常方便。得到的符号表如下:

1
2
3
4
5
6
0000001a t _end #因为msg.asm里将_start声明为全局变量,_end没有,所以这一行是 小写字母t 表示这是局部变量
00000000 T _start #这一行是大写字母T表示全局变量
U count
0000001c t string
00000000 T count
0000003f T main

​ 用bochs运行,结果如下:

image-20200509142152549.png

​ 运行结果正常。

开发操作系统内核

​ 理论上来说,用纯C语言开发内核也是可以的,但要用到很多内嵌汇编,会使程序看起来令人不适。为此,我把 C 程序中需要用到的大量汇编语言代码放置在entry.asm中,c程序只需要执行

1
call 标号

​ 就可以调用entry.asm 里的各种过程。

汇编程序部分

​ 这部分的内容在entry.asm这个文件中,主要分为三部分。从第6行到第13行是第一部分,主要负责将控制权交给C程序中的main函数,当main函数返回时停机。

​ 从第15行到第71行_load_program过程,用于加载用户程序。将程序从磁盘加载到内存可以调用 16h 号中断来实现,但我一调用就会出bug,于是我只能用《x86汇编语言:从实模式到保护模式》这本书里提供的代码。

​ 调用这个过程之前要将用户程序被加载到的位置放置在dx中,用户程序在磁盘中的起始扇区放置在si中,用户程序的所占扇区数放置在bx中。

​ 从第73行到第111行是clear过程,用于将屏幕清空。这个过程十分简单,不必赘述。

C程序部分

​ main.c这个文件中的内容是操作系统内核的主要部分。它包括基础I/O操作、工具函数和各种用户交互命令。

I/O 操作

​ 为了方便输入输出,我首先编写了getchar()、putchar()、getline()、put()这4个函数。

​ 第23行到第32行的内容是 getchar() 函数,它通过调用 16h 号中断来得到一个输入字符。如果这个字符的值是13(回车符的键盘码),则返回 ‘\n’ 。

​ 第34行到第86行的内容是putchar() 函数,它接受一个字符类型的参数,将字符打印到屏幕上。打印的位置由locr 、 locc 这两个全局变量来决定。

​ 当要打印的字符是一个普通的字符时,调用 10h 号中断在当前光标处输出这个字符,并调整locr 和 locc 两个变量的值;

​ 当要打印的字符是回车符时,将locc置0,将locr++,调用 10h 号中断设置光标的位置为 locc 和 locr 指定的位置;

​ 当要打印的字符是退格符时,首先修改 locc 和 locr 两个变量的值,保证这两个值始终都是下一次打印的字符在的屏幕上的位置。然后调用10h号中断修改当前光标的位置为为 locc 和 locr 指定的位置,将这个位置的值清0。

​ getline()函数接受一个字符类型的指针,不断调用getchar(),把读进来的字符存到字符串里,读到回车符就终止,给字符串加上一个 ‘\0’ 。

​ put()函数接受一个字符类型的指针,不断调用putchar()输出它,遇到 ‘\0’ 就终止。

工具函数

​ 为了方便,我编写了 int_to_str () , str_to_int () 和 strcmp () 这几个函数。意思很明显,内容也缺乏技术含量,不再赘述。

用户交互

​ 程序会不断循环,每次都用getline读取用户输入的命令,并进行交互,共有clear、help、load 和 quit 4种交互命令。

​ clear命令会将屏幕清空,这只需调用前面说过的entry.asm里的_clear过程就可以了。

​ help命令会打印提示信息。

​ quit命令会终止操作系统内核的执行。

​ load命令是最重要的命令。设计这个命令的初衷是,假设用户有一块装有我的操作系统内核的硬盘,但他完全不懂电脑,只会将程序写入硬盘。这个命令可以让他将程序加载到内存中并运行,而完全不需要修改操作系统的代码。

​ load命令会打印一条提示信息提示,提示用户输入程序在磁盘中的起始位置,再打印一条提示信息,提示用户输入程序在磁盘中占用的扇区数。然后调用 _load_program 过程将程序加载进内存0xa000处,设置es、ds寄存器的值,并将控制权交给用户程序。用户程序运行结束后,操作系统会将es、ds寄存器的值清0,并清空屏幕。

实验过程

​ 编写一个makefile文件,然后就可以在VSCode自带的终端里输入make完成大量的工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
user_programs=user_program_1 user_program_2 user_program_3 user_program_4
all: kernel my_mbr $(user_programs)
./do kernel 1 # kernel 文件是生成的操作系统内核二进制文件
./do my_mbr 0 # my_mbr 是引导扇区程序
kernel:entry.o main.o
ld -m elf_i386 -N --oformat binary -Ttext 0x7e00 entry.o main.o -o kernel
# 这里将 entry.o 和 main.o 链接成kernel文件,参数在上文介绍过了,不再赘述
nm entry.o > symbol_table.txt #分析符号表,结果在 symbol_table.txt
nm main.o >> symbol_table.txt
entry.o:entry.asm
nasm -felf $< -o $@
main.o:main.c
gcc -march=i386 -m16 -mpreferred-stack-boundary=2 -ffreestanding -fno-PIE -masm=intel -c $< -o $@ #参数在上文已经介绍过了,不再赘述
my_mbr:my_mbr.asm
nasm my_mbr.asm -o my_mbr
user_program_1:user_program_1.asm # user_program是一些用户程序,把它们分别写入到第10,20,30,40个扇区,然后可以在操作系统中加载它们
nasm $< -o $@
./do $@ 10
user_program_2:user_program_2.asm
nasm $< -o $@
./do $@ 20
user_program_3:user_program_3.asm
nasm $< -o $@
./do $@ 30
user_program_4:user_program_4.asm
nasm $< -o $@
./do $@ 40

​ 用bochs运行,依次输入load,10,1(第一个用户程序在磁盘的第10个扇区,大小为1个扇区):

image-20200509201901481.png

​ 效果拨群:

image-20200509202009328.png

​ 按下Ctrl+C,返回操作系统:

image-20200509202136463.png

​ 再依次输入load,20,2 加载第二个程序:

image-20200509205136902.png

​ 效果如下:

image-20200509205234324.png

​ 输入 load 40 4 后效果如下:

image-20200509205342547.png

​ 输入load 30 3 后效果如下:

image-20200509205446870.png

​ 输入help打印提示信息:

image-20200509205640183.png

​ 输入clear清空:

image-20200509205739629.png

image-20200510163248587.png

​ 输入quit可以退出操作系统:

image-20200509205827461.png

实验总结

​ 这次实验算得上是很硬核了,用到了很多x86汇编的知识,还要用 ld 这种完全不熟的工具,用gcc里各种奇奇怪怪的参数,甚至为了方便我还学了一下makefile (好像早就该学了吧) ,遇到的困难也有很多,主要有:

  • ​ 对汇编语言不够了解。主要是函数调用和传参那里,非常麻烦,之前从没有深入了解过。而且C语言程序编译生成的汇编代码跟自己写的在风格上有很大差别,让我很不适应。一开始的时候只能看懂一些关键的语句的意思,好在我看多了之后还是克服了心中的 “恐惧感” 。

  • ​ debug实在是太麻烦了。虽然我会使用bochs调试程序,但很多bug非常隐蔽,包括但不限于:

    1. 在键盘中输入alt+tab切换屏幕,导致后面的输入无法被bochs读入。至今我都没想到解决的办法,好在不影响我写程序。

    2. 向c语言的函数传字符串常量会出错。这个问题好像不止我一个遇到,其他的同学和网上的博客也有这样的问题。我至今也没找到原因,只能把字符串常量改为char*类型。

    3. 向指向int类型的指针传short类型变量的地址。我声明了一个short变量,调用 str_to_int (char* s,int* val) 函数的时候把它的地址传了进去,本来我觉得反正都会进行类型转换,没啥问题。可是我执行了

      1
      *val=0;

      之后,问题就来了: short类型变量是2个字节,但 val 指针是int类型的指针,上面的操作会把内存中4个字节全部置为0。更要命的是,由于第一个参数先压栈,第二个字符后压栈,val指向的后面两个字节刚好就是 s 字符串的前两个字节,于是这个函数就不会得到正确的结果。当我想到这个问题时,我不禁为这世上有如此巧妙的bug而感到震惊。

image-20200509000616841.png

image-20200509000427794.png

  1. int 16h 读键盘会导致光标位置出错。这是一个很奇怪的bug,不调用int 16h时,用int 10h 中断(ah=3)来读取光标位置可以正常运行,当我用 int 16h 读键盘输入后,用int 10h 中断(ah=03)就完全得不到正确结果,但在光标处打印字符却没有问题。我最终也没整明白这其中的缘由,只能放弃使用int 10h 中断来读取光标位置,改用 locc locr 这两个变量。

    我后来想到,debug不一定要对着bochs的那些汇编代码一行行看,用内核输入输出函数也能帮助我debug,这样稍微缓解了我的压力。

  • ​ 很多工具不会用。说出来有点丢脸,在这次实验之前我从来没用过 ld 这个工具,当我看到老师给的ppt里那一串参数时,突然认识到自己是多么不学无术。经过我反复地尝试、不断地失败后,我总算学会了如何使用ld和gcc完成C程序与汇编程序混合编译事实上只是能跑起来而已,学会是不可能学会的

​ 虽然困难很多,但收获也同样不少。通过这次实验,我大大加强了对汇编语言和C语言的了解,对操作系统的工作方式的认识也更加深入了。同时,完成操作系统内核的开发也算是一件很鼓舞人心的事情,这让我有了更多的勇气和兴趣来进一步学习更深的知识。

参考资料

  1. https://wu-kan.cn/_posts/2019-03-28-%E7%94%A8%E6%B1%87%E7%BC%96%E4%B8%8EC%E8%AF%AD%E8%A8%80%E5%BC%80%E5%8F%91%E7%8B%AC%E7%AB%8B%E5%86%85%E6%A0%B8/ (今年大三的学长,博客写的很不错)
  2. https://blog.csdn.net/a200710716/article/details/45936643 (关于键盘输入的ASCII码的资料)
  3. https://www.cnblogs.com/johnshao/archive/2011/06/13/2079638.html (int 10h 中断的详细介绍)
  4. https://blog.csdn.net/ZCMUCZX/article/details/80462394?locationNum=4&fps=1 (int 16h 中断的详细介绍)
  5. https://blog.csdn.net/daydayup654/article/details/78630341 ( ld 工具的详细介绍)
  6. http://c.biancheng.net/view/661.html (gcc 的各种使用姿势)

操作系统实验二_加载用户程序

众所周知,引导扇区程序会将操作系统加载到内存中,并把计算机的控制权交给操作系统。问题来了,引导扇区程序是怎么加载用户程序的?

这是一个复杂的问题,为了回答这个问题,首先我们需要了解用户程序的内容有什么样的格式,其次我们需要知道处理器与硬盘交互的方式,最后还要知道引导扇区程序怎么把控制权交给用户程序。

用户程序header段

在引导扇区程序中,处理器从0x7c00处开始执行代码。然而,对于引导扇区程序会把用户程序加载到哪一个位置,用户程序是不知道的。那就有问题了,如果我在用户程序中要访问数据段中的一个数据,却连这个数据在内存中的位置都不知道,还怎么访问呢?

分段的方法能很好的解决这个问题。用户程序被分为几段,每一段有一个起始地址,如果我们要访问一个数据,只需给出它在段中的偏移地址即可。用户程序会告诉引导扇区程序自己有几个段,每个段相对于程序开始处的偏移是多少,引导扇区程序会给用户程序在内存中分配一个位置,比如说,0xd000,然后假设用户程序的数据段相对于程序开始处偏移为0x100,则数据段会被加载到内存中的0xd100处。然后,引导扇区程序把ds置为0xd10,这样,用户程序就可以用段地址和偏移地址访问内存了。

为了告诉引导扇区程序一些必要的信息,用户程序会有一个header段,里面包含了程序的长度、第一条指令的位置、其他段的个数和其他段的相对偏移量。如图所示:

image-20200427172648671.png

代码如下:

1
2
3
4
5
6
7
8
9
10
11
SECTION header vstart=0 align=16 ;SECTION表明这是一个段的开头,vstart=0表明这个段里的所有标号
;都用的是偏移地址,align=16表明这个段的起始位置要为16的倍数
length dd program_end;program_end标号在程序的尾部,可以用来得到程序的长度
codeentry dw start;start标号指向程序的第一条指令的位置
dd section.code_1.start;section.code_1.start指的是code_1这个段的开始位置,第一条指令在这个段里面
;根据上面两行,可以知道用户程序的第一条指令的位置
segment_table dw (header_end-code_1_segment)/4;段表的长度,即这个程序还有几个段
code_1_segment dd section.code_1.start;code_1段的开始位置
data_1_segment dd section.data_1.start;data_1段的开始位置
stack_segment dd section.stack.start;stack段的开始位置
header_end:

有了这个header段,引导扇区程序就能得到加载用户程序所必须的信息。由于引导扇区程序要将用户程序加载进内存,我们也成引导扇区程序为加载器

加载器

我们来看一下加载器会做些什么。

首先,我们默认加载器知道用户程序在磁盘中的哪一个扇区,也明确了用户程序会被加载到哪一个位置。在加载器的开头,会有一个:

1
first_sector equ 100

这是一个伪指令,相当于%define first_sector 100。

然后,加载器像其他程序一样,会有一个段声明:

1
SECTION mbr align=16 vstart=0x7c00

注意这里vstart=0x7c00。也就是说,后面的标号的值都要加上0x7c00。接下来是正常的设置堆栈段和栈指针的位置:

1
2
3
mov ax,0
mov ss,ax
mov sp,ax

然后要处理的是用户程序在内存中的起始位置:

1
2
3
4
5
6
mov ax,[cs:start_loc]
mov dx,[cs:start_loc+2]
mov bx,16
div bx
mov es,ax
mov ds,ax

一开始时cs寄存器是0,start_loc标号在程序的尾部,内容是:

1
start_loc: dd 0x10000

这个值就是用户程序会被加载到的位置。

由于0x10000是一个20位的数,于是只能用dx:ax两个寄存器来存储。我们将dx:ax除以16,商保存在ax中,将它赋值给es和ds。这样,ds和es就是用户程序的起始段地址。接下来我们读入第一个扇区:

1
2
3
4
xor bx,bx
xor di,di
mov si,first_sector
call read_disk

call指令意思是过程调用,它会先将ip的值保存至栈中,转而取执行read_disk位置的指令。事实上,它就相当于高级语言中的函数。向这个”函数”传递参数的方式是把要用到值放在其他寄存器中。

既然我们要读磁盘,我们得知道把要读哪一个扇区和读出来的内容放在哪里告诉磁盘。这里我们把扇区的位置放置在di:si中,读出来的内容放在[ds:bx]里 。做完这些工作,就可以调用read_disk了,它的内容是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
read_disk:
push ax
push bx
push cx
push dx

mov dx,0x1f2
mov al,1
out dx,al ;读取的扇区数

inc dx ;0x1f3
mov ax,si
out dx,al ;LBA地址7~0

inc dx ;0x1f4
mov al,ah
out dx,al ;LBA地址15~8

inc dx ;0x1f5
mov ax,di
out dx,al ;LBA地址23~16

inc dx ;0x1f6
mov al,0xe0 ;LBA28模式,主盘
or al,ah ;LBA地址27~24
out dx,al

inc dx ;0x1f7
mov al,0x20 ;读命令
out dx,al

disk_ok?:
in al,dx
and al,0x88
cmp al,0x08
jne disk_ok?

mov cx,256
mov dx,0x1f0

read_content:
in ax,dx
mov [bx],ax
add bx,2
dec cx
cmp cx,0
jne read_content

pop dx
pop cx
pop bx
pop ax

ret

看起来很复杂,但只要一条条分析,还是能分析清楚的。首先是把ax,bx,cx,dx 4个寄存器push到栈中。因为我们要修改它们的值,得先把它们保护起来,等这个过程要返回了,就把它们pop回去。

从第7行到第30行,是在向I/O端口读写信息,这些端口是独立编址的。从I/O端口读入信息用in指令,用法是:

1
in al,dx

或者

1
in ax,dx

dx是要访问的端口号,al、ax是用来保存读入的值。注意不能用其他的寄存器。

相应的,向I/O端口写入信息,要用out指令,用法是:

1
out dx,al

1
out dx,ax

dx是访问的端口号,al、ax是要写入的值。

in、out指令的dx参数也可以用立即数代替。

主硬盘端口分配的端口号是0x1f0-0x1f7。

其中,0x1f0 是硬盘接口的数据端口,而且还是一个 16 位端口。一旦硬盘控制器空闲,且准备就绪,就可以连续从这个端口写入或者读取数据。

0x1f1 端口是错误寄存器,包含硬盘驱动器最后一次执行命令后的状态(错误原因)。

0x1f2端口用于设置要读取的扇区数量,0x1f3-0x1f6端口用于设置起始扇区号。扇区号有28个字节,0-7字节要放在0x1f3里,8-15字节要放在0x1f4里,16-23字节要放在0x1f5里,24-27字节要放在0x1f6里。0x1f6的高4位置为1110。

端口 0x1f7 既是命令端口,又是状态端口。在通过这个端口发送 读写命令之后,硬盘就忙乎开了。在它内部操作期间,它将 0x1f7 端口的第 7 位置 “1”,表明自己很忙。一旦硬盘系统准备就绪,它再将此位清零,说明自己已经忙完了,同时将第 3 位置“1”,意思是准备好了,请求主机发送或者接收数据。

了解了这些后,第7-30行的代码也就很容易理解了。

第32-36行是在不断判断硬盘是否准备好,如果没有则继续循环。

第38行将cx置为256,因为一个扇区是512个字节,一次读出2个字节。第39行将dx置为0x1f0,即从0x1f0读入数据。

第41-47行不断将数据读入到[ds:bx]处,然后将bx+=2。

最后恢复ax,bx,cx,dx寄存器,注意顺序要反过来。

第一次读完磁盘后,用户程序的第一个扇区就已经在内存里了。接下来要进行一些很重要的工作:

1
2
3
4
5
6
7
mov dx,[bx+2]
mov ax,[bx]
mov bx,512
div bx
cmp dx,0
je tem1
inc ax

用户程序的前4个字节是用户程序的大小,先把它读入到ds:ax中,然后除以512,得到用户程序在磁盘中的扇区数。注意,如果用户程序的大小恰好为512的倍数,则除法指令结束后,ax的值就是用户程序占的扇区数,此时dx为0;否则,这个数字要加1。

接下来要读入其他扇区:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
tem1:
dec ax
mov cx,ax
cmp cx,0
je tem3

push ds
tem2:
xor bx,bx
inc si
mov ax,ds
add ax,0x20
mov ds,ax
call read_disk
dec cx
cmp cx,0
jne tem2

首先将ax减1(刚才已经读了一个扇区了)。如果ax==0,则跳过下面的工作;否则进入tem2这个循环。

循环里每次将si加1(ds:si是要读的扇区号),将ds+=0x20(写入内存的位置每次往后推512个字节),将bx置为0,然后调用read_disk过程读入扇区。

把扇区读完了,接下来要确定用户程序的第一条指令的位置:

1
2
3
4
tem3:
mov dx,[0x08]
mov ax,[0x06]
call segment_reloc

回忆一下,第一条指令的位置是通过 段地址(第一条指令所在段的首地址)+偏移地址(第一条指令相对于段地址的差值) 给出的。在[ds:bx+0x06]到[ds:bx+0x09]之间的是段地址。我们把它的值与用户程序在内存中的首地址相加,就得到了这个段的真实地址,再把它右移4位,送到ax寄存器中。上述过程会被执行很多次(有好几个段),所以我们把它写成一个segment_reloc过程:

1
2
3
4
5
6
7
8
9
10
push dx

add ax,[cs:start_loc]
add dx,[cs:start_loc+0x02]
shr ax,4
shl dx,12
or ax,dx

pop dx
ret

shr指令是右移4位,shl指令是左移12位,or指令把ax和dx合起来(ax的高4位为0,dx的低12位为0)。

得到真实段地址右移4位的值后,把它送回[ds:0x06]处。

1
mov [0x06],ax

接下来要读段表。首先从[ds:0x0a]处取出一个字放到cx中,表示段表的大小,然后将bx置为0x0c,这是段表的首地址相对用户程序起始处的偏移地址:

1
2
mov cx,[0x0a]
mov bx,0x0c

接下来要一个个读:

1
2
3
4
5
6
7
8
9
segment_table_reloc:
mov dx,[bx+0x02]
mov ax,[bx]
call segment_reloc
mov [bx],ax
add bx,4
dec cx
cmp cx,0
jne segment_table_reloc

这部分内容跟开始时读段表差不多,不再赘述。

折腾完这些,加载器的工作差不多完成了,可以把控制权交给用户程序了:

1
jmp far [0x04]

jmp far是 16 位间接绝对远转移。上面一条指令会从 ds:0x04 处取出6个字节,把前两个字节作为一个字,它是指令的偏移地址;后4个字节被当成一个双字,是指令所在段的地址(这个地址已经被我们重定位过了)。这样就能跳到用户程序的第一条指令了。注意,此时ds,es的值都是用户程序在内存中的首地址右移4位的值。

用户程序其他内容

看看用户程序会做什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
SECTION code_1 align=16 vstart=0
start:
mov ax,[stack_segment]
mov ss,ax
mov sp,stack_end

mov ax,[data_1_segment]
mov ds,ax

mov ax,0xb800
mov es,ax

mov bx,msg
mov cx,12

xor ax,ax
mov di,0

do:
mov al,[bx]
inc bx
mov [es:di],al
inc di
mov byte [es:di],0x07
inc di
dec cx
cmp cx,0
jne do

jmp near $

SECTION data_1 align=16 vstart=0
msg db 'hello world!'

SECTION stack align=16 vstart=0
resb 256
stack_end:
SECTION trail align=16
program_end:

第1行声明了一个段,第3行就是我们期待已经的第一条指令。前3条指令用于设置段指针和段寄存器的值。

第7,8行两行设置数据段寄存器ds的值,第9,10行两行设置附加段寄存器es的值。

从第12行开始是一个很正常的显示字符串的工作,不再赘述。

第32行声明了数据段,里面只有一个字符串。

第35行声明了堆栈段,里面用resb指令保留了256个字节。注意,stack_end标号是在resb指令的后面,因为栈指针的值是不断减小的。

最后,第38行声明了一个trail段,它没什么用,值得注意的是它没有vstart=0这个定义,所以标号program_end的值是从0开始算的,所以它的值就是用户程序的大小。

运行结果

用bochs运行,结果如下:

image-20200427174119405.png

效果拨群。

操作系统实验一_引导扇区程序

实验目的

​ 锻炼编写汇编语言程序的能力,增加对操作系统启动方式的了解,学习bochs调试工具、NASM编译工具的使用。

实验要求

​ 设计IBM_PC的一个引导扇区程序,程序功能是:用字符‘A’从屏幕左边某行位置45度角下斜射出,保持一个可观察的适当速度直线运动,碰到屏幕的边后产生反射,改变方向运动,如此类推,不断运动;在此基础上,增加你的个性扩展,如同时控制两个运动的轨迹,或炫酷动态变色,个性画面,如此等等,自由不限。还要在屏幕某个区域特别的方式显示你的学号姓名等个人信息。将这个程序的机器码放进放进第三张虚拟软盘的首扇区,并用此软盘引导你的XXXPC,直到成功。

实验环境

​ 使用NASM来编译代码,由于VSCode里可以使用终端,所以我用VSCode来编写汇编代码,然后可以方便的在终端里用NASM。
​ 用《x86汇编语言-从实模式到保护模式》这本书附带的fixvhdwr来将二进制文件写入至硬盘,然后使用bochs2.1.1来对程序进行调试,调试无误后把二进制文件放到VirtualBox运行。

实验过程

准备工作

​ 我计划按实验要求的方法在屏幕上循环显示”reeeeeeeeeein”,并且字符的颜色不相同,每显示一些字符之后就把屏幕刷新。如下图所示:

image.png

​ 首先我们要确定当前要显示的位置,这就需要知道是在哪一行哪一列。于是我在程序末尾分配两个变量:

1
2
locr: db 0 ;当前所在行
locc: db 0 ;当前所在列

​ 为了方便,以后直接用locr指代[ds:locr]这个位置存储的值,其他变量也是同理。

​ 知道了当前位置,还需要知道下一个位置。下一个位置无非是由当前位置往左往右往上往下得到,于是我又分配了两个变量;

1
2
u_or_d: db 0 ;上还是下,0是下,1是上
l_or_r: db 0 ;0右,1左

​ 要显示”reeeeeeeeeein”,得知道现在要显示的是第几个字符,还要把这个字符串存储起来:

1
words: db 'r','e','e','e','e','e','e','e','e','e','e','i','n'
1
loc: db 0 ;当前显示第几个字母
1
%define name_length 13;字符串的长度为13

​ 为了美观,背景颜色就用默认的黑色,而字符的前景颜色用颜色表里I=1的8个颜色:
image.png
​ 所以分配一个变量存储当前的颜色:

1
color: db 8;颜色从8显示到15

​ 在我的电脑上,bochs和VirtualBox虚拟机的屏幕宽度都是80个字符。于是我设置可显示的宽度为80,可显示的高度为17。

1
2
%define width 80
%define height 17

​ 要刷新屏幕,得用一个计数器,当它减到0就刷新一次:

1
%define flush_seq 300
1
flush_or_not: dw flush_seq;注意这里只能dw不能db,因为300>255

​ 此外,为了显示我的””知识产权”,我会屏幕下方显示”made by zjr”这个字符串。

代码解读

显示字符

​ 把上面这些工作做完了,就可以开始编写一些重要的代码了。首先看一下显示字符的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
do:
xor ax,ax
mov al,[ds:locr]
imul bx,ax,width
mov ax,bx
xor bx,bx
mov bl,[ds:locc]
add ax,bx
add ax,ax
mov di,ax;到这里,es:di就是我们要访问的位置
xor ax,ax
mov al,[ds:loc]
mov si,ax
mov al,[ds:si+words]
mov [es:di],al
mov al,[ds:color]
mov byte [es:di+1],al
inc al
cmp al,16
jne tem0
mov al,8
tem0:
mov [ds:color],al
inc byte [ds:loc]
cmp byte [ds:loc],name_length
jne tem1
mov byte [ds:loc],0

​ 当前显示的位置是:

$$
locr*width+locc
$$

​ 第二行把ax清0(这是有必要的,如果ah的值不为0可能得到错误的结果)。把locr的内容放到al里,然后第四行把ax和width相乘,结果放到bx。

​ 第五行到第十行是把bx放到ax,并把locc加到ax里,然后ax乘2。ax乘2的原因是显示屏上显示一个字符要2个字节。最后把ax放到di里。这样,es:di就是我们当前要显示的位置。

​ 从第11行到第15行是找到要显示的字符。首先取出loc,再用$ds:words+loc$得到要显示的字符的位置,最后把这个字符取出来,放到$es:di$里。

​ 第16行开始是在确定字符的前景颜色,这跟找到字符是差不多的。注意color要自增一次,自增完之后要判断它是否等于16,如果等于,就把color置为8。

​ 第24行开始是对loc的自增,然后判断是否是8,是的话就置为0。

下一个位置

​ 显示了当前的字符后,要确定下一个字符的位置,这就需要u_or_d和l_or_r这两个变量。u_or_d取值为0,则locr要加1,否则locr要减1;l_or_r取值为0,则locc加1,否则locc要减1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
tem1:
cmp byte [ds:u_or_d],0
je tem2
dec byte [ds:locr]
jmp tem3
tem2:
inc byte [ds:locr]
tem3:

cmp byte [ds:l_or_r],0
je tem4
dec byte [ds:locc]
jmp tem5
tem4:
inc byte [ds:locc]

​ 先看1到7行:判断u_or_d是否为0,是则跳到tem2,把locr加1;否则把locr减1,然后跳到tem3,不执行tem2那里的操作。其实就是一个if-else语句。

​ 10到16行基本与之相同,不再赘述。

调整方向

​ 接下来,要判断方向要不要调整。如果locr到达height-1,则要把u_or_d改为1;如果locr到达0,则要把u_or_d改为0:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
tem5:
cmp byte [ds:locr],height-1
jne tem6
mov byte [ds:u_or_d],1

tem6:
cmp byte [ds:locr],0
jne tem7
mov byte [ds:u_or_d],0

tem7:
cmp byte [ds:locc],width-1
jne tem8
mov byte [ds:l_or_r],1

tem8:
cmp byte [ds:locc],0
jne tem9
mov byte [ds:l_or_r],0

​ 先判断locr是否等于height-1,不等于就跳到tem6位置,不执行第4行。如果等于就执行第4行,把u_or_d改为1;再判断它是否为0,是的话就执行第9行,把u_or_d改为0。值得注意的是,两个判断语句是相互独立的,所以第三行不能调到tem7,而是要跳到tem6。

​ 第11行往后跟前面基本是一样的。

其他工作

​ 每打印一个字符,会让程序暂停一段时间:

1
2
3
4
5
tem9:
mov ah,86h
mov cx,0x1E
mov dx,0x8480
int 15h

​ 考虑到如果屏幕字符很多会很影响观看体验,所以每打印300个字符会把屏幕刷新一遍:

1
2
3
4
5
6
7
cmp word [ds:flush_or_not],0
jne tem10
mov word [ds:flush_or_not],flush_seq
jmp flush
tem10:
dec word [ds:flush_or_not]
jmp do

​ 当flush_or_not等于0时,把它重新置为flush_seq,然后跳到flush段的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
flush:
mov ax,width
imul bx,ax,height
xor ax,ax
mov di,ax
flush_do:
add di,di
mov byte [es:di],0
mov byte [es:di+1],0
inc ax
mov di,ax
cmp ax,bx
jl flush_do

​ 首先用width和height相乘,得到要刷新的总字节数。然后执行循环,当ax<bx时继续,否则退出。循环里把di*2,然后将es:di和es:di+1置为0(注意一个字符占的位置是两个字节),最后ax+1,将di置为ax。

​ 把上面这段代码放在do标志前面,这样程序开始的时候就会把屏幕刷新。

​ 在程序开始的地方要设置段寄存器和附加段寄存器:

1
2
3
4
mov ax,0x7c0
mov ds,ax
mov ax,0xb800
mov es,ax

​ 在屏幕下方打印”made by zjr”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
mov ax,width
imul bx,ax,height+2
add bx,bx
mov di,bx
xor cx,cx
mov si,cx
do2:
mov al,[ds:si+my_name]
mov [es:di],al
inc di
inc di
inc si
cmp si,name2_length
jne do2

​ 由于显示”reeeeeeeeeein”的位置是左上角(0,0)到右下角(height-1,width-1),可以让”maded by zjr”字符串在(height+2,0)开始显示,于是用bx存储$width(height+2)2$,把它复制给di,然后就可以用es:di直接访问要存储的位置。此后进入循环,用si做循环变量,每次用[ds:si+my_name]得到要打印的字符,赋值给al,再把它送到到打印的位置。

效果展示

​ 在VSCode里新建终端,输入

1
NASM proj.asm -o p

​ 得到一个二进制文件。然后使用fixvhdwr把它写到一个文件格式为vhd的虚拟硬盘:

image-20200423194702107.png

​ 然后选择这个二进制文件:

image-20200423194811099.png

​ 写入完成后,可以用bochs打开:

image-20200423195008833.png

​ 可以看到,结果符合预期。

​ 然后可以用VirtualBox验证一下:

​ 首先把刚才生成的二进制文件保存为img格式:

image-20200423195208552.png

​ 新建一个虚拟机:

image-20200423195406846.png

​ 点击右边的存储,往里面添加刚才创建的img文件作为软驱:

image-20200423195524820.png

在系统一栏设置启动顺序为软驱优先:

image-20200423195708890.png

​ 启动后就能看到结果:

image-20200423195854503.png

​ (似乎在virtualbox里显示的比bochs慢,并不知道为什么)

心得体会

​ 这次实验整体难度不算特别大,只对编写汇编代码有较基本的要求。我本来在2月份的时候就已经完成了,但只录了视频,没有写实验报告就交上去了,结果两个月过后,代码找不到了,实验报告也写不成了,没办法,只能重新写一遍,就算是练习吧。

​ 虽然写的代码并不多,但还是有不少奇奇怪怪的bug,比如:

image-20200423200349299.png

​ 这里loc变量长度是一个字节,而si是一个16位的寄存器。把loc赋值给si后,尽管我加了个byte,但第36行的[ds:si]还是会取两个字节的值。一开始loc后面是没有变量的,运行结果十分正常,后来我在后面加了一个image-20200423200602504.png

​ 取[ds:si]的时候就得到一个很大的值。这个bug困扰了我很久,直至我想起来si是个16位寄存器。

​ 类似于这样的bug还有很多,不再一一列举。以后我得多练习汇编语言编程,为以后其他的实验做准备。

​ 本次实验最大的收获还是增强了汇编语言编程能力,此外我还学会了用bochs进行debug,这对以后做实验是一个很大的帮助。还有的就是感谢TA推荐了Typora这个编写MarkDown文档的工具,确实比我之前用的VSCode插件好用很多,也算是一个意外之喜。

计算机网络实验2_Chat实验

实验要求

利用客户/服务器(Client/Sever或CS)模式实现一个多人聊天(群聊)程序。其功能是每个客户发送给服务器的消息都会传送给所有的客户端。

实验环境

采用linux编程,调用pthread库

实验内容

1.编写多人聊天程序,要求客户端和服务器都采用多线程方式进行编程。每个客户端都采用TCP协议连接服务器并保持连接。服务器同时与所有客户端建立和保持连接。每个客户端输入的消息都会通过服务器转发给所有客户。

客户端程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>
#include<pthread.h>

#define BUFLEN 2000 // 缓冲区大小
/*------------------------------------------------------------------------
* main - TCP client for TIME service
*------------------------------------------------------------------------
*/

void* client_recv(void* in){
int my_sock=(long)in;
char* buf=new char[BUFSIZ+1];
while(1){
int c1=recv(my_sock,buf,BUFSIZ,0);
if(c1==0){
return NULL;
}
buf[c1]='\0';
printf("message received: %s\n",buf);
}
}

int main(int argc, char *argv[])
{
char *host = "127.0.0.1"; /* server IP to connect */
char *service = "50500"; /* server port to connect */
struct sockaddr_in sin; /* an Internet endpoint address */
char buf[BUFLEN+1]; /* buffer for one line of text */
int sock; /* socket descriptor */
int cc; /* recv character count */
pthread_t thread_handle;
sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); //创建套接字,参数:因特网协议簇(family),流套接字,TCP协议
//返回:要监听套接字的描述符或INVALID_SOCKET
memset(&sin, 0, sizeof(sin)); // 从&sin开始的长度为sizeof(sin)的内存清0
sin.sin_family = AF_INET; // 因特网地址簇
sin.sin_addr.s_addr = inet_addr(host);
// printf("%d\n",sin.sin_addr.s_addr); // 设置服务器IP地址(32位)
sin.sin_port = htons((unsigned short)atoi(service)); // 设置服务器端口号
int ret=connect(sock, (struct sockaddr *)&sin, sizeof(sin)); // 连接到服务器,第二个参数指向存放服务器地址的结构,第三个参数为该结构的大小,返回值为0时表示无错误发生,
pthread_create(&thread_handle,NULL,client_recv,(void*)sock);
while(1){
printf("请输入要发送的消息:");
scanf("%s",buf);
if(strcmp(buf,"exit")==0){
printf("退出成功!\n");
break;
}
int c1=send(sock,buf,strlen(buf),0);
}

close(sock); // 关闭监听套接字

printf("按回车键继续...");
getchar(); // 等待任意按键
}

服务器端程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/* server2.c */
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include<pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#define max_socks 10000
#define buf_size 1000
int* socks[max_socks],now_loc;
void* serve(void* in){
int my_loc=(long)in;
int my_sock=*(socks[my_loc]);
char* pts=new char[buf_size+1];
while(1){
int c1 = recv(my_sock, pts, buf_size, 0);
if(c1==0){
close(my_sock);
socks[my_loc]=NULL;
break;
}
pts[c1]='\0';
printf("服务器收到消息: %s\n",pts);
for(int i=0;i<max_socks;i++){
if(socks[i]!=NULL){
send(*socks[i], pts, strlen(pts), 0);
}
}
}
return NULL;
}

int main(int argc, char *argv[])
/* argc: 命令行参数个数, 例如:C:\> TCPServer 8080
argc=2 argv[0]="TCPServer",argv[1]="8080" */
{
pthread_t* thread_handles;
thread_handles=(pthread_t*)malloc(max_socks*sizeof(pthread_t));
struct sockaddr_in fsin; /* the from address of a client */
int msock, ssock; /* master & slave sockets */
char *service = "50500";
struct sockaddr_in sin; /* an Internet endpoint address */
int alen; /* from-address length */
char pts[1000]; /* pointer to time string */
time_t now; /* current time */

msock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); // 创建套接字,参数:因特网协议簇(family),流套接字,TCP协议
// 返回:要监听套接字的描述符或INVALID_SOCKET

memset(&sin, '\0', sizeof(sin)); // 从&sin开始的长度为sizeof(sin)的内存清0
sin.sin_family = AF_INET; // 因特网地址簇(INET-Internet)
sin.sin_addr.s_addr = INADDR_ANY; // 监听所有(接口的)IP地址。
sin.sin_port = htons((unsigned short)atoi(service)); // 监听的端口号。atoi--把ascii转化为int,htons--主机序到网络序(host to network,s-short 16位)
bind(msock, (struct sockaddr *)&sin, sizeof(sin)); // 绑定监听的IP地址和端口号

listen(msock, 5); // 建立长度为5的连接请求队列,并把到来的连接请求加入队列等待处理。

while (1)
{ // 检测是否有按键,如果没有则进入循环体执行
alen = sizeof(struct sockaddr); // 取到地址结构的长度
ssock = accept(msock, (struct sockaddr *)&fsin, (socklen_t *)&alen); // 如果在连接请求队列中有连接请求,
socks[now_loc]=new int(ssock);
pthread_create(&thread_handles[now_loc],NULL,serve,(void*)now_loc);
now_loc++;
}
close(msock); // 关闭监听套接字
}

运行效果:
image.png

image.png

2.服务器程序转发某个客户端发来的消息时都在消息前面加上该客户端的IP地址和端口号以及服务器的当前时间。要求服务器程序把转发的消息也显示出来。

服务器程序(修改部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include<pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#define max_socks 10000
#define buf_size 1000
int* socks[max_socks],now_loc;
struct sockaddr_in fsin[max_socks]; /* the from address of a client */
void* serve(void* in){
int my_loc=(long)in;
int my_sock=*(socks[my_loc]);
char* pts=new char[buf_size+1];
while(1){
time_t t = time(NULL);
char ch[64] = {0};
char result[100] = {0};
unsigned int tem=fsin[my_loc].sin_addr.s_addr;
strftime(ch, sizeof(ch) - 1, "%Y-%m-%d--%H:%M:%S", localtime(&t));

int c1 = sprintf(pts, "\nip: %d.%d.%d.%d port: %u\ntime: %s\nmessage: ",
(tem<<24)>>24,(tem<<16)>>24,(tem<<8)>>24,tem>>24,fsin[my_loc].sin_port,ch);

int c2 = recv(my_sock, pts + c1, 1000-c1, 0);
pts[c1 + c2] = '\n';
pts[c1+c2+1]='\0';
if(c2==0){
close(my_sock);
socks[my_loc]=NULL;
break;
}
printf("%s\n",pts);
for(int i=0;i<max_socks;i++){
if(socks[i]!=NULL){
send(*socks[i], pts, strlen(pts), 0);
}
}
}
return NULL;
}

int main(int argc, char *argv[])
/* argc: 命令行参数个数, 例如:C:\> TCPServer 8080
argc=2 argv[0]="TCPServer",argv[1]="8080" */
{
pthread_t* thread_handles;
thread_handles=(pthread_t*)malloc(max_socks*sizeof(pthread_t));
int msock, ssock; /* master & slave sockets */
char *service = "50500";
struct sockaddr_in sin; /* an Internet endpoint address */
int alen; /* from-address length */
char pts[1000]; /* pointer to time string */
time_t now; /* current time */

msock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); // 创建套接字,参数:因特网协议簇(family),流套接字,TCP协议
// 返回:要监听套接字的描述符或INVALID_SOCKET

memset(&sin, '\0', sizeof(sin)); // 从&sin开始的长度为sizeof(sin)的内存清0
sin.sin_family = AF_INET; // 因特网地址簇(INET-Internet)
sin.sin_addr.s_addr = INADDR_ANY; // 监听所有(接口的)IP地址。
sin.sin_port = htons((unsigned short)atoi(service)); // 监听的端口号。atoi--把ascii转化为int,htons--主机序到网络序(host to network,s-short 16位)
bind(msock, (struct sockaddr *)&sin, sizeof(sin)); // 绑定监听的IP地址和端口号

listen(msock, 5); // 建立长度为5的连接请求队列,并把到来的连接请求加入队列等待处理。

while (1)
{ // 检测是否有按键,如果没有则进入循环体执行
alen = sizeof(struct sockaddr); // 取到地址结构的长度
ssock = accept(msock, (struct sockaddr *)&fsin[now_loc], (socklen_t *)&alen); // 如果在连接请求队列中有连接请求,
socks[now_loc]=new int(ssock);
pthread_create(&thread_handles[now_loc],NULL,serve,(void*)now_loc);
now_loc++;
}
close(msock); // 关闭监听套接字
}

运行截屏:
image.png

image.png

3.新客户刚连接时服务器端把“enter”消息(包含客户端IP地址和端口号)发送给所有客户端。
服务器程序(修改部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//只修改了serve函数
void* serve(void* in){
int my_loc=(long)in;
int my_sock=*(socks[my_loc]);
char pts[buf_size+1];
char tem[buf_size+1]="Enter!\n";
while(1){
time_t t = time(NULL);
char ch[64] = {0};
char result[100] = {0};
unsigned int port=fsin[my_loc].sin_addr.s_addr;
strftime(ch, sizeof(ch) - 1, "%Y-%m-%d--%H:%M:%S", localtime(&t));

int c1 = sprintf(pts, "\nip: %d.%d.%d.%d port: %u\ntime: %s\nmessage: %s",
(port<<24)>>24,(port<<16)>>24,(port<<8)>>24,port>>24,fsin[my_loc].sin_port,ch,tem);
printf("%s\n",pts);
for(int i=0;i<max_socks;i++){
if(socks[i]!=NULL){
send(*socks[i], pts, strlen(pts), 0);
}
}
int c2 = recv(my_sock,tem,buf_size, 0);
tem[c2] = '\0';
if(c2==0){
close(my_sock);
socks[my_loc]=NULL;
break;
}
}
return NULL;
}

运行截屏:
第一个客户端启动:
image.png
第二个客户端启动:
image.png

4.客户端输入exit时退出客户端程序(正常退出),或者客户端直接关闭窗口退出(异常退出),服务器都会把该客户leave的消息广播给所有客户。
服务器程序(修改部分):
只修改了serve函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void* serve(void* in){
int my_loc=(long)in;
int my_sock=*(socks[my_loc]);
char pts[buf_size+1];
char tem[buf_size+1]="Enter!\n";
while(1){
time_t t = time(NULL);
char ch[64] = {0};
char result[100] = {0};
unsigned int port=fsin[my_loc].sin_addr.s_addr;
strftime(ch, sizeof(ch) - 1, "%Y-%m-%d--%H:%M:%S", localtime(&t));

int c1 = sprintf(pts, "\nip: %d.%d.%d.%d port: %u\ntime: %s\nmessage: %s",
(port<<24)>>24,(port<<16)>>24,(port<<8)>>24,port>>24,fsin[my_loc].sin_port,ch,tem);
printf("%s\n",pts);
for(int i=0;i<max_socks;i++){
if(socks[i]!=NULL){
send(*socks[i], pts, strlen(pts), 0);
}
}
if(strcmp(tem,"Leave!\n")==0){
close(my_sock);
socks[my_loc]=NULL;
break;
}
int c2 = recv(my_sock,tem,buf_size, 0);
tem[c2] = '\0';
if(c2==0){
sprintf(tem,"Leave!\n");
}
}
return NULL;
}

运行截屏:
客户端用输入exit离开时:
image.png
客户端用Ctrl+C退出时:
image.png

5.运行客户端程序测试与老师的服务器程序的连接(103.26.79.35:50500)。
运行截屏(客户端):

image.png

实验体会

还是挺有趣的…

MPI学习笔记2

集合通信

MPI_Send和MPI_Recv可以实现不同进程之间的通信,但如果要跟很多进程发消息,或者要接受很多进程发送的消息,这样就太过麻烦。为此,MPI提供了一系列称为集合通信的函数,它涉及通信子之间的所有进程的通信。

MPI_Reduce

MPI_Reduce用于实现高效的全局运算,如求和,求最大值等。原型如下:

1
2
int MPI_Reduce(void* input_data_p,void* output_data_p,int count,
MPI_Datatype datatype,MPI_Op operator,int dest_process,MPI_Comm comm);

对于每个进程,我们调用MPI_Reduce,然后对目的进程,output_data_p指向的值会与每个进程的input_data_p指向的值进行运算。如果count参数的值大于1,则运算会在数组上进行。
支持的运算有:
|运算符值|含义|
|-|-|
|MPI_MAX|求最大值|
|MPI_MIN|求最小值|
|MPI_SUM|求累加和|
|MPI_PROD|求累乘值|
|MPI_LAND|逻辑与|
|MPI_BAND|按位与|
|MPI_LOR|逻辑或|
|MPI_BOR|按位或|
|MPI_LOR|逻辑异或|
|MPI_BOR|按位异或|
|MPI_MAXLOC|求最大值和最大值所在的位置|
|MPI_MINLOC|求最小值和最小值所在的位置|

一个示例程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);
int val=my_rank*my_rank,sum=0;
MPI_Reduce(&val,&sum,1,MPI_INT,MPI_SUM,0,MPI_COMM_WORLD);
if(my_rank==0){cout<<sum<<endl;}
MPI_Finalize();
}

有两点值得注意:
1.第一个参数和第二个参数的指针不能相同,否则会得到非法的结果。
2.对于不是目标进程的进程,第二个参数实际上是没有作用的,可以置为NULL。

MPI_Allreduce

有时候,我们不仅需要全局运算,还需要把结果放到每个进程里,这时候可以调用MPI_Allreduce函数:

1
2
int MPI_Allreduce(void* input_data_p,void* output_data_p,int count,
MPI_Datatype datatype,MPI_OP operator,MPI_Comm comm);

参数的意义基本与MPI_Reduce相同,不再赘述。
用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);
int val=my_rank*my_rank,sum=0;
MPI_Allreduce(&val,&sum,1,MPI_INT,MPI_SUM,MPI_COMM_WORLD);
cout<<sum<<endl;
MPI_Finalize();
}

MPI_Bcast

有时候,我们需要将一个进程里的数据发送到通信子中的所有进程,这是可以调用MPI_Bcast函数:

1
int MPI_Bcast(void* data_p,int count,MPI_Datatype datatype,int souce_proc,MPI_Comm comm);

其中,data_p在发送进程里为输入参数,在其他进程里为输出参数,用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);

if(my_rank==0){
char* s="hello world!\n";
MPI_Bcast(s,strlen(s)+1,MPI_CHAR,0,MPI_COMM_WORLD);
}
else{
char *s=(char*)malloc(100*sizeof(char));
MPI_Bcast(s,14,MPI_CHAR,0,MPI_COMM_WORLD);//注意,这里s还是空字符串,不能用strlen确定长度。
printf("In process %d,s is %s",my_rank,s);
free(s);
}
MPI_Finalize();
}

MPI_Scatter

假如在主进程里有一个长为100的数组,要把它分到10个进程里,其中0号进程分到前10个元素,1号进程分到第10个到第19个元素……这时可以调用MPI_Scatter函数来进行分发:

1
2
3
int MPI_Scatter(void* send_buf_p,int send_count,
MPI_Datatype send_type,void* recv_buf_p,int recv_count,
MPI_Datatype recv_type,int src_proc,MPI_Comm comm);

假如数组大小为n,通信子里共有comm_sz个进程,则MPI_Scatter会把数组分为comm_xz份,每份有$local_n=\frac{n}{comm_xz}$个。此时send_count和recv_count应被置为local_n,因为send_count表示的是发送到每个进程的数据量,recv_count表示每个进程接收到的数据量。

一个示例程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);
int n=100;
int* b=(int*)malloc(n/comm_sz*sizeof(int));
if(my_rank==0){
int* a=(int*)malloc(n*sizeof(int));
for(int i=0;i<n;i++){
a[i]=i;
}
MPI_Scatter(a,n/comm_sz,MPI_INT,b,n/comm_sz,MPI_INT,0,MPI_COMM_WORLD);
}
else{
MPI_Scatter(NULL,n/comm_sz,MPI_INT,b,n/comm_sz,MPI_INT,0,MPI_COMM_WORLD);
}
for(int i=0;i<n/comm_sz;i++)printf("In process %d, b[%d]=%d\n",my_rank,i,b[i]);
MPI_Finalize();
}

注意,跟MPI_Reduce一样,MPI_Scatter的两个指针参数不能相同,而且非目的进程的send_buff_p可以是NULL。

MPI_Gather

有了把数据发出去的方法,自然会有把数据收集起来的方法,它是MPI_Gather:

1
2
int MPI_Gather(void* send_buf_p,int send_count,MPI_Datatype send_type,void* recv_buf_p,
int recv_count,MPI_Datatype recv_type,int dest_proc,MPI_Comm comm);

用法和注意事项 与MPI_Scatter差不多,不再赘述

MPI_Allgather

MPI_Allgather与 MPI_Gather的关系有点类似MPI_allreduce与MPI_Reduce的关系

1
2
int MPI_Gather(void* send_buf_p,int send_count,MPI_Datatype send_type,void* recv_buf_p,
int recv_count,MPI_Datatype recv_type,int dest_proc,MPI_Comm comm);

用于将数据串联起来,存储到每个进程的recv_buf_p参数中。

综合应用

用上述集合通信的方法实现矩阵向量乘法的并行化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include<mpi.h>
#include<bits/stdc++.h>
#include<sys/time.h>
using namespace std;

void my_rand(double* vec,int size){
for(int i=0;i<size;i++)vec[i]=(double)rand()/RAND_MAX;
}

void Mat_vec_mul2(double* local_A,double* x,double* local_y,int local_m,int n,int local_n,MPI_Comm comm){
int local_i,j,local_ok=1;
for(local_i=0;local_i<local_m;local_i++){
local_y[local_i]=0.0;
for(j=0;j<n;j++)local_y[local_i]+=local_A[local_i*n+j]*x[j];
}
}
int main(int argc,char** argv){
int comm_sz,my_rank;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);

int m,n,local_m,local_n;
if(argc!=3)abort();
sscanf(argv[1],"%d",&m);
sscanf(argv[2],"%d",&n);
local_m=m/comm_sz,local_n=n/comm_sz;
double* local_A,* local_x,* local_y;
local_A=(double*)malloc(local_m*n*sizeof(double));
local_x=(double*)malloc(local_n*sizeof(double));
local_y=(double*)malloc(local_m*sizeof(double));
double *x=(double*)malloc(n*sizeof(double));
if(my_rank==0){
struct timeval tv,tv2;
gettimeofday(&tv,NULL);

double* A=(double*)malloc(m*n*sizeof(double));
double* y=(double*)malloc(m*sizeof(double)),*y2=(double*)malloc(m*sizeof(double));
my_rand(A,m*n),my_rand(x,n);
MPI_Scatter(A,local_m*n,MPI_DOUBLE,local_A,local_m*n,MPI_DOUBLE,0,MPI_COMM_WORLD);
MPI_Bcast(x,n,MPI_DOUBLE,0,MPI_COMM_WORLD);
Mat_vec_mul2(local_A,x,local_y,local_m,n,local_n,MPI_COMM_WORLD);
MPI_Gather(local_y,local_m,MPI_DOUBLE,y,local_m,MPI_DOUBLE,0,MPI_COMM_WORLD);

gettimeofday(&tv2,NULL);
printf("time: %d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);
// for(int i=0;i<m;i++)cout<<y[i]<<" "<<y2[i]<<endl;
free(A),free(y),free(y2);
}
else{
MPI_Scatter(NULL,local_m*n,MPI_DOUBLE,local_A,local_m*n,MPI_DOUBLE,0,MPI_COMM_WORLD);
MPI_Bcast(x,n,MPI_DOUBLE,0,MPI_COMM_WORLD);
Mat_vec_mul2(local_A,x,local_y,local_m,n,local_n,MPI_COMM_WORLD);
MPI_Gather(local_y,local_m,MPI_DOUBLE,NULL,local_m,MPI_DOUBLE,0,MPI_COMM_WORLD);
}
free(local_A);free(local_y);free(x);
}

虽然这个程序使用了多进程的方式进行了优化,但事实上运行时间远远大于串行的版本:

并行:
image.png

串行:
image.png

MPI派生数据类型

显然,进程之间通信是非常非常慢的。如果主进程要往其他进程广播一个整数,两个浮点数,有没有方法能用一次通信的方法解决呢?
MPI为了解决这个问题,提供了用户自定义的派生数据类型。
我们可以用MPI_Type_create_struct函数创建由不同基本数据类型的元素所组成的派生数据类型:

1
2
int MPI_Type_create_struct(int count,int array_of_clocklengths[],
MPI_Aint array_of_displacements[],MPI_Datatype array_of_types[],MPI_Datatype* new_type_p);

第一个参数表明我们创建的数据类型有多少个元素;第二个参数表示每个元素是单个的数据还是一个数组;第三个参数表示每个元素相对于第一个元素的内存地址偏移量;第四个参数表示每个元素的类型;第5个参数表示创建的类型。

比如我们要创建一种数据类型,包含一个int,两个double,可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
int a;
double b,c;
MPI_Aint a_addr,b_addr,c_addr;
MPI_Get_address(&a,&a_addr);
MPI_Get_address(&b,&b_addr);
MPI_Get_address(&c,&c_addr);
MPI_Aint array_of_displacements[3]={0,b_addr-a_addr,c_addr-a_addr};
int array_of_blocklengths[3]={1,1,1};
MPI_Datatype array_of_types[3]={MPI_INT,MPI_DOUBLE,MPI_DOUBLE};
MPI_Datatype input_mpi_t;
MPI_Type_create_struct(3,array_of_blocklengths,array_of_displacements,array_of_types,&input_mpi_t);
MPI_Type_commit(&input_mpi_t);

MPI_Get_address()用于获得第一个指针参数的绝对地址,把这个地址赋值给第二个参数指向的地址空间。
再调用完MPI_Type_create_struct()函数之后,还需要调用MPI_Type_commit()函数向通信子提交这一数据类型。之后,就可以像使用普通数据类型一样使用input_mpi_t了:

1
2
3
4
5
6
7
8
  if(my_rank==0){
a=1,b=1.1,c=2.2;
MPI_Bcast(&a,1,input_mpi_t,0,MPI_COMM_WORLD);
}
else{
MPI_Bcast(&a,1,input_mpi_t,0,MPI_COMM_WORLD);
printf("In process %d: a=%d b=%f c=%f\n",my_rank,a,b,c);
}

Tensor与Autograd详解

参考自《深度学习框架PyTorch入门与实践》

Tensor

创建Tensor

创建Tensor的方法有很多:

函数 功能
Tensor(*size) 基础构造函数
ones(*size) 全为 1 Tensor
zeros(*size) 全为 0 Tensor
eyes(*size) 对角线为1,其他为0
arange(s,e,step) 从s到e,步长为step
linspace(s,e,step) 从s到e,均分为step份
rand/randn(*sizes) 均匀/标准分布
normal(mean,std)/uniform(from,to) 正态分布/均匀分布
randperm(m) 随机排列

用Tensor()函数创建tensor时,可以有很多方法:

1
2
3
4
5
6
7
8
9
x=t.Tensor([[1,2,3],[4,5,6]])#用一个list来创建
y=t.Tensor([1,2,3])
z=t.Tensor(1,2,3)#用几个值来确定Tensor的维度
print(x)
print(y)
print(y.tolist())
print(z)
a=x
a

image.png

1
2
3
4
5
6
print(x.size())
print(x.numel())
y=t.Tensor(x.size())
z=t.Tensor((2,3))#Tensor的元素是2和3
a=t.Tensor(2,3)#Tensor的大小是2*3
y,z,a

image.png

tensor.shape和tensor.size()是一样的:
image.png

一些其他的创建方法;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
x=t.ones(2,3)
print(x)
x=t.zeros(4,5)
print(x)
x=t.arange(1,7,2)#注意,第二个参数是取不到的
print(x)
x=t.arange(1,7.00000000000001,2)#注意,第二个参数是取不到的
print(x)
x=t.linspace(1,10.7,5)
print(x)
x=t.randn(3,3)#均值为0,方差为1的正态分布
print(x)
x=t.randperm(20)
print(x)
x=t.eye(3,5)
print(x)

image.png

改变形状的操作

用 view(),squeeze(),unsqueeze(),resize_()等方法可以改变tensor的形状

view()

view()用于直接改变形状,但不能改变tensor的大小。用法:

1
2
3
b=t.arange(0,6)
print(b.view(2,3))
print(b.view(-1,2,1,3)) # -1表示自动计算其大小

image.png

unsqueeze()

unsqueeze()用于增加tensor的维度。用法:

1
2
3
4
print(b)
print(b.unsqueeze(1))#从 6 变成 6*1
print(b.unsqueeze(-2))#-2 的意思是,增加的那个维度是增加了维度之后的tensor的倒数第二个维度。
#也就是说,从 6 增加成 1*6

image.png

squeeze()

squeeze()用于压缩大小为一的维度。用法:

1
2
3
4
x=t.Tensor(2,3,1)
print(x.squeeze(0))#第0个维度大小是2,不能压缩
print(x.squeeze())#压缩所有大小为1的维度
x.squeeze(2)#第二个维度大小是1,被压缩

image.png

resize_

resize_也可以用来修改形状。不同的是,如果新尺寸超过了原来的尺寸,会自动分配新的内存空间。用法:

1
2
3
4
x=t.rand(2,3)
print(x)
print(x.resize_(3,2))
x.resize_(3,3)

image.png

索引操作

Tensor可以按下标索引:

1
2
3
4
5
6
7
8
9
a=t.randn(3,4)
print(a)
print(a[0])#第0行
print(a[:,2])#第2列
print(a[1:3,2:4])#第1,2行和第2,3列的交叉
print(a[:2])#前两行
print(a[0:2])#前两行
print(a[0:1,:2])#第0行,前两列。形状是1*2
print(a[0,:2])#第0行,前两列。形状是2

image.png

1
2
3
4
print(a)
print(a>1)
print(a[a>1])#形状不同
print(a*(a>1))#形状相同

image.png

选择函数

选择函数用于从Tensor中选出一部分。

index_select()

index_select()用于选出tensor特定维度特定下表的部分。用法:

1
2
3
4
5
6
7
8
import torch
a = torch.linspace(1, 12, steps=12).view(3, 4)
print(a)
b = torch.index_select(a, 0, torch.tensor([0, 2]))#0表示第0个维度,0,2表示这一维度下表为0和2的会被选出来。
print(b)
print(a.index_select(0, torch.tensor([0, 2])))
c = a.index_select(1, torch.tensor([1, 3]))#1表示第一个维度
print(c)

image.png

mask_select()

a.mask_select(b)=a[b],没啥好用的

nonzero()

用于获得非0元素的下标,用法:

1
2
3
a=t.Tensor([2,3,0,0,1])
print(a.nonzero())
a[a.nonzero()]

gather()和scatter()

太复杂了,等用到的时候再搞。

高级索引

x[[],[]……[]] 其中第1个列表表示第0个维度要选的东西,第2个列表表示第1个维度要选的东西,以此类推。

1
2
3
4
x=t.arange(0,27).view(3,3,3)
print(x)
print(x[[1,2],[1,2],[2,1]])#相当于x[1,1,2]和x[2,2,1]
print(x[[1,2],[0],[0]])#相当于x[1,0,0]和x[2,0,0]

image.png

Tensor类型

tensor的元素默认类型是FloatTensor,可以通过t.set_default_tensor_type()修改默认类型。

常见类型有:

数据类型 CPU tensor GPU tensor
32bit浮点 torch.FloatTensor torch.cuda.FloatTensor
64bit浮点 torch.DoubleTensor torch.cuda.DoubleTensor
16bit浮点 16bit浮点只能在GPU里用 torch.cuda.HaltTensor
8bit无符号整型 torch.ByteTensor torch.cuda.ByteTensor
8bit有符号整型 torch.CharTensor torch.cuda.CharTensor
16bit有符号整型 torch.ShortTensor torch.cuda.ShortTensor
32bit有符号整型 torch.IntTensor torch.cuda.IntTensor
64bit有符号整型 torch.LongTensor torch.cuda.LongTensor

还可以用type()函数指定类型:

1
2
3
4
5
6
7
t.set_default_tensor_type("torch.FloatTensor")
a=t.rand(2,3)
print(a)
b=a.int()
print(b)
b=b.type("torch.DoubleTensor")
print(b)

逐元素操作

这些函数会对tensor的每一个元素进行操作,如下表所示:

函数 功能
abs/sqrt/div/exp/fmod/log/pow

pytorch 快速入门

参考自《深度学习框架PyTorch入门与实践》

Tensor

Tensor是pytorch里最基础的数据结构,可以认为是一个高级数组。它可以包含一个数,一个一维数组,一个二维矩阵或更高维的数组。

Tensor的创建

创建一个Tensor的方法:

1
2
3
import torch as t
x=t.Tensor(5,3)
x

这种情况下,会生成一个内容全为0的Tensor:
image.png

随机生成Tensor的方法:

1
2
x=t.rand(2,3)
x

这样,Tensor里的每一个值都是一个0到1之间的随机数:
image.png

用t.ones()能生成全为1的Tensor,用t.zeros()能生成全为0的Tensor:

1
2
3
4
x=t.ones(2,3)
y=t.zeros(5,2)
print(x)
print(y)

image.png

得到Tensor的大小:

1
2
print(x.size())
x.size(0),x.size(1)

image.png

Tensor运算

Tensor之间的运算可以用 ‘+’、’-‘、’*’、’/‘等符号实现,也可以用add,sub等函数实现。

1
2
3
4
5
6
7
x=t.rand(2,3)
y=t.rand(2,3)
print(x)
print(y)
print(x+y)
print(x.add(y))
print(x)

image.png

值得注意的是,add函数并不会修改x的值,要实现x+=y,要用add_()函数:

1
2
3
4
5
6
7
x=t.rand(2,3)
y=t.rand(2,3)
print(x)
print(y)
print(x+y)
print(x.add_(y))
print(x)

image.png

Tensor与numpy的关系

Tensor的很多操作跟numpy差不多,它们也可以互相转换:

1
2
3
4
5
6
import numpy as np
x=np.ones(3)
y=t.from_numpy(x)
print(y)
z=y.numpy()
print(z)

image.png

值得注意的是,Tensor与nump对象的内存是共享的,因此其中一个改变的时候,另一个也会改变:
image.png

将Tensor转移到cuda上

可以通过.cuda方法把Tensor转移到GPU上:
image.png

Autograd

pytorch的Autograd模块提供了反向传播的功能。

autograd.Variable是Autograd中的核心类,它可以理解为Tensor的升级版。它的backward()方法能够自动计算梯度,提供了反向传播的功能。
image.png
image.png

由于y对x的每一个元素的偏导数都是1,所以x.grad是一个2*2的全为1的矩阵。
如果我们在调用一次y.backward(),会怎么样?
image.png

我们发现grad在反向传播的过程中是累加的,所以我们要将grad清零:
image.png

神经网络

torch.nn是专门为神经网络设计的模块化接口,可以用来定义和运行神经网络。nn.Module是nn中最重要的类,可以看成是神经网络的封装。

定义网络

一个网络需要继承nn.Module,并实现它的forward方法,并把具有可学习参数的层放在构造函数init中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
def __init__(self):
super(Net,self).__init__()
self.conv1=nn.Conv2d(1,6,5)#第一个参数是输入通道数,
#第二个参数是输出通道数,第三个参数是卷积核的大小。
self.conv2=nn.Conv2d(6,16,5)
self.fc1=nn.Linear(16*5*5,120)
self.fc2=nn.Linear(120,84)
self.fc3=nn.Linear(84,10)

def forward(self,x):
x=F.max_pool2d(F.relu(self.conv1(x)),(2,2))
x=F.max_pool2d(F.relu(self.conv1(x)),2)# 这里的2跟(2,2)的效果是一样的
x=x.view(x.size()[0],-1)
x=F.relu(self.fc1(x))
x=F.relu(self.fc2(x))
x=self.fc3(x)
return x

net=Net()
print(net)

image.png

只要实现了forward方法,backward方法就会被自动实现。
网络的可学习参数可以通过net.parameters()得到:
image.png

image.png

forward函数的输入和输出都是Variable,因此在输入的时候要把Tensor变成Variable:
image.png

注意,input是一个4维的向量,其中第一维是batch的大小,神经网络的输入是一个batch,里面可以有很多张图像。第二维是图像的通道数。第三维第四维是图像的大小。

损失函数

nn里定义了很多损失函数,如MSELoss用来计算均方误差,CrossEntropyLoss用来计算交叉熵损失。
image.png

优化器

torch.optim 中实现了很多优化方法,可以用它们来对网络的参数进行更新:

1
2
3
4
5
6
7
import torch.optim as optim
optimizer=optim.SGD(net.parameters(),lr=0.01)
optimizer.zero_grad()
output=net(input)
loss=criterion(output,target)
loss.backward()
optimizer.step()

CIFAR-10 分类

CIFAR-10是一个常用的彩色图片数据集,共有10个类别,每张图片的大小都是33232。
我们利用torchvision提供的CIFAR-10数据集来训练一个神经网络,并让它对测试图片进行预测。

首先进行一些准备工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import torch as t
import torchvision as tv
import torchvision.transforms as transforms
from torchvision.transforms import ToPILImage
show=ToPILImage()
#show用于将Tensor对象转化为图片,以便于观察结果
transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))])
#transform 用于对输入图像进行预处理
trainset=tv.datasets.CIFAR10(root='/home/cy/data/',train=True,download=True,transform=transform)
trainloader=t.utils.data.DataLoader(trainset,batch_size=4,shuffle=True,num_workers=2)
#用trainloader加载训练数据,batch_size=4说明一个batch是4张图片,num_worker=2说明用两个线程
testset=tv.datasets.CIFAR10('/home/cy/data/',train=False,download=True,transform=transform)
testloader=t.utils.data.DataLoader(testset,batch_size=4,shuffle=False,num_workers=2)
classes=('plane','car','bird','cat','deer','dog','frog','horse','ship','truck')

查看某一张照片:

1
2
3
(data,label)=trainset[100]
print(classes[label])
show((data+1)/2).resize((100,100))

image.png

定义一个网络:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
def __init__(self):
super(Net,self).__init__()
self.conv1=nn.Conv2d(3,6,5)#第一个参数是输入通道数,
#第二个参数是输出通道数,第三个参数是卷积核的大小。
self.conv2=nn.Conv2d(6,16,5)
self.fc1=nn.Linear(16*5*5,120)
self.fc2=nn.Linear(120,84)
self.fc3=nn.Linear(84,10)

def forward(self,x):
x=F.max_pool2d(F.relu(self.conv1(x)),(2,2))
x=F.max_pool2d(F.relu(self.conv2(x)),2)# 这里的2跟(2,2)的效果是一样的
x=x.view(x.size()[0],-1)
x=F.relu(self.fc1(x))
x=F.relu(self.fc2(x))
x=self.fc3(x)
return x

net=Net()
print(net)

定义一个损失函数和优化器

1
2
3
from torch import optim
criterion = nn.CrossEntropyLoss()
optimizer=optim.SGD(net.parameters(),lr=0.001,momentum=0.9)

开始训练:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from torch.autograd import Variable
for epoch in range(2):
runnint_loss=0.0
for i,data in enumerate(trainloader,0):
inputs,labels=data
inputs,labels=Variable(inputs),Variable(labels)
#注意要将输入数据转化成Variable类型,不然无法训练
optimizer.zero_grad()
outputs=net(inputs)
loss=criterion(outputs,labels)
loss.backward()
optimizer.step()
runnint_loss+=loss.item()
if i%2000==1999:
print("[%d,%5d] loss: %3f"%(epoch+1,i+1,runnint_loss/2000))
runnint_loss=0.0
print("Finish trainning")

查看第一个batch的数据和标签:

1
2
3
4
dataiter=iter(testloader)
images,labels=dataiter.next()
print("实际的label:"," ".join("%08s"%classes[labels[j]] for j in range(4)))
show(tv.utils.make_grid(images/2-0.5)).resize((400,100))

image.png

查看它们的预测结果:

1
2
3
outputs=net(Variable(images))
_,predicted=t.max(outputs.data,1)
print("预测结果:","".join("%8s"%classes[predicted[j]] for j in range(4)))

查看总正确率:

1
2
3
4
5
6
7
8
9
10
11
correct=0
total=0
for data in testloader:
images,labels=data
outputs=net(Variable(images))
_,predicted=t.max(outputs.data,1)
#output.data是一个数组,max返回两个值,第一个是最大值,第二个是最大值所在的位置
total+=labels.size(0)
correct+=(predicted==labels).sum()
#根据最大值所在的位置是否跟labels相等可以判断分类是否正确
print("正确率为:%d %%"%(100*correct/total))

Pthread学习笔记2

临界区问题

当多个线程同时修改同一个内存地址时,可能会出现奇奇怪怪的问题。我们根据下面的公式编写一个计算π的程序:image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<sys/time.h>
int thread_count,n;
double sum;

void* Thread_sum(void* rank){
long my_rank=(long)rank;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
sum+=factor/(2*i+1);
}
return NULL;
}

int main(int argc,char** argv){
if(argc!=3)abort();
long thread;
pthread_t* thread_handles;
thread_count=strtol(argv[1],NULL,10);
printf("ans: %f\n",sum);
n=strtol(argv[2],NULL,10);

struct timeval tv,tv2;
gettimeofday(&tv,NULL);
thread_handles=malloc(thread_count*sizeof(pthread_t));
for(thread=0;thread<thread_count;thread++)pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
//printf("Hello from the main thread\n");
for(thread=0;thread<thread_count;thread++)pthread_join(thread_handles[thread],NULL);
// for(int i=0;i<m;i++)printf("%f\n",y[i]);
gettimeofday(&tv2,NULL);
printf("ans: %.8f\n",4*sum);
printf("%d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);

return 0;
}

执行代码,当线程数大于1的时候会发现,程序输出的结果是不确定的,而且误差也较1个线程的情况大,甚至运行时间也是较长:
image.png

这种情况被称为临界区问题,当0号线程把sum取到寄存器并执行运算的时候,1号进程也可能同时把sum取到寄存器,然后0号进程把结果写回内存,随后1号进程也把结果写回内存。这样就导致0号进程的计算结果被覆盖掉了,我们称

1
sum+=factor/(2*i+1);

这样更新共享资源的语句为一个临界区。当多个线程尝试更新一个共享资源时,结果可能是无法预测的。更一般地,当多个线程都要访问共享变量或共享文件这样的共享资源时,如果有一个线程执行了更新操作,就可能产生错误,我们称之为竞争条件

临界区问题可以用忙等待、互斥量和信号量等方法来解决。

忙等待

一种解决方法是设置一个标记变量,用于指明临界区可以被哪个线程执行,该执行完之后修改这个标记变量,而不能执行临界区的线程必须一直处于忙等待状态。
我们可以将代码修改成这样:

1
2
3
while(flag!=my_rank);
sum+=factor/(2*i+1);
flag=(flag+1)%thread_count;

flag是我们说到的标记变量,当flag==当前线程号时,可以执行临界区,否则必须处于忙等待状态。执行完之后要修改flag。

运行结果如下:
image.png
可以看到,结果是正确了,但运行速度满了很多,甚至比单线程的情况还慢。

当然,由于临界区只能被串行执行,我们应该尽量减少临界区执行的次数,比如,我们可以把Thread_sum函数修改成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void* Thread_sum(void* rank){
long my_rank=(long)rank;
double my_sum=0;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
// printf("%d %d\n",flag,my_rank);
my_sum+=factor/(2*i+1);
}
while(flag!=my_rank);
sum+=my_sum;
flag=(flag+1)%thread_count;
return NULL;
}

用一个局部变量来代替sum变量,这样循环的过程中就不会有临界区。
image.png
运行结果正确,速度上也有提升。

使用忙等待还会有一个问题:

考虑一个最多可以同时执行2个线程的计算机,我们使用忙等待的方法编写一个5个线程的pthreads程序。假设开始时,操作系统运行0号线程和1号线程,这是没有问题的,此时2,3,4三个线程被挂起。然后假设0号线程运行结束,操作系统将其挂起,然后调度3号线程。由于操作系统的线程调度是随机的,它完全不知道此时只有2号线程能进入临界区,所以3号线程只能一直处于忙等待状态,浪费了时间。

所以,当线程数量大于计算机能同时运行的线程数时,使用忙等待的方法可能会产生问题。

互斥量

由于处于忙等待的线程会持续地占用CPU资源,所以忙等待并不是一个特别理想的解决临界区问题的方案。我们可以使用互斥量和信号量。

互斥量是互斥锁的简称,是一个特殊类型的变量,它可以保证每次只有一个线程能进入临界区。

互斥量的变量类型是pthread_mutex_t,必须调用 pthread_mutex_init()函数进行初始化。

当线程执行到临界区前面时,应该调用pthread_mutex_lock()函数。如果此时临界区已经被加了锁(即已经有其他线程执行到了这里),则它会被卡在临界区外等待;否则它会进入临界区,并把临界区加锁。

当临界区被执行完,线程要用pthread_mutex_unlock函数把临界区解锁。此时,系统会从其他等待的线程选出一个进入临界区。如果有多个等待的线程,操作系统选择哪个线程是随机的的。

互斥量使用完之后要用pthread_mutex_destroy函数将其销毁。

互斥量的示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<sys/time.h>
int thread_count,n;
double sum;
int flag;
pthread_mutex_t mutex;
void* Thread_sum(void* rank){
long my_rank=(long)rank;
double my_sum=0;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
// printf("%d %d\n",flag,my_rank);
my_sum+=factor/(2*i+1);
}
pthread_mutex_lock(&mutex);
sum+=my_sum;
pthread_mutex_unlock(&mutex);
return NULL;
}

int main(int argc,char** argv){
if(argc!=3)abort();
long thread;
pthread_t* thread_handles;
thread_count=strtol(argv[1],NULL,10);
n=strtol(argv[2],NULL,10);
pthread_mutex_init(&mutex,NULL);
struct timeval tv,tv2;
gettimeofday(&tv,NULL);
thread_handles=malloc(thread_count*sizeof(pthread_t));
for(thread=0;thread<thread_count;thread++)pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
//printf("Hello from the main thread\n");
for(thread=0;thread<thread_count;thread++)pthread_join(thread_handles[thread],NULL);
// for(int i=0;i<m;i++)printf("%f\n",y[i]);
free(thread_handles);
pthread_mutex_destroy(&mutex);
gettimeofday(&tv2,NULL);
printf("ans: %.8f\n",4*sum);
printf("%d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);

return 0;
}

运行结果:

image.png

信号量

信号量(semaphore)可以认为是一种特殊类型的unsigned int,它的类型是sem_t。信号量可以赋值为0,1,……。大多数情况下,我们只用0和1两个值。这种只有0和1值的信号量也称为二元信号量。0对应上了锁的互斥量,1对应没上锁的互斥量。

初始化信号量的方法是调用sem_init(),它的原型是:

1
int sem_init(sem_t* a,int b,int c);

其中第二个参数直接置为0就可以了,第三个参数是初始值。

如果我们要用信号量来解决临界区问题,可以先创建一个全局信号量,初始值为1。在临界区前调用sem_wait()函数,这个函数的意义是:如果信号量为0则阻塞;如果是非0值则减1然后进入临界区。临界区执行完之后,调用sem_post()函数将信号量置为1。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<semaphore.h>//注意要加这个头文件
#include<sys/time.h>
int thread_count,n;
double sum;
int flag;
sem_t my_semaphore;
void* Thread_sum(void* rank){
long my_rank=(long)rank;
double my_sum=0;
double factor;
long long i;
long long my_n=n/thread_count;
long long my_first_i=my_n*my_rank;
long long my_last_i=my_first_i+my_n;

if(my_first_i%2==0){
factor=1.0;
}
else factor=-1.0;
for(i = my_first_i;i<my_last_i;i++,factor=-factor){
my_sum+=factor/(2*i+1);
}
sem_wait(&my_semaphore);
sum+=my_sum;
printf("Thread %d\n",my_rank);
sem_post(&my_semaphore);
return NULL;
}

int main(int argc,char** argv){
if(argc!=3)abort();
long thread;
pthread_t* thread_handles;
thread_count=strtol(argv[1],NULL,10);
n=strtol(argv[2],NULL,10);
sem_init(&my_semaphore,0,1);

struct timeval tv,tv2;
gettimeofday(&tv,NULL);
thread_handles=malloc(thread_count*sizeof(pthread_t));
for(thread=0;thread<thread_count;thread++)pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
for(thread=0;thread<thread_count;thread++)pthread_join(thread_handles[thread],NULL);
free(thread_handles);
gettimeofday(&tv2,NULL);
printf("ans: %.8f\n",4*sum);
printf("%d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);
return 0;
}

使用信号量还可以控制线程执行的顺序,如发送消息问题。我们可以给每个线程分配一个信号量,初始化为0,给每个线程一个char*,指向收到的消息。每一个线程把消息放到下一个线程后将下一个线程的信号量置为1,然后等待上一个线程的消息。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<semaphore.h>
#include<sys/time.h>
#define MSG_MAX 100
int thread_count,n;
double sum;
int flag;
pthread_mutex_t mutex;
char** messages;
sem_t* semaphores;
void* Thread_sum(void* rank){
long my_rank=(long)rank;
long dest=(my_rank+1)%thread_count;
char* my_msg=malloc(MSG_MAX*sizeof(char));
sprintf(my_msg,"Hello to %ld from %ld",dest,my_rank);
messages[dest]=my_msg;
sem_post(&semaphores[dest]);
sem_wait(&semaphores[my_rank]);
printf("Thread %ld > %s\n",my_rank,messages[my_rank]);
}

int main(int argc,char** argv){
if(argc!=2)abort();
long thread;
pthread_t* thread_handles;
thread_count=strtol(argv[1],NULL,10);
semaphores=(sem_t*)malloc(thread_count*sizeof(sem_t));
messages=(char**)malloc(thread_count*sizeof(char*));
struct timeval tv,tv2;
gettimeofday(&tv,NULL);
thread_handles=malloc(thread_count*sizeof(pthread_t));
for(thread=0;thread<thread_count;thread++)pthread_create(&thread_handles[thread],NULL,Thread_sum,(void*)thread);
//printf("Hello from the main thread\n");
for(thread=0;thread<thread_count;thread++)pthread_join(thread_handles[thread],NULL);
// for(int i=0;i<m;i++)printf("%f\n",y[i]);
free(thread_handles);
pthread_mutex_destroy(&mutex);
gettimeofday(&tv2,NULL);
printf("ans: %.8f\n",4*sum);
printf("%d\n",tv2.tv_sec*1000000 + tv2.tv_usec - tv.tv_sec*1000000 - tv.tv_usec);

return 0;
}

结果如下所示:
image.png

但如果我们把sem_wait那一行去掉,会出现这样的变化:
image.png
第0号线程没有收到消息的时候就调用printf函数,自然就得不到想要的结果。

像上面这种一个线程需要等待另一个线程执行某种操作的同步方式,有时称为生产者-消费者同步模型