1. TCP网络优化的两种机制

TCP(Transmission Control Protocol 传输控制协议)是一种面向连接的、可靠的、全双工、基于字节流的传输层通信协议,由IETF的RFC 793定义。

TCP协议是网络编程中最重要的协议之一,TCP协议将上层的数据附上TCP报头等信息,封装成一个个报文段(segment),然后交由下层网络层去处理。TCP协议定义了TCP报文段的结构,如下图所示:

这里写图片描述

可以看出,TCP每个报文段的首部大小至少是20字节的数据,因此若用户数据为1字节,再加上网络层IP包头20字节,则整个IP数据包的大小为41字节,那么整个IP数据包的负荷率为1 / 41。这显然是不划算的,会降低网络的传输效率,当网络都充斥着这种IP数据包的时候,可想而知整个网络几乎都在传输一些无用的包头信息,这种问题被称为小包问题。特别是在Telnet协议中,当用户远程登录到一个主机,他的每一次键盘敲击实际上都会产生一个携带用户数据量小的数据包,这是典型的小包问题

为了解决这种问题,出现了**Nagle's Algorithms**,这个算法是 John Nagle为解决实际过程中出现的小包问题而发明的。它的思想很朴素,就是将多个即将发送的小段的用户数据,缓存并合并成一个大段数据时,一次性一并发送出去。特别的是,只要当发送者还没有收到前一次发送TCP报文段的的ACK(即连接中还存在未回执ACK的TCP报文段)时,发送方就应该一直缓存数据直到数据达到可以发送的大小,然后再统一合并到一起发送出去,如果收到上一次发送的TCP报文段的ACK则立马将缓存的数据发送出去。

以下是Nigle算法的伪代码:

1
2
3
4
5
6
7
8
9
10
11
if there is new data to send
if the window size >= MSS and available data is >= MSS
send complete MSS segment now
else
if there is unconfirmed data still in the pipe
enqueue data in the buffer until an acknowledge is received
else
send data immediately
end if
end if
end if

MSS = maximum segment size

与之相呼应的还有一个网络优化的机制叫做TCP延迟确认,这个是针对接收方来讲的机制,由于ACK包属于有效数据比较少的小包,因此延迟确认机制就会导致接收方将多个收到数据包的ACK打包成一个回复包返回给发送方。这样就可以避免导致只包含ACK的TCP报文段过多导致网络额外的开销(前面提到的小包问题)。延迟确认机制有一个超时机制,就是当收到每一个TCP报文段后,如果该TCP报文段的ACK超过一定时间还未发送就启动超时机制,立刻将该ACK发送出去。因此延迟确认机制会可能会带来500ms的ACK延迟确认时间。

延迟确认机制Nigle算法几乎是在同一时期提出来的,但是是由不同的组提出的。这两种机制在某种程度上的确对网络传输进行了优化,在通常的协议栈实现中,这两种机制是默认开启的。

但是,这两种机制结合起来的时候会产生一些负面的影响,可能会导致应用程序的性能下降。

2. write-write-read模式带来的问题

考虑这么一种情况,

1.假设发送方A启用了Nigle算法,接收方B启用了延迟确认机制,则当发送方A向TCP连接进行了两次write操作,每次write操作都之写入了少量的数据(少于MSS),假设写入的数据片段为w1,w2,然后紧接着调用了阻塞式的read操作。

2.因为连接中没有未收到ACK的TCP报文段,发送方A的第一次写入的w1会立马发送出去;

3.则在接收方来B来看,它会首先收到包含w1数据段的TCP报文段,但是由于延迟确认机制,接收方B会延迟发送该TCP报文段的ACK直到超时。

4.而对于发送方A来说,根据Nigle算法,由于连接中上一个TCP报文段未收到ACK,并且第二次写入的w2数据段过于小(小于MSS),则发送方A会将w2入队列缓存起来,不会立即发送。

5.然而不幸的是,发送方A此时并不继续发送数据,因此依靠发送方A将缓存填满来把w2数据段发送出是不可能的了,现在只能傻傻的等待接收方B因为超时而返回w1的ACK了。

6.这样当接收方B因为超时而返回ACK后,发送方A就会立即发送包含w2数据段的TCP报文段。

自此,w1、w2数据段才被完整的发送给接收方B,如果忽略传输时间等其他因素,这额外增加的数据传输延迟就是接收方B启用的延迟确认机制中的超时的值。
因此当延迟确认机制Nigle算法都启用的时候,正好碰上了这种write-write-read模式的数据传输,就会出现这种问题,因此wiki上就有这些建议:

The user-level solution is to avoid write-write-read sequences on sockets. write-read-write-read is fine. write-write-write is fine. But write-write-read is a killer. So, if you can, buffer up your little writes to TCP and send them all at once. Using the standard UNIX I/O package and flushing write before each read usually works.

Nigle算法在一次性写入比较大的数据段时会出现延迟的现象,特别是对于Request-Response模式的程序来讲,通常一个请求的数据会大于MMS,这样一个请求就会跨越多个TCP报文段,因此Nigle算法会导致最后一个TCP报文段被Hold住,出现延时;同样的一个回复的数据也会大于MMS,因此也会出现这种延时。

Nigle算法通常是用来防止那些写得不太好的程序,防止这些程序随意的发小包降低网络传输效率;而对于一些精心编写的程序,Nigle算法几乎没什么用,应用程序编写者应该合理的把握、判断好每次写入的数据的大小,进而采取适当的策略进行发送,要么将小包合并到MMS大小,然后一次性写入并发送;要么禁用Nigle算法

当然以上只是一种解决方案,通常的协议栈会预留接口来禁用Nigle算法,即设置TCP_NODELAY选项。

3. TCP NO_DELAY选项

Socket编程模型会有一个预留接口来禁用Nigle算法,这个接口就是TCP NODELAY
这是网上的用Java写的一个关于是否启用TCP NODELAY实验

需要说明的是这个实验在同一台windows上是不成功的,在windows上的loopback地址的处理可能和Linux不太一样。在windows下实验,开启TCP NODELAY与否都不会出现延迟的情况。奇怪的是我选一台windows做Client,一台Max机器做Server,实验仍然失败,不知道Windows对于Nagle算法是否是否是默认的开启。但是我选windows机器做Server,Mac机器做Client,实验结果显而易见。

Server端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

public class Server {

public static void main(String[] args) {
// TODO Auto-generated method stub
try {
ServerSocket socket = new ServerSocket(9989);
System.out.println("绑定端口");
while(true) {
Socket con = socket.accept();
new Thread( new Runnable() {

@Override
public void run() {
// TODO Auto-generated method stub
try {
InputStream in = con.getInputStream();
OutputStream out = con.getOutputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
while (true) {
String line = reader.readLine();
if(!con.isClosed())
out.write((line + "\r\n").getBytes());
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
}

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40


public class Client {

public static void main(String[] args) {
// TODO Auto-generated method stub
try {
Socket s = new Socket("192.168.31.235", 9989);
s.setTcpNoDelay(true);
OutputStream ops = s.getOutputStream();
InputStream in = s.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));

// write-write-read
String head = "h";
String body = "w\r\n";

int i = 10;

while ( i-- > 0) {
long label = System.currentTimeMillis();
ops.write(head.getBytes());
ops.write(body.getBytes());// 会有延迟吧应该,有可能不会阻塞
InputStream ips = s.getInputStream();
String line = reader.readLine();
System.out.println("RTT:" + (System.currentTimeMillis() - label) + ", receive: " + line);
}
s.close();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

}

其中Server代码跑在Windows上,Client代码跑在Mac上

  • 未开启TCP NODELAY,write-write-read模式

这里写图片描述

  • 开启TCP NODELAY,write-write-read模式

这里写图片描述

  • 未开启TCP NODELAY, write-read模式

这里写图片描述

  • 开启TCP NODELAY, write-read模式

这里写图片描述

可以明显的看出,当采用write-write-read模式,且未开启TCP NODELAY选项时(未禁用Nagle算法)时,延迟产生了,大约200ms

4. TCP CORK选项

TCP CORK

TCP_CORK是另一种关于TCP的选项,他与TCP——NODELAY的详细区别此处不做介绍,相关资料见:

https://linux.die.net/man/7/tcp

http://baus.net/on-tcp_cork

http://stackoverflow.com/questions/22124098/is-there-any-significant-difference-between-tcp-cork-and-tcp-nodelay-in-this-use

位运算

几乎每种编程语言都为我们提供一种运算,它直接操作二进制数据,这种运算叫做位运算

位运算分为移位、取反、与、或、异或、非,其中移位又包括左移位、右移位、左无符号移位、右无符号移位。

含义 Java写法
a & b
a | b
异或 a ^ b
非(取反) ~a
左移 a<<b
右移 a>>b
无符号左移 a <<< b
无符号右移 a>>>b

可能大多数时候觉得,这些位运算除了平时刷题用到,好像并没有什么用。当时当我们去看一些源代码的时候,比如JDK、Android SDK等源代码的时候,我们会发现有很多地方都会看见一些位运算的影子。这些使用位运算的例子都会有某个命令为mask的变量,这个mask就是用来存储某几种状态的信息。

那么什么是位掩码呢?根据维基百科的定义:

In computer science, a mask is data that is used for bitwise operations, particularly in a bit field.

位掩码是一种用来方便进行位运算的数据,可以帮助我们在读取或者修改某个特定的位上的值,而不会修改其他位的值。

位运算的用例一—权限控制

假设有这么一个场景,需要对某个文件设置权限,假设有三种权限需要设置:读、写、执行。那么通常的做法可能就是采用三个布尔值来存储当前的权限。
那么写法通常可能是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

public class Permission {
private boolean is_allowed_read = false;
private boolean is_allowed_write = false;
private boolean is_allowed_execution = false;

public Permission() {

}

public Permission(boolean is_allowed_read, boolean is_allowed_write, boolean is_allowed_execution) {
this.is_allowed_read = is_allowed_read;
this.is_allowed_write = is_allowed_write;
this.is_allowed_execution = is_allowed_execution;
}

public void setIsAllowedRead(boolean is_allowed_read) {
this.is_allowed_read = is_allowed_read;
}

public boolean getIsAllowedRead() {
return this.is_allowed_read;
}

public void setIsAllowedWrite(boolean is_allowed_write) {
this.is_allowed_write = is_allowed_write;
}

public boolean getIsAllowedWrite() {
return this.is_allowed_write;
}

public void setIsAllowedExecution(boolean is_allowed_execution) {
this.is_allowed_execution = is_allowed_execution;
}

public boolean getIsAllowedExecution() {
return this.is_allowed_execution;
}
}

上面这种写法可能是比较常见的,比较符合我们的思维习惯,但是使用位运算中掩码的概念来改写这个例子,会使得更加简洁、高效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

public class Permission {
private final static byte Allowe_Read = 1 << 0;//00000001
private final static byte Allowe_Write = 1 << 1;//00000010
private final static byte Allow_Execution = 1 << 2;//00000100

private byte permissionMask = 0x00;//默认没有任何权限

public Permission() {

}

public Permission(byte permission) {
this.permissionMask = permission;
}

//增加一项或者多项权限
public void enable(byte permission) {
this.permissionMask |= permission;
}

//禁用一项或多项的权限
public void disable(byte permission) {
this.permissionMask &= ~permission;
}

//查询一项或多项权限是否被启用
public boolean isAllowed(byte permission) {
return this.permissionMask & permission == permission;
}


//查询一项或多项权限是否被禁用
public boolean isDisAllowed(byte permission) {
return this.permissionMask & permission == 0;
}
}

这种写法明显表达的信息量要多于第一种写法,举个例子:现在要同时启用三种权限,那么第一种写法就是:

1
2
3
4
5
6
7

setIsAllowedRead(true);

setIsAllowedWrite(true);

setIsAllowedExecution(true);

而第二种写法就是:

1
2
3

enable(Permission.ALLOW_READ | Permission.ALLOW_WRITE | Permission.ALLOW_EXECUTION);

这种写法对于使用Permission类的时候来说,方便许多。在Linux系统中设置权限时通常会用到

1
chmod 777 file

其中777就是1111111 | 1111111 | 1111111,可见Linux里面也是采用位运算中的掩码来设置文件权限的。

用位运算的方式来实现这个权限控制的优点是:高效,位运算比较接近与机器的运算方式;简洁,无论试编写还是使用都比较方便简洁。
缺点:代码不够直观,可读性差,当维护这段代码的时候可能比较恼火, 不如第一种写法一目了然。

通常如果需要维护n个开关变量(二值变量)的时候,只需要n位二进制的整数和数个mask即可,完成状态的保存和查询。这种写法在Android SDK里面是非常常见的。
可以加以推广,如果需要保存n个具有m种状态的变量,那么需要一个nm进制的数即可完成。

In programming, an atomic action is one that effectively happens all at once. An atomic action cannot stop in the middle: it either happens completely, or it doesn’t happen at all. No side effects of an atomic action are visible until the action is complete.

以上是关于原子性的操作的相关描述。

在Java中,以下的操作可以认为是原子操作

  1. 对于引用变量、大多数的原始类型变量的读、写(所有的类型除了longdouble)都是原子性的(这个只有才32位的JVM成立)
  2. 所有申明为volatile的变量(包括longdouble变量)的读、写都是原子性的

在Java中,自增、自减操作都不是原子性操作,很容易理解。但是longduoble的读写却不是原子性的问题,却不太好理解。

检验

在32位JVM上,通过两个线程对同一个成员变量进行读写,来测试longdouble类型变量的读写不是原子性操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

public class UnatomicLongDemo implements Runnable {

private static long test = 0;

private final long val;

public UnatomicLongDemo(long val) {
this.val = val;
}

@Override
public void run() {
while (!Thread.interrupted()) {
test = val;//两个线程同时断写test变量,如果test变量的读写操作是原子性的,那么test只能是-1或者0
}
}

public static void main(String[] args) {
Thread t1 = new Thread(new UnatomicLongDemo(-1));
Thread t2 = new Thread(new UnatomicLongDemo(0));

System.out.println(Long.toBinaryString(-1));
System.out.println(pad(Long.toBinaryString(0), 64));

t1.start();
t2.start();

long switchVal;
while ((switchVal = test) == -1 || switchVal == 0) {
//如果test、switchVal的操作是原子性的,那么就应该是死循环,否则就会跳出该循环
System.out.println("testing...");
}

System.out.println(pad(Long.toBinaryString(switchVal), 64));
System.out.println(switchVal);

t1.interrupt();
t2.interrupt();
}

//将0补齐到固定长度
private static String pad(String s, int targetLength) {
int n = targetLength - s.length();
for (int x = 0; x < n; x++) {
s = "0" + s;
}
return s;
}

}

测试结果为

这里写图片描述

通过这个例子可以看出

  1. 在32位JVM上,对long型变量test的读取或者对long型变量switchVal的写入不是原子性的。

  2. 非原子性的读、写只是造成longdouble类型变量的高、低32位数据不一致

这是由于在32位JVM中对64位的数据的读、写分两步,每一步读或者写32位的数据,这样就会造成两个线程对同一个变量的读写出现一个线程写高32位、另一个线程写入低32位数据。这样此变量的数据就出现不一致的情况。这时候volatile关键字可以防止这种现象发生,因为java的内存模型保证了valotile修饰的longdouble变量的读写是原子性的。

单例模式是一种常用的软件设计模式。在它的核心结构中只包含一个被称为单例的特殊类。通过单例模式可以保证系统中一个类只有一个实例。

单例通常有很多种写法,但是性能和效果却是差距挺大。下面列举了几种常见的写法。

一、单例模式常见的写法

  1. 懒汉式(线程不安全)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
    
public class Singleton {
private static Singleton instance = null;

private Singlton() {
}

public static Singleton getInstance() {
if(instance == null)
return instance;
return instance;
}
}

这种写法,在多线程的时候,由于并发访问instance,会导致创建多个instance,从而使得单例模式失效。

  1. 懒汉式(线程安全1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
    
public class Singleton {
private static Singleton instance = null;

private Singlton() {
}

public synchronized static Singleton getInstance() {
if(instance == null)
return instance;
return instance;
}
}

这种写法的优点是保证了线程安全,缺点是效率低下,因为instance一旦创建,大部分时间都是多线程在访问instance,因此把同步加在方法上会导致多个线程等待。

  1. 懒汉式(线程安全2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    
public class Singleton {
private static Singleton instance = null;

private Singlton() {
}

public static Singleton getInstance() {
if(instance == null) {
synchronized(Singleton.class) {
instance = new Singleton();
return instance;
}
}
return instance;
}
}

这种写法有效的避免了第2种写法的缺点,当instance被创建成功后,大部分时间多线程访问instance的时候都无需同步;而只有当需要创建instance的时候才需要同步创建instance的代码块。看似这种写法比较完美,但是这种写法有有一个致命的缺点就是,当线程A判断instance等于null的时候,这时线程A被挂起,线程B判断instance为null,同时获取到锁进入了同步代码块,然后成功的创建了instance,最后释放了锁退出了同步代码块。恰好此时线程A成功的获取到锁进入同步代码块继续执行,它也会创建一个instance,这样instance就被创建不止一次,系统中就存在多个instance。

  1. 双重检验式(Double Check Lock)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    
public class Singleton {
private static Singleton instance = null;

private Singleton() {
}

public static Singleton getInstance() {
if(instance == null) {
synchronized(Singleton.class) {
if(instance == null) {
instance = new Singleton();
return instance;
}
}
}
return instance;
}
}

另一种类似的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Singleton {
private volatile Singleton instance = null;

private Singleton() {
}

public static Singleton getInstance() {
if(instance == null) {
synchronized(Singleton.class) {
if(instance == null) {
instance = new Singleton();
return instance;
}
}
}
return instance;
}
}

双重校验可以避免第3种方法的缺点,当线程获得锁后进入同步代码块后,再进一步确认instance是否为null。通常双重校验锁这种形式会比较好的达到正常的单例的效果。

  1. 饿汉式
1
2
3
4
5
6
7
8
9
10
public class Singleton {
private static Singleton instance = new Singleton();

private Singleton() {
}

public static Singleton getInstance() {
return instance;
}
}

这种方式利用ClassLoader的机制保证了单例类只在被类加载器第一次加载的时候,创建一个instance,避免了多线程的同步问题。优点是没有同步代码块,效率高;缺点是Singleton可能会因为多种原因被加载,因此没有实现懒加载。

  1. 饿汉式(静态内部类)
1
2
3
4
5
6
7
8
9
10
11
12
public class Singleton {
private static class SingletonHolder {
private static final Singleton INSTANCE = new Singleton();
}

private Singleton() {
}

public static Singleton getInstance() {
return SingletonHolder.INSTANCE;
}
}

这种方式相对于第5种方式,虽然由于某种原因导致Singleton类被类加载器加载,但是由于SingletonHolder没有被显示的加载,因此instance还是没有被创建。因此这种形式在某种程度上来说,相对于第5种写法来说延迟了instance的创建。

当单例类由不同的类加载器加载或者能被序列化和反序列化或者可以通过反射来调用私有构造函数的时候,上述的几种方式都不能很好的实现单例的效果,这个需要再进一步讨论。

二、枚举类型实现单例模式

JDK1.5以后,可以通过枚举类型来实现线程安全的、防止序列化和反序列化的、代码简洁的单例模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

public enum Singleton {
INSTANCE;
//属性
private int a,b,c;
//构造函数只能为私有的
private Singleton() {

}

public void method1() {

}

public void method2() {

}
}

采用这种方式实现的单例模式是十分推荐的,天然线程安全,天然解决了序列化和反序列化之后、反射调用私有构造函数出现多个对象的问题。同时这也是Effective Java推荐的写法。

三 、References

https://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html

Collections类简介

Collections类是java集合框架的一个类,其主要是一些通用的作用于Collection的 算法,如排序,求极值,混淆(shuffle)等。
引用Java官方文档的介绍

The polymorphic algorithms described here are pieces of reusable functionality provided by the Java platform. All of them come from the Collections class, and all take the form of static methods whose first argument is the collection on which the operation is to be performed. The great majority of the algorithms provided by the Java platform operate on List instances, but a few of them operate on arbitrary Collection instances.

Collections类的方法都是静态方法,每一种方法都对应一种集合算法的实现,且每一种实现都有两种,一种是适用于实现了RandomAccess接口的集合类(例如ArrayList),另一种是适用于序列存储的,例如(LinkedList)。

Collections类包含的算法实现大致如下:

  • 排序
    排序采用归并排序,所以排序算法是稳定的,时间复杂度是确定的。
  • 混淆
  • 常规的集合数据操作(适用于List
    包括reversefillcopyswapaddAll
  • 搜索(适用于List
    binarySearch
  • 极值
    求集合的最大元素、最小元素

Collections中的大多数算法都只是适用于List,接下来讨论的Rotate方法就是只适用于List的。
使用与List的算法主要有:

  • sort
  • shuffle
  • reverse
  • rotate
  • swap
  • replaceAll
  • fill
  • copy
  • binarySearch
  • indexOfSubList
  • lastIndexofSubList

Rotate方法使用

Rotate方法需要一个参数distance,该方法将一个List旋转多少长度为distance。假如有个序列列list[a,b,c,d],调用方法Collections.rotate(list, 1)后,得到的list就变为了[d,a,b,c]
调用此方法后,位置i上的元素将变为位置(i - distance) mod list.size()的元素,0 <= i < list.size()distance可以为正数、0、负数。正数代表向前(下标值变大的方向)旋转,负数代表向后旋转。调用方法Collections.rotate(list, -1)后,得到的list就变为了[b,c,d,a]

这个方法常常和ListsubList方法结合使用,用于将一个list的某个或多个元素进行移动,而不破坏其余元素的顺序。例如为了将某个list的位置j的元素向前移动到k的位置。(设移动的距离为dd >= 0),k = j + d + 1)。

1
Collections.rotate(list.subList(j, k+1), -1);

举个栗子,例如[a,b,c,d,e],将b元素向前移动三个位置

1
Collections.rotate(list.subList(1, 5), -1);

调用后得到list[a,c,d,e,b]

Rotate方法源码分析

Rotate方法的方法原型为

1
public static void rotate(List<?> list, int distance) 

Rotate方法有两种实现,一种适用于实现了RandomAccess接口的List(类似数组的随机访问性质)或者size小于阈值的序列存储的List,另一种是size大于阈值的序列存储(通常是指链表的形式存储的)的List

关于该方法的源码为

1
2
3
4
5
6
public static void rotate(List<?> list, int distance) {
if (list instanceof RandomAccess || list.size() < ROTATE_THRESHOLD)
rotate1(list, distance);
else
rotate2(list, distance);
}

其中rotate1对应于第一种的实现,rotate2对应于第二种的实现。

对于rotate1的实现具体的做法是,将序列的第一个元素交换至正确的位置i,此时原来在位置i的元素就是一个错误放置的元素displaced,然后计算该元素正确放置的位置i,并将其放置到位置i,此时又得到一个错误放置的元素displaced,重复此过程直到错误放置的元素displaced被放置到了第一个位置。(即i == 0)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

private static <T> void rotate1(List<T> list, int distance) {
int size = list.size();
if (size == 0)
return;
distance = distance % size;
if (distance < 0)
distance += size;//将distance化为到范围为0到size - 1的区间
if (distance == 0)
return;

for (int cycleStart = 0, nMoved = 0; nMoved != size; cycleStart++) {
T displaced = list.get(cycleStart);
int i = cycleStart;
//从cycleStart开始,迭代式的将错误放置的元素放置到正确的位置,直到某个元素被放置到了cycleStart
//此时,应该结束do...while循环,因为如果继续循环得到的结果就是啥也没干,因为上一轮的循环已经
//将此次循环访问到的位置的元素放正确了,此时应该换下一个位置作为起点cycleStart,继续执行直到
//被正确放置的元素的个数nMoved达到了size个
do {
i += distance;
//此处i的范围是 0 ~ 2*size
//当i大于size且小于2 * size的时候,此时i= i mod size 等价于 i = i - size
if (i >= size)
i -= size;
//此时i的范围为0 ~ size - 1
displaced = list.set(i, displaced);
nMoved ++;//记录已经被正确放置的元素的个数
} while (i != cycleStart);//当i回到起点后,退出本次循环
}
}

对于rotate2的实现与上面的实现完全不一样,因为考虑到rotate2适用于的是类似于采用链表存储的List,因此不具有随机访问的特性。因此该实现采用的是将原list-distance mod size处使用subList方法拆分为两个子列表s1s2,并分别反转俩链表revese(s1)reverse(s2),最后再将整个链表反转,reverse(list)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  private static void rotate2(List<?> list, int distance) {
int size = list.size();
if (size == 0)
return;
int mid = -distance % size;
if (mid < 0)
mid += size;//确保mid位于0 ~ size的区间
if (mid == 0)
return;
//分别反转两个子链表
reverse(list.subList(0, mid));
reverse(list.subList(mid, size));
//最后反转整个链表 两次反转确保俩子链表的元素的的相对位置不变
reverse(list);
}

示意图

示意图中(1) (2) (3)分别对应三次链表反转。由示意图中大致可以看出,前两次反转子链表的目的是为了将原链表的首末连在一起,并将mid指向的的元素和其逆时针方向的旁边的元素断开。最后整个链表的反转是为了恢复俩子链表的元素的原始相对顺序。

链表的反转通常采用头插法反转。

小结

  • rotate方法是将链表旋转一定的distance,该方法常常与subList方法结合用户在List中移动某个元素到指定的位置,而不影响其他元素的顺序
  • rotate有两种实现,一种针对于随机存取的,另一种针对链表式存取的。
  • 两种实现的思想也不一样,一种是通过迭代式的交换,另一种是通过链表的反转实现的。

分治

分治(Divide and Conquer)是一场常见的算法策略。分治策略的基本思想就是对于一个问题规模为N的问题,将其划分为规模足够小的K个子问题,子问题由于规模足够小可以直接求解,最后将规模足够小的K的问题的解合并得出原问题的解。
分治策略的问题的求解过程的一般套路就是:

  1. 判断问题规模,足够小进入步骤2,否者进入步骤3
  2. 直接对问题进行求解,返回问题的解。
  3. 将问题拆分成K个小规模的问题,进入步骤2。并将解合并,得到原问题的解。

由于分治策略的步骤描述自带递归属性,因此分治策略的算法实现常采用递归来实现。递归的算法通常会含有三部分,第一部分是个基准情况(Base Case),第二部分是问题拆分,并调用自身,第三部分是合并并返回结果。

分治策略中主要有两个关键过程:

  • 将原问题拆分Division
  • 将各子问题合并Combination

分治策略的主要时间开销发生在这两个过程,通常分治策略有两类

  1. 拆分容易,合并困难
  2. 拆分困难,合并简单

下文中的合并排序快速排序分别属于上面的第1和第2类。

合并排序和快速排序

在常见排序算法中,合并排序和快速排序是典型的分治算法,其时间复杂度的平均性能均为nlgn。合并排序的最坏时间复杂度也能保持nlgn,而快速排序的最坏时间复杂度却为n*n合并排序是稳定的,而快速排序不是稳定的。

另外合并排序是拆分容易,合并困难,而快速排序则是拆分困难,合并容易。

  1. 合并排序又称归并排序,主要的思想是:将待排序列拆分至数个足够小的子序列,然后将相邻子序列合并为一个有序子序列,重复合并相邻有序子序列直到整个序列有序。

  2. 快速排序的主要思想则是,选取一个枢纽(pivot), 将待排序列拆分为左右两个子序列A,B,使得子序列A的元素都不大于pivot, 子序列B的元素都大于pivot。这样使得子序列A的元素都不大于子序列B,然后对子序列A,B递归进行这种拆分,直到待拆分的序列足够小,最后整个序列变成有序。

由于合并排序和快速排序每次元素的移动都不只移动了一个位置,因此每次元素的比较都不只消除了一个逆序对,因此对于插入排序这种每次比较只移动一个位置的算法,时间复杂度会得到改善。

快速排序的递归实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int partition(int* a, int l, int h) {
int pivot = a[l];
int i,j;
i = l - 1;
j = h + 1;
while(true) {
while(a[--j] > pivot);
while(a[++i] < pivot)
if(i >= h)
break;
//i 、j cross
if(i >= j)
break;

int tmp = a[i];
a[i] = a[j];
a[j] = tmp;
}
a[j] = pivot;
return j;
}

void quickSort(int* a, int l, int h) {
if(l >= h)
return;
int p = partition(a, l, h);
quickSort(a, l, p);
quickSort(a, p+1, h);
}

合并排序的递归实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void merge(int* a, int l, int m, int h) {
if(l == h)
return;

int* b = (int*)malloc(sizeof(int) * (h - l + 1));
int i = l, j = m + 1;
int k = 0;

while(i <= m && j <= h)
b[k++] = a[i] > a[j] ? a[j++] : a[i++];

if(i <= m)
while(i <= m)
b[k++] = a[i++];
if(j <= h)
while(j <= h)
b[k++] = a[j++];
//回写
i = l;
k = 0;

for(; i <= h;)
a[i++] = b[k++];
}

void mergeSort(int* a, int l, int h) {
if(l >= h)
return;
int m = (h + l) / 2;
mergeSort(a, l, m);
mergeSort(a, m+1, h);
merge(a, l, m, h);
}

合并排序的非递归实现

由于递归会导致函数调用栈的暴增,会引入额外的时间开销,例如现场保护和恢复之类的。通递归版本的算法都可以改写为迭代版本的算法。递归算法的优点是实现简单直观,易于理解,缺点是如果有时候调用深度过深,会带来栈内存溢出和额外的运行时间开销。迭代算法的有点是不会有额外的函数调用栈的增长, 缺点是难于理解且难于实现。

合并排序的迭代版本的实现相对来讲是比较简单直观的,大致思路就是:第1轮,待排序列S中相邻的长度为1的子序列进行合并,第2轮,序列S中相邻的长度为2的子序列进行合并,第i轮,待排序列S中相邻的长度为2的i-1次方的子序列进行合并。直到待合并的子序列数为1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void merge(int* a, int l, int m, int h) {
if(l == h)
return;

int* b = (int*)malloc(sizeof(int) * (h - l + 1));
int i = l, j = m + 1;
int k = 0;

while(i <= m && j <= h)
b[k++] = a[i] > a[j] ? a[j++] : a[i++];

if(i <= m)
while(i <= m)
b[k++] = a[i++];
if(j <= h)
while(j <= h)
b[k++] = a[j++];
//回写
i = l;
k = 0;

for(; i <= h;)
a[i++] = b[k++];
}

//非递归版本
void mergeIteation(int* a, int n) {
int offset = 1;
while(offset <= n) {
int i = 0;
for(;i < n;) {
merge(a,i, i+offset-1,i+offset*2 - 1);
i = i + offset * 2;
if(n - 1 - i < offset)
break;
}
offset *= 2;
}
}

排序及常见排序算法

排序是计算机内经常进行的一种操作,其目的是将一组“无序”的记录序列调整为“有序”的记录序列。分内部排序和外部排序。若整个排序过程不需要访问外存便能完成,则称此类排序问题为内部排序。反之,若参加排序的记录数量很大,整个序列的排序过程不可能在内存中完成,则称此类排序问题为外部排序。内部排序的过程是一个逐步扩大记录的有序序列长度的过程。

上面是关于排序的一些介绍,排序按是否使用外在存储来分分为内部排序(locally sort)外部排序(external sort)。下文分析的是内部排序的两种常见的、简单易实现的排序算法,虽然两者的排序算法的时间复杂度在众多内部排序算法中属于下等,但是对于理解内部排序(特别是采用比较排序)的一些机制是十分有用的。

常见的内部排序算法及其时间复杂度主要如下:

  • 冒泡排序 冒泡排序通过两两相邻元素比较,每一趟将本趟中最大的元素放在数组尾部合适的位置。由于每一趟只正确放置一个元素,因此时间复杂度为O(n*n)
  • 插入排序 插入排序是通过将当前第i个待排元素i与数组中前i-1个已经有序的元素j 进行两两比较,如果元素 i 小于 j,便交换。直到遇到比i小的元素或者到达数组头。由于每一个元素的插入都会最多比较i - 1次,因此最坏的时间复杂度为 1 + 2 + 3 + ... + n - 1 = n * (n - 1) / 2 , 因此时间复杂度也为O(n*n)
  • 希尔排序 插入排序的一种改进,具有优良的时间复杂度,时间复杂度为O(n*n)
  • 快速排序 基于分治的策略排序,时间复杂度为O(nlgn),由于待排数据如果是逆序,其时间复杂度会变为O(n*n),因此常用的方法是在排序前先随机化待排数据。
  • 归并排序 基于分治的策略排序,时间复杂度为O(nlgn),需要额外的存储空间
  • 堆排序 采用二叉树的性质进行排序,不需要额外的内存空间,时间复杂度为O(nlgn)
  • 计数排序 非比较排序,要求输入的数据的范围为0 ~ K,时间复杂度为O(n)
  • 基数排序 非比较排序,是计算排序的一种变种,对输入数据的要求降低到了数据的的位数为d,每一位的范围为0~k

插入排序时间复杂度分析

上文中介绍的除了计数排序和基数排序外,其余均为内排序中的比较排序,即通过两元素的比较,来确定元素正确放置的位置。

插入排序算是一种比较经典的比较类排序算法,可以分析其时间复杂度,然后确定其最坏和平均时间复杂度,从而来确定时间复杂度的最低边界。

由于没有比较,就不会有元素的交换,因此元素的交换次数是小于等于

  1. 最坏时间复杂度分析:若输入的n个数据是逆序的,那么对第i个元素的插入,需要比较i-1次,因此总的时间复杂度为为n*(n - 1) / 2
  2. 平均时间复杂度分析:对于第i个元素的插入,该元素有i + 1个位置可以插入,平均来看,插入每一个位置的概率都相同为1 / (i + 1);对于每一个位置比较的次数分别为1, 2, 3, 4, 5 ... i, i,因为当i元素为最小和次小的时候,比较次数都为i次,(当比较i次后,便可以知道该元素是最小还是次小了),所以插入第i个元素的比较次数为1 / (i + 1) * (1 + 2 +3 + ... + i) + i / (1 + i),那么对于n各元素的n-1次插入来看,总的比较次数近似于n * n / 4。所以插入排序的时间复杂度为n * n / 4

对于一个待排序列S,S(i)元素表示位置应该在i的元素。那么如果S(i) < S(j),i > j,称(S(i),S(j))为逆序对(inversion),对于每一个n个元素的输入,最多有n * (n - 1) / 2个逆序对,如果比较排序算法每一次比较最多消除一个逆序对,那么比较排序算法的最坏时间复杂度至少为 n * (n - 1) / 2

考虑待排序列的转置(transpose) 序列T,转置序列T和序列S的元素顺序是颠倒的。显然一个逆序对(inversion)不是在序列S中,就是在其转置序列T中,因此两个序列的逆序对数量之后等于n * (n - 1) / 2,每个序列逆序对数量平均为n * (n - 1) / 4 约为n*n/4

以上分析可以得出下面的理论

对于含有n个元素的待排输入,任何通过比较来排序的算法,若其每一次比较至多消除一个逆序对的话,其在最坏情况下至少得比较n(n-1)/2次,在平均情况下至少得比较n(n-1)/4

由于插入排序的最坏和平均情况下比较的次数都接近上述分析给出的数值,考虑那些只比较交换相邻元素的算法,插入排序可以算是最好的算法了。如果想要显著的改善此类排序算法的性能,我们必须在一次将元素移动超过一个位置。(或是一次消除多个逆序对)。这种想法给性能较好的快速排序和希尔排序等带来新的思路。

下面是插入排序的代码示例:

1
2
3
4
5
6
7
8
9
10
void insertSort(int* a, int n) {
int i = 0,j = 0;
for(i = 1; i < n; i++)
for(j = i; j > 0; j--) {
if(a[j] < a[j-1])
swap(&a[j], &a[j-1]);
else
break;
}
}

冒泡排序

冒泡排序得名于其简单,易用的排序风格。冒泡排序通过每一趟相邻元素的的两两比较确定某一个元素的正确的位置,这样进行n趟就确定了n个元素的位置了。冒泡排序的代码示例如下:

1
2
3
4
5
6
7
8
9
void bubbleSort(int* a, int n) {
int i = 0;
int j = 0;
for(i = 1; i <= n; i++)
for(j = 0; j < n - i; j++) {
if(a[j] > a[j+1])
swap(&a[j], &a[j+1]);
}
}

最常见的冒泡排序如上所示。

冒泡排序的改进

改进版本1

如果输入的待排序列的尾部是部分有序的,那么内部循环的j就没有必要增加到n-i了,直接在上一趟最后发生元素交换的位置lastSwapIndex,这样就可以减少没有必要的的比较了。例如序列7,8,5,6,9,10,11,第一趟比较,7,5,6,8,9,10,11 后的结果为最后交换的位置为元素6的位置(86交换),因此下一趟比较的时候,j就没有必要增加到元素10的位置了,因为上一趟最后发生交换的位置lastSwapIndex代表了此位置以后的元素都已经完排序了,不需进行没必要的比较了。

因此改进后的冒泡排序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void bubbleSort1(int* a, int n) {
int i = 1;
int j = 0;
int lastSwapIndex = n - i;
int q = n - 1;
//n趟
for(i = 1; i <= n; i++) {
for(j = 0; j < q; j++)
if(a[j] > a[j+1]) {
swap(&a[j], &a[j+1]);
lastSwapIndex = j;//记录每一趟最后交换的位置
}
q = lastSwapIndex;
}

}

通过记录上一趟待排序列尾部中最后一次进行元素交换的位置,为下一趟省去了不必要的比较次数,从而改进了冒泡排序。

改进版本2

同样的思路,如果待排序列的开始已经是部分有序了,那么内部循环的j也没有必要每一趟都从0开始了。例如待排序列1,2,3,4,9,7,8,6第一趟首次发生交换的位置为元素9的位置,那么下一趟j就没有必要从0开始了,而是从元素9的位置beginSwapIndex的前一个位置(记为beginSwapIndex - 1)开始,进行比较。因为上一趟的首次发生交换的位置beginSwapIndex表明此位置beginSwapIndex - 1的元素都是有序的了,下一趟的比较就从位置eginSwapIndex - 1开始,为什么是beginSwapIndex - 1? 而不是beginSwapIndex呢?是因为上一趟首次发生交换后,被交换到位置beginSwapIndex元素无法确定是否大于位置beginSwapIndex - 1的元素,因此需要从beginSwapIndex - 1的元素继续开始比较。

下面是改进版本2的代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//改进版1
void bubbleSort1(int* a, int n) {
int i = 1;
int j = 0;
int lastSwapIndex = n - i;
int q = n - 1;
//n趟
for(i = 1; i <= n; i++) {
for(j = 0; j < q; j++)
if(a[j] > a[j+1]) {
swap(&a[j], &a[j+1]);
lastSwapIndex = j;//记录每一趟最后交换的位置
}
q = lastSwapIndex;
}

}

//改进版2
void bubbleSort2(int *a, int n) {
int i = 1;
int j = 0;
int lastSwapIndex = n - i;
int beginSwapIndex = 0;
int q = n - 1;
int p = beginSwapIndex;

int firstFlag = 1;//是否为首次发生交换的flag
//n趟
for(i = 1; i <= n; i++) {
for(j = p; j < q; j++)
if(a[j] > a[j+1]) {
swap(&a[j], &a[j+1]);
lastSwapIndex = j;//记录每一趟最后交换的位置
if(firstFlag) {//记录每一趟开始交换的位置
beginSwapIndex = j == 0 ? 0 : j - 1;
firstFlag = 0;
}
}
q = lastSwapIndex;
firstFlag = 1;
p = beginSwapIndex;
}
}

总结

通过分析比较排序中经典的插入排序,引入了逆序对转置的概念,对所有比较排序的算法的平均和最坏时间复杂度进行了最坏的边界分析,从而得到了对于每一次比较最多消除一个逆序对的算法平均时间复杂度的边界上边界,和最坏的时间复杂度的上边界。这些分析给如何改良排序算法提供了途径。通过每一次比较,将元素移动超过不止一个位置(这样就可以消除不止一个逆序对),这样就可以将比较类排序算法显著的改善。

常用数据结构图–拓扑排序

在数学中,一个图(Graph)是表示物件与物件之间的关系的数学对象,是图论的基本研究对象。

图是十分重要的数据结构,常常被应用于实际生活的应用之中。生活中常见的问题例如交通路线图、任务指定分配、工期计算、航空网络等,都可以使用图相关的理论来建立模型。

下面是《数据结构与算法分析》对图的若干定义

一个图(Graph)G = (V, E)由顶点(vertex)集和边(Edge)集E组成。每一条边就是一个点对(v,w),其中v,w属于集合V。有时也把边Edge叫做弧(arc)。如果点对是有序的,那么图就叫做是有序的(directed)。有向的图有时候叫做有向图。顶点v和w邻接(adjacent)当且仅当(v,w)属于E。在一个具有边(v,w)从而具有边(w,v)的无向图中,w和v邻接且v和w也邻接。有时候边还具有第三种成分,叫做权(weight)或值(cost)。

图的存储

一种简单存储图的方式时采用一个被称为邻接矩阵的二维数组a[i][j],数组的大小为n * nn 为图的顶点个数。其中如果顶点i到顶点j连通,那么a[i][j] = 1,否则a[i][j] = 0。这种存储方式的优点是简单直观,实现方便。缺点也很明显,所需要的存储空间巨大。

当含有n个顶点的图G中大多数顶点都不是连通,那么意味中n * n 邻接矩阵中有大量的元素为0,即此时邻接矩阵是稀疏矩阵。

另一种常见的存储方式称为邻接表(adjacent list),这种方式是申请一个大小为n 的数组head,数组元素head[i],存放着由顶点i的所有邻接顶底组成的链表的头地址。此种存储方式的优点显而易见,相比于前一种方式,存储空间的大小明显减小。但是缺点是不直观,编码有难度。

拓扑排序

拓扑排序是对又向无圈图的顶点的一种排序,它使得如果存在一条从ViVj 的路径,那么在排序中Vj 必须出现在 Vi 的后面。

一种简单的求拓扑排序的算法先是找出任意一个入度为0的顶点。然后我们输出该顶点,并将它和它的边一起冲图中删除。然后,将其邻接的顶点的入度减去1。然后重复上述过程,直达图被完全删除。

不难看出,此种算法首先是外层循环 n 次,其次是内部循环中在选取入度为0 的顶点时候,会内部循环n次。因此总的时间复杂度会达到n * n

另一种较好的改进方法是,将所有入度为0的顶点压入某个栈,然后每一次输出顶底元素A后,再将A的所有邻接顶点的入度减去1,如果某个邻接顶点的入度此时为0,那么将其继续入栈。重复上诉操作指导栈空。

可以看出,对每一个入度为0的顶点入栈的操作执行了n 次,n 为顶点数。对出栈的元素A,将其邻接顶点的入度减1,然后入栈的操作,最多执行了 m 次, m 为图边的条数。因此总的时间复杂度就会是线性的 O(n)

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <stdio.h>
#include <stdlib.h>


struct Node {
int value;
int indegree;
struct Node *next;
};

//初始化邻接表
struct Node* initAdjList(int n) {
struct Node* headers;
headers = (struct Node*)malloc(sizeof(struct Node) * n);
int i = 0;
for(i; i < n; i++) {
headers[i].next = NULL;
headers[i].value = 0;
headers[i].indegree = 0;
}
return headers;
}

void addAdj(struct Node* header, int m, int n) {
struct Node* p = &header[m];
p->value++;
while(p->next != NULL)
p = p->next;
p->next = (struct Node*)malloc(sizeof(struct Node));
p->next->value = n;
p->next->next = NULL;
}

//打印邻接表
void printAdjList(struct Node* header, int n) {
int i = 0;
for(i; i < n; i++) {
struct Node* p = &header[i];
printf("Number of %d' adj : %d\t", i, p->value);
while(p->next!= NULL) {
printf("%d --->%d\t", i, p->next->value);
p = p->next;
}
printf("\n");
}
}

//拓扑排序
int* topSort(struct Node* headers, int n) {
int* zeroStack = (int*)malloc(sizeof(int) * n);
int* result = (int*)malloc(sizeof(int) * n);
int count = 0;
int pIndex = -1;
int i = 0;
while(i < n) {
struct Node* p = &headers[i];
//入度为0,直接进栈
if(p->indegree == 0)
zeroStack[++pIndex] = i;
i++;
}

while(1) {
//从top里面出栈一个Node Index
int zeroIndex = zeroStack[pIndex--];
result[count++] = zeroIndex;
struct Node* zeroNode = &headers[zeroIndex];
//将zeroNode的连接点,对应的头结点的值减一
while(zeroNode->next != NULL) {
struct Node* q = &headers[zeroNode->next->value];
if(--q->indegree == 0)
zeroStack[++pIndex] = zeroNode->next->value;
zeroNode = zeroNode->next;
}
//栈空
if(pIndex < 0)
break;
}

return result;
}

int main()
{
int a[7][7] = { {0,1,1,1,0,0,0},
{0,0,0,0,1,1,0},
{0,0,0,0,0,0,1},
{0,0,1,0,0,1,1},
{0,0,0,1,0,0,1},
{0,0,0,0,0,0,0},
{0,0,0,0,0,1,0}
};
int n = 7;
struct Node* headers = initAdjList(n);
int i = 0;
int j = 0;
for(i = 0; i < n; i++)
for(j = 0; j < n; j++) {
if(a[i][j] == 1)
addAdj(headers, i, j);
}

//生成各节点indegree
for(i = 0; i < n; i++) {
struct Node* p = &headers[i];
while(p->next != NULL) {
headers[p->next->value].indegree++;
p = p->next;
}
}

int* q = topSort(headers, n);
printAdjList(headers, n);
for(i = 0; i < n; i++) {
printf("%d \n", *q++ + 1);
}
return 0;
}

常用数据结构栈的应用—-表达式求值

栈是常用的数据结构,栈又称堆栈,是一种受限的线性表。其限制是允许在表中的一端进行插入和删除元素。栈中的元素符合后进先出(FILO)的性质。允许插入和删除元素的一端被称为栈顶,另一端被称为栈底。
栈有两种关键的操作,分别为出栈和压栈。

栈有两种关键的操作,分别为出栈和压栈。

  • 出栈(pop):它是把栈顶的元素E删除,使E的下一个元素称为新的栈顶,并返回元素E
  • 压栈(push):它是将元素E插入栈顶,使得新插入的元素E称为新的栈顶。

栈的常见的应用主要有:编译器中语法分析的符号匹配、表达式求值、程序的函数调用等。
例如:操作系统中的进程的上下文切换,里面被切换下CPU的进程的现场信息例如寄存器等信息,被保存在堆栈中,等到CPU轮转到该进程时,使用堆栈恢复现场。

表达式求值

表达式求值是栈的一个重要的应用。例如计算器中的加减乘除表达式的计算,都会使用栈来进行求值。
表达式的表示方法主要有中缀表示法和后缀表示法。

  • 中缀表示法:操作符号处于两个操作数的中间例如3+4,中缀表达式是符合人们思维的算术表达式方法,中缀表达式通常包含圆括号和方括号。中缀表达式不容易被计算机所理解,因此不太方便使用其进行表达式求值。

    中缀表示的例子 1 + 3 * (4 + 5)

  • 后缀表达式:不包含括号,运算符号放在两个运算对象的后面,所有的计算按运算符号出现的顺序,严格的从左向右进行运算(不再需要考虑运算符号的优先规则)。

    后缀表达式的例子21+3* 对应中缀表达式为(2+1)*3

后缀表达式求值

使用后缀表达式进行求值的时候,不需要考虑括号和运算符号的优先规则,只需要从左到右进行计算求值即可。

后缀表达式的求值过程为:

1. 读入操作数,压入栈中直到读取到操作符
2. 如果读取到为操作符,则将栈顶的前两元素出栈,使用该操作符进行运算,得到计算结果
3. 将计算结果压入栈中
4. 重复1、2、3直到表达式末尾

最后栈中只有一个元素,变为最后的计算结果。将其出栈即可。

例如:
中缀表达式(2+1)*3
对应的后缀表达式为21+3*

  • 初始化栈S
  • 读取2和1压入S,此时S为1,2
  • 读取到操作符+,出栈栈顶两元素得到1,2,此时栈为空
  • 使用操作符计算两操作数,得到3(1 + 2 = 3),将3压入栈S,此时栈S为3
  • 读入3,压入栈S,此时栈为3,3
  • 读取到操作符*,出栈栈顶两元素得到3,3 此时栈S为空
  • 使用操作符*,对两元素进行运算得到9,将9压入栈S
  • 读取到表达式尾部

出栈栈顶元素得到9即为计算结果

计算机通常使用后缀表达式进行表达式求值,但是人们通常输入计算的表达式是中缀表达式,因此在进行表达式求值的时候,应该先将中缀表达式转为后缀表达式,然后使用后缀表达式求出表达式值。后缀表达式求值的过程很简单,已经上面分析过了。现在关键的一步就是中缀表达式转为后缀表达式。

中缀表达式转后缀表达式

中缀表达式转后缀表达式是表达式求值关键的一步,其过程如下:
输入中缀表达式IEXP,
输出后缀表达式SEXP
依次读入中缀表达式IEXP,初始化一个栈S用来存放操作符OP。

1. 如果读取的是操作数,直接输出到SEXP。如果为操作符OP,进入step2,如果为空,证明读取到IEXP尾部,则进入step5
2. 判断OP,若为`(`,无任何输出,直接压栈`(`到S,进入step1;若为`)`,则出栈S的元素输出到SEXP中直到遇见元素`(`,且`(`不输出到SEXP中,然后进入step1;若OP不为`(`和`)`,则执行step3
3. 判断栈S的栈顶元素是否为`(`,若为`(`,则直接将OP压栈到S中;若不为`(`,则将栈S的元素出栈输出到SEXP中,直到栈顶元素的优先级小于OP或者栈空,最后进入step4
4. 将OP压入栈S中,进入step1
5. 将栈所有元素输出到SEXP中

例如
IEXP为(2+1)*3
初始化一个空栈S.

  1. 读取到操作符(,直接压入栈S,此时栈S为(、
  2. 读取到操作数2,则直接输出到SEXP,此时S仍为,SEXP为 2
  3. 读取到操作符号+,由于栈S栈顶元素为(,因此直接将+,压入栈S中,此时SEXP为2,栈S为(、+
  4. 读取到操作数1,直接输出,此时SEXP为21, 栈S为(、+
  5. 读取到操作符),则出栈S的元素直到碰见(,不输出(),则SEXP为21+,栈S为空
  6. 读取到操作符号*'压栈到S,此时SEXP仍为21+,栈S为*
  7. 读取到操作数3,直接输出到SEXP中,SEXP为21+3,栈S为*
  8. 读取到文件尾,则出栈S知道栈空,SEXP为21+3*

最后得到后缀表达式21+3*

可见,中缀转后缀的关键点在于读取到的操作符为括号的case,栈中的(保留直到遇见操作符),否则是不会出栈的。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
//int栈
#include <stdio.h>
#include <malloc.h>
#define INCREMENT 20

struct Stack_{
int* a;
int capacity;
int size;
int top;
};

int pop_(struct Stack_* stack) {
if(stack->size <= 0 || stack->top < 0)
return NULL;
stack->size--;
return stack->a[stack->top--];
}

void push_(struct Stack_* stack, int c) {
//扩容
if(stack->size <= stack->capacity) {
stack->capacity += INCREMENT;
int* p = (int*)malloc(sizeof(int) * ( stack->capacity));
int i;
int* q;
q = p;
for(i = 0; i < stack->size; i++) {
*(q++) = *(stack->a++);
}
stack->a = p;
}

//压栈
stack->size++;
stack->top++;
stack->a[stack->top] = c;
}

struct Stack_* initStack_(int capacity) {
struct Stack_* stack = (struct Stack_*)malloc(sizeof(struct Stack_));
int* a = (int*)malloc(sizeof(int) * capacity);
stack->size = 0;
stack->top = -1;
stack->a = a;
return stack;
};

//测试是否字符数组是否存在test字符
int in(char* a, int n, char test) {
int i = 0;
for(i; i < n; i++) {
if(a[i] == test)
return 1;
}
return 0;
}

//优先级比较
int compare(char a, char b) {

if(a == b)
return 0;

if(a == '+') {
if(b == '*' || b == '(' || b == ')')
return -1;
}

if(a == '*') {
if(b == '+')
return +1;
if(b == '(' || b == ')')
return -1;
}
}

//中缀表达式转为后缀表达式
char* infix2Suffix(char* infix, int n) {
struct Stack* stack = initStack(n);
char add = '+';
char mult = '*';
char left = '(';
char right = ')';
char op[4] = {'+', '*', '(', ')'};
int i = 0;
char *result = (char*)malloc(sizeof(char) * n);
char top;
int j = 0;

for(i; i < n; i++) {
char c = infix[i];
//如果读取的字符不为操纵符,为操作数
//直接放入输出字符数组中
if(!in(op, 4, c))
result[j++] = c;
else {//开始将操作符压栈
if(stack->size == 0)
push(stack, c);
else {
//
if(c != '(' && c != ')') {
while(1) {
top = pop(stack);
if(top == NULL)
break;
if(top == '(' || compare(c, top) > 0) {
push(stack, top);
break;
} else if (compare(c, top) <= 0)
result[j++] = top;
}
push(stack, c);
}

if(c == '(')
push(stack, c);

if(c == ')') {
while(1) {
top = pop(stack);
if(top != NULL && top != '(')
result[j++] = top;
else
break;
}
}


}
}
}
//输出栈中剩余的内容
while(1) {
top = pop(stack);
if(top == NULL)
break;
result[j++] = top;
}
result[j++] = '#';
return result;
}

//中缀表达式求值
int evaluate(char* infix, int n) {
char *suffix = infix2Suffix(infix, n);
struct Stack_* stack = initStack_(n);
char op[4] = {'+', '*', '(', ')'};
int i = 0;
char opnum1, opnum2;
char c;

while((c = suffix[i++]) != '#') {
if(!in(op, 4, c))
push_(stack, (int)c - (int)'0');
else if(in(op, 4, c)){
int opnum1 = pop_(stack);
int opnum2 = pop_(stack);
int tmp;
if(c == '+')
tmp = opnum1 + opnum2;
if(c == '*')
tmp = opnum1 * opnum2;
push_(stack, tmp);
}
}

return pop_(stack);
}

int main() {

char infixExp[13] = {'6', '+', '5', '+', '7',
'*', '8', '*', '(', '1',
'+', '2', ')'};
int result = evaluate(infixExp, 13);
char *suffixExp = infix2Suffix(infixExp, 13);
printf("\n");
while(*suffixExp!= '#')
printf("%c", *suffixExp++);
printf("\n%d", result);
return;
}

1. 虚拟IP

虚拟IP(Virtual IP Address),是一种不与特定计算机或者特定计算机网卡相对应的IP地址。所有发往这个IP地址的数据包最后都会经过真实的网卡到达目的主机的目的进程。

引用维基上面的定义:

A virtual IP address (VIP or VIPA) is an IP address that doesn’t correspond to an actual physical network interface (port). Uses for VIPs include network address translation (especially, one-to-many NAT), fault-tolerance, and mobility.

虚拟IP主要是用来网络地址转换,网络容错和可移动性。

虚拟IP比较常见的一个用例就是在**系统高可用性(High Availability HA)**方面的应用,通常一个系统会因为日常维护或者非计划外的情况而发生宕机,为了提高系统对外服务的高可用性,就会采用主备模式进行高可用性的配置。

当提供服务的主机M宕机后,服务会切换到备用主机S继续对外提供服务。而这一切用户是感觉不到的,在这种情况下系统对客户端提供服务的IP地址就会是一个虚拟IP,当主机M宕机后,虚拟IP便会漂浮到备机上,继续提供服务。

在这种情况下,虚拟IP就不是与特定计算主机或者特定某个物理网卡对应,而是一种逻辑的概念。它是可以自由移动(自由漂浮)的,这样既对外屏蔽了系统内部的细节,又为系统内部的可维护性和扩展性提供了方便。

2. ARP协议

2.1 ARP协议

ARP协议属于TCP/IP协议族里一种将IP地址解析为MAC地址的协议,位于TCP/IP五层模型中的网络层。该协议是用来在局域网内解析IP地址对应的物理地址

这里写图片描述

通常一个主机A给另一个主机B通过网络发送一个IP数据报的时候,首先会发送到主机A所在网络的的路由器上面,然后路由器会判断目的地址是否在本网络内,是则直接转发到本网络内的目的主机;否则会继续传递到下一个路由,直到到达指定的网络的路由器,指定网络的路由器会将此数据报发送到目的主机

整个过程最后都会涉及到由某一个网络的路由器(或者网关)将数据发送到网内某一主机的过程。这个过程通常是由路由器发送一个ARP广播请求,请求IP地址和数据包目的IP地址一致的主机将它自己的MAC地址返回给路由器,因为数据链路层的数据传输是通过物理地址传输的

ARP请求会广播到所有局域网内的主机,网内其他主机收到这个ARP请求

  1. 检查发送ARP请求的主机的IP地址

  2. 将该IP地址和其对应的MAC地址存放在ARP缓存

  3. 检查这个ARP请求中请求的IP地址是否为自己的IP地址,是则发送一个ARP应答,应答包含自己的IP地址和对应的MAC地址。

当网络内的路由器得到了MAC地址后,便可以通过数据链路层将数据包正确传输到目的主机上了。

2.2 ARP缓存

ARP协议中比较重要的内容之一就是ARP缓存,主机操作系统会将IP地址MAC地址的映射关系存放在主机的一片高速缓存中。

  • 缓存失效:该缓存会在一定时间内失效,失效后,请求该IP地址时需要广播arp请求重新获取IP地址对应的MAC地址

  • 缓存更新:当收到ARP请求时,会将发送ARP请求的主机IP地址与MAC地址记录下来,然后去更新本机ARP缓存中对应的记录

3. 虚拟IP与ARP协议

虚拟IP常用于系统高可用性的场景,那么虚拟IP实现的原理是什么?虚拟能够自由漂浮的原理是什么?

从前文介绍arp协议里面来看,主机与主机的通信过程都会涉及到一个ip地址转换mac地址的过程,那么虚拟IP的通信也不会例外。因此,IP地址在主机通信的过程中其实就是一个逻辑地址。

我们知道,每一个主机都存放着网络内一些主机的逻辑地址与物理地址(MAC地址)的映射,那么问题来了:

当虚拟IP(简称VIP)在主机A上时,主机AMAC地址为MAC_A,某主机Marp缓存中存放着一个映射关系:VIP <–> MAC_A;当主机A宕机后, VIP漂浮到了主机B,主机BMAC地址为MAC_B,那么此时主机M想与虚拟IP通信时,是做不到的;因为它的arp高速缓存中的VIP的映射还指向主机AMAC地址。这个问题解决的思路就是当虚拟IP漂浮后,刷新所有其他主机的arp缓存

那么虚拟IP是如何实现漂浮后,是如何刷新所有其他主机的ARP缓存的呢?

这里就会引入另一个概念,GARP(简称无端ARP或者免费ARP),主要是用来当某一个主机C开机时,用来确认自己的IP地址没有被人占用而做的一个检测。广播发送这个arp,请求得到本机IP地址的MAC地址,主机C并不希望此次arp请求会有arp应答,因为应答意味着IP地址冲突了当其他主机收到这个arp请求后,会刷新关于这个arp请求源的主机IP地址的映射。

GARP的作用主要有两个:

  1. 检测IP地址是否有冲突
  2. 刷新其他主机关于本次IP地址的映射关系

集群管理软件Pacemaker里面的资源代理ocf:heartbeat:IPaddr2中,在虚拟IP漂浮后,会向网络内广播发送garp请求,以此来刷新其他主机的arp缓存。

在配置OpenStack控制节点高可用性的时候,出现过虚拟IP切换时,某一个主机不能通信的问题,后来发现是arp缓存没有刷新,有时候由于网络的原因,某些主机没有接收到此garp请求,因此ocf:heartbeat:IPaddr2资源代理中可以配置发送garp的次数,这里建议次数配置得多一点,这样可以保证其他主机成功刷新arp缓存。

0%