2. Container

介绍

  • stack
  • service
  • container
    使用Docker的方式构建一个app。我们从app的层次结构的最下层开始,最下层是容器。接下来上层是service,这一层定义了生产环境容器如何运作,最顶层是stack,定义了service是如何交互的。

新的开发环境

在过去你要写一个python app的时候,你的第一件事情就是在你的机器上安装python环境。但是同样的,你的生产环境要完美的匹配你的开发环境。

使用Docker,你可以构建一个可移植的python runtime作为一个镜像,然后就没有安装的必要了。接下来,你可以将python镜像作为一个基础镜像脱离于你的app代码,确保app和依赖、运行时是独立的。

这些可移植的镜像可以通过Dockerfile来定义。

使用Dockerfile来定义一个容器

dockerfile定义你的容器里面的环境包含什么。在容器环境里,访问诸如网络和磁盘等资源都是被虚拟化了的,这些资源是和系统其他资源是隔离的,因此你需要对外做端口映射,和指定哪些文件需要被copy进容器环境中。完成上述工作后,你可以验证基于Dockerfile构建的app运行效果和预期一致。

Dockerfile

创建一个空目录,创建一个文件Dockerfile如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Use an official Python runtime as a parent image
FROM python:2.7-slim

# Set the working directory to /app
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app

# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt

# Make port 80 available to the world outside this container
EXPOSE 80

# Define environment variable
ENV NAME World

# Run app.py when the container launches
CMD ["python", "app.py"]

Dockerfile需要两个额外的文件 app.py requirments.txt。

app

新建app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from flask import Flask
from redis import Redis, RedisError
import os
import socket

# Connect to Redis
redis = Redis(host="redis", db=0, socket_connect_timeout=2, socket_timeout=2)

app = Flask(__name__)

@app.route("/")
def hello():
try:
visits = redis.incr("counter")
except RedisError:
visits = "<i>cannot connect to Redis, counter disabled</i>"

html = "<h3>Hello {name}!</h3>" \
"<b>Hostname:</b> {hostname}<br/>" \
"<b>Visits:</b> {visits}"
return html.format(name=os.getenv("NAME", "world"), hostname=socket.gethostname(), visits=visits)

if __name__ == "__main__":
app.run(host='0.0.0.0', port=80)

新建requirements.txt

1
2
Flask
Redis

构建app

运行docker命令构建镜像

1
docker build -t friendlyhello .

image.png | left | 747x427

查看构建成功的镜像

1
docker image ls

运行app

1
docker run -p 4000:80 friendlyhello

image.png | left | 747x143

启动成功后访问localhost:4000

image.png | left | 747x185

共享镜像

构建好的镜像可以将其push到镜像仓库中,便于和其他人共享。

registry是一系列仓库的集合,一个仓库又是一系列镜像的集合。

Docker官方提供一个公开的registry,免费的、预配置的。

使用Docker ID登录

运行命令:

1
docker login

镜像打标

将本地镜像和远程registry的仓库关联的写法是:username/repository:tag。
tag是可选的但是被推荐的写法,registry使用这个机制来给docker镜像打上一个版本。给repository和tag有意义的命名是推荐的。例如:get-started:part2,这个写法将镜像存放在get-started仓库,然后打上了part2标。

使用docker image tag命令,打标本地镜像。

1
docker tag image username/repository:tag

例如:

1
docker tag  friendlyhello dockerjie/get-started:part2

image.png | left | 681x124

publish镜像

上传打标的镜像到远程的仓库。

1
docker push username/get-started:tag

一旦完成,这个上传的结果是公开可用的。如果我们登录到Docker Hub,我们可以看见新的镜像在那,可以使用pull命令。

image.png | left | 681x163

一旦上传完成就可以使用远程仓库的镜像来创建容器了。

1
docker run -p 4000:80 username/get-started:part2

如果本地不存在该镜像,docker会使用远程的镜像。

1. Orientation

Docker是开发人员和运维人员使用容器来开发、部署和运行应用的平台。使用linux容器来部署应用被称为容器化。容器概念不是新的,但是使用容器来便利部署应用却是新的。

容器化越来越受欢迎,因为容器具有以下特点:

  • 灵活:大型应用也可以被容器化
  • 轻量:容器共享同一个os内核
  • 通用:可以on-the-fly的进行更新升级
  • 可移植:可以本地构建、云端部署
  • 可伸缩:自动分发容器副本
  • 可堆叠:可以on-the-fly垂直堆叠更多的服务

镜像和容器

镜像和容器的概念类似与程序和进程。
镜像是一个可执行的package,里面包含运行一个应用所需要的任何东西,例如代码、运行时、库、环境变量和配置文件等。

容器是镜像的运行实例。使用

1
docker ps

查看运行的容器。

容器和虚拟机

容器运行在linux上,和其他容器共享宿主机的内核。它运行在单独的进程,因此是轻量级的。

而虚拟机则运行在一个单独的操作系统上,这个操作系统运行在硬件虚拟化管理器hyperVisor上面。通常,虚拟机环境提供的资源超过了大部分程序的需求。

image.png | left | 332x298

image.png | left | 330x297

准备Docker环境

参见之前的docker安装博客。

  • 查看docker版本
1
docker --version
  • 查看docker详情
1
docker info
  • 查看docker镜像列表
1
docker image ls
  • 查看docker容器列表
1
docker container ls --all
  • 运行镜像,启动一个容器
1
docker run image-name

容器化使得CI/CD无缝连接,例如:

  • 应用没有系统环境依赖
  • 更新能被推送到一个分布式应用的任何部分
  • 资源密度能被优化
    使用docker,伸缩应用也是一件很轻松的事情,而不需要接触厚重的VM机器。

1. RPC

RPCRemote Procedure Call(远程过程调用),直白的说就是:向调用本地服务一样调用远程计算机的服务。

由于现代应用日益复杂,单台机器显然不能满足需求。于是将业务按照一定的方式拆分开来,分散到多台机器上(或者一台机器上的多个进程),让每台机器各司其职,物尽其用。SOA和当前盛行的微服务便是从此变革历史中出现的产物。

为了达到各个分散在不同机器上的服务做到相互独立又相互联系,当计算机A上的服务ServiceA需要调用计算机B上的服务ServiceB时,需要一种互相遵守的协议来完成一次调用。

而这些协议便是RPC框架包含的内容。

以下从三个方面分析了高性能RPC的三个关键要素:

  • 传输协议

RPC 可基于 HTTPTCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。有两方面会直接影响 RPC 的性能,一是传输方式,二是序列化

  • 序列化方式

众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它们可以取代 Java 默认的序列化,从而提供更高效的性能。

  • 高并发(IO方式)

为了支持高并发,传统的阻塞式 IO 显然不太合适,因此我们需要异步的 IO,即 NIO。Java 提供了 NIO 的解决方案,Java 7 也提供了更优秀的 NIO.2 支持,用 Java 实现 NIO 并不是遥不可及的事情,只是需要我们熟悉 NIO 的技术细节。

另外,服务部署在分布式环境下的不同节点,因此还需要提供一个服务注册与发现中心(Service Registry),让客户端发现可用的服务。应用、服务、注册表之间的关系如下图:

考虑到上述的几个关键要素,做出如下选型:

  1. Spring : 业界权威的依赖注入框架
  2. Netty: 封装了Java的NIO的一个网络框架
  3. Protostuff: 基于Google的Protobuf的序列化框架,面向POJO,无需编写、编译.proto文件
  4. Zookeeper: 分布式系统的必备选择,提供服务发现和服务注册功能

参考了某篇博客,搭建一个基于TCP协议,采用Protostuff序列化方式,提供NIO支持且具备服务注册和发现的轻量级RPC框架。

2. 搭建轻量级RPC框架

第一步:编写服务接口

没有规矩,不成方圆。制定客户端和服务端共同遵循的接口。

1
2
3
public interface IHelloService {
public String hello(String name);
}

第二步:编写服务接口的实现类

由于服务端提供服务,因此需要在服务端实现该接口,以便客户端后续通过RPC调用。

由于不同的服务是按照接口来区分的,而同一个服务实现类可以实现不同的接口(这些接口可能是服务接口或者其他接口),换句话说一个服务实现类可以提供多种不同的服务。

为了达到标示某个服务实现类对应哪个服务接口,需要自定义一个注解,然后通过Spring扫描包含该注解的Bean,这样就可以发现某个服务接口对应哪个服务实现类。

  1. 自定义注解

    1
    2
    3
    4
    5
    6
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Component// 表明可被Spring扫描
    public @interface RpcService {
    Class<?> value();
    }

  2. 编写服务实现类

    1
    2
    3
    4
    5
    6
    7
    8
    @RpcService(IHelloService.class)//稍后通过Spring扫描
    public class HelloWorldImpl implements IHelloService {

    public String hello(String name) {
    // TODO Auto-generated method stub
    return "Hello " + name + " ==> from remote greeting";
    }
    }

第三步: 配置服务端

  1. 由于使用了Spring实现依赖注入,此处使用了XML方式声明Bean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<context:component-scan base-package="dlut.rpc"/>

<context:property-placeholder location="classpath:rpc.properties"/>

<!-- 配置服务发现组件 -->
<bean id="serviceRegistry" class="dlut.rpc.server.ZookeeperServiceRegistry">
<constructor-arg name="registryAddress" value="${rpc.registry_address}"/>
</bean>

<!-- 配置 RPC Server-->
<bean id="rpcServer" class="dlut.rpc.server.RpcServer">
<constructor-arg name="serverAddress" value="${rpc.server_address}"/>
<constructor-arg name="serviceRegistry" ref="serviceRegistry"/>
</bean>
</beans>

服务端的Spring配置声明了如下内容:

  • 扫描路径
  • 属性文件的路径
  • 两个Bean,一个是ZookeeperServiceRegistry,另一个是RpcServer,且都通过构造器注入。
  1. 编写rpc.properties配置文件

    1
    2
    rpc.registry_address=127.0.0.1:2181
    rpc.server_address=127.0.0.1:8000

    主要是配置一些端口信息

第四步: 编写服务端代码

由于RpcServer相当于是一个运行的入口,因此需要在所有Bean实例化后,做一些初始化的操作,这里RpcServer本来也是一个由Spring IoC容器管理的Bean(上面的服务器的Spring.xml配置中已经配置了)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class RpcServer implements ApplicationContextAware, InitializingBean {

private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);

private String serverAddress;
private ZookeeperServiceRegistry serviceRegistry;//由IoC自动注入

private Map<String, Object> handlerMap = new HashMap<>();

public RpcServer(String serverAddress) {
this.serverAddress = serverAddress;
}

public RpcServer(String serverAddress, ZookeeperServiceRegistry serviceRegistry) {
// TODO Auto-generated constructor stub
this.serverAddress = serverAddress;
this.serviceRegistry = serviceRegistry;
}

@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
// TODO Auto-generated method stub
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);//通过注解去发现提供rpc服务的Bean
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
handlerMap.put(interfaceName, serviceBean);//建立服务接口--服务实现类的映射
}
}
}

@Override
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootStrap = new ServerBootstrap();
// 配置Server端的NIO
bootStrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// TODO Auto-generated method stub
LOGGER.debug("initChannel");
ch.pipeline()
.addLast(new RpcDecoder(RpcRequest.class))// 处理Rpc请求 RpcRequest
.addLast(new RpcEncoder(RpcResponse.class))// 处理Rpc RpcResponse
.addLast(new RpcHandler(handlerMap));// 处理Rpc请求
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true);

// 解析地址
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);

ChannelFuture future = bootStrap.bind(host, port).sync();
LOGGER.debug("server started on port {}", port);

// 注册服务
if (serviceRegistry != null) {
serviceRegistry.register(serverAddress);
}
future.channel().closeFuture().sync();
} finally {
LOGGER.debug("worker group and boss group shutdown");
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

同时需要编写一个服务端的handler类(RpcServer当收到请求,并将其转换为RpcRequest后,RpcHandler将做进一步处理)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

private static final Logger LOGGER = LoggerFactory.getLogger(RpcHandler.class);
private final Map<String, Object> handlerMap;

public RpcHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
// TODO Auto-generated method stub
RpcResponse response = new RpcResponse();
response.setRequestId(msg.getRequestId());
try {
Object result = handle(msg);
response.setResult(result);
} catch (Throwable t) {
LOGGER.debug("handle ocurred error ==> {}", t);
response.setError(t);
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);//写完然后关闭channel
}

private Object handle(RpcRequest request) throws Throwable {
String className = request.getClassName();//此处极易出错,需要保证客户端和服务端的className是一致的,否则直接GG
Object serviceBean = handlerMap.get(className);
if (serviceBean == null) {
throw new Throwable(String.format("can not find service bean by given name [%s] in server ", className));
}
System.out.println(handlerMap);
System.out.println(className);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
LOGGER.debug("handle request {}", request.getRequestId());
//以下为利用CGlib反射调用serviceBean的方法
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// TODO Auto-generated method stub
LOGGER.error("server caught exception", cause);
ctx.close();
}
}

最后需要编写一个引导类,用于加载Spring.xml文件启动Spring IoC容器

1
2
3
4
5
6
7
8
9
public class RpcBootstrap {

@SuppressWarnings("resource")
public static void main(String[] args) {
// TODO Auto-generated method stub
new ClassPathXmlApplicationContext("spring.xml");
}

}

第五步: 编写common类

第四步涉及到几个POJO封装类,由于这几个类无论是客户端还是服务端都是需要,因此应该单独将其打包作为公共的jar包。

  • RpcRequest
  • RpcResponse
  • RpcEncoder
  • RpcDecoder
  • SerializationUtil
  • Constants
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;

public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class RpcResponse {
private String requestId;
private Throwable error;
private Object result;

public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Throwable getError() {
return error;
}
public void setError(Throwable error) {
this.error = error;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}

@Override
public String toString() {
// TODO Auto-generated method stub
return "[id: " + requestId + ", result: " + result + ", error: " + error + "]";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class RpcEncoder extends MessageToByteEncoder<RpcResponse> {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcEncoder.class);

@Override
protected void encode(ChannelHandlerContext ctx, RpcResponse msg, ByteBuf out) throws Exception {
// TODO Auto-generated method stub
byte[] data = SerializationUtil.serialize(msg);
LOGGER.debug("encode => datalength => {}", data.length);
out.writeInt(data.length);
out.writeBytes(data);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class RpcDecoder extends ByteToMessageDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcDecoder.class);
private Class<?> genericClass;

public RpcDecoder(Class<?> genericClass) {
// TODO Auto-generated constructor stub
this.genericClass = genericClass;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// TODO Auto-generated method stub
LOGGER.debug("decode => datalength => {}", in.readableBytes());
//按照自己的协议,进行解析;
//dataLength|dataContent
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
ctx.close();
}

if (in.readableBytes() < dataLength) {//数据还未完全接受完毕,重置in的read Index;等待下一次decode;;
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = SerializationUtil.deserialize(data, this.genericClass);//反序列化
LOGGER.debug("decode => deserialize ok");
out.add(obj);
}


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class SerializationUtil {

private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
private static ObjenesisStd objenesis = new ObjenesisStd(true);

private SerializationUtil() {
}

@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> cls) {
Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
if (schema == null) {
schema = RuntimeSchema.createFrom(cls);
if (schema != null) {
cachedSchema.put(cls, schema);
}
}
return schema;
}

public static <T> T deserialize(byte[] data, Class<T> cls) {
// TODO Auto-generated method stub
try {
T msg = objenesis.newInstance(cls);
Schema<T> schema = getSchema(cls);
ProtostuffIOUtil.mergeFrom(data,msg, schema);
return msg;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

@SuppressWarnings("unchecked")
public static <T> byte[] serialize(T msg) {
// TODO Auto-generated method stub
Class<T> cls = (Class<T>) msg.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(cls);
return ProtostuffIOUtil.toByteArray(msg, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}

}
1
2
3
4
5
public interface Constant {
int ZK_SESSION_TIMEOUT = 5000;
String ZK_REGISTRY_PATH = "/registry";
String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";
}

至此服务端代码编写完毕,接下来编写客户端代码。

第六步:配置客户端

和配置服务端一样,也是对客户端的Spring.xml进行配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<context:component-scan base-package="dlut.rpc-client"/>

<context:property-placeholder location="classpath:rpc-client.properties"/>

<!-- 配置服务发现组件 -->
<bean id="serviceDiscover" class="dlut.rpc_client.ServiceDiscover">
<constructor-arg name="registryAddress" value="${rpc.registry_address}"/>
</bean>

<!-- 配置 RPC 代理 -->
<bean id="rpcProxy" class="dlut.rpc_client.RpcProxy">
<constructor-arg name="serviceDiscover" ref="serviceDiscover"/>
</bean>
</beans>

编写配置文件rpc-client.properties

1
rpc.registry_address=127.0.0.1:2181

第七步: 编写客户端

  • 编写RpcClient类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);

    private String host = null;
    private int port = 0;

    private RpcResponse response;

    private Object lock = new Object();;

    public RpcClient(String host, int port) {
    // TODO Auto-generated constructor stub
    this.host = host;
    this.port = port;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
    // TODO Auto-generated method stub
    this.response = msg;
    LOGGER.debug("receive msg from ");
    LOGGER.debug("content => {}", this.response);
    synchronized (lock) {
    lock.notifyAll();
    }
    }

    public RpcResponse send(RpcRequest request) throws InterruptedException {
    // TODO Auto-generated method stub
    EventLoopGroup group = new NioEventLoopGroup();
    try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    // TODO Auto-generated method stub
    ch.pipeline()
    .addLast(new RpcEncoder(RpcRequest.class)) //编码请求
    .addLast(new RpcDecoder(RpcResponse.class))//解码回复
    .addLast(RpcClient.this);
    }

    })
    .option(ChannelOption.SO_KEEPALIVE, true);
    LOGGER.debug("host => {}, port => {}", host, port);
    ChannelFuture future = bootstrap.connect(host, port).sync();
    future.channel().writeAndFlush(request).sync();

    LOGGER.debug("connected");

    synchronized (lock ) {
    lock.wait();//阻塞直到消息被RpcResponse被读取到
    }

    if (response != null) {
    future.channel().closeFuture().sync();
    }
    return response;
    } finally {
    group.shutdownGracefully();
    }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // TODO Auto-generated method stub
    LOGGER.error("client caught exception", cause);
    ctx.close();
    }
    }

  • 编写RpcProxy类,用于生成对RPC服务接口的代理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    public class RpcProxy {
    private String serverAddress;
    private ServiceDiscover serviceDiscover;

    public RpcProxy(ServiceDiscover serviceDiscover) {
    this.serviceDiscover = serviceDiscover;
    }

    public RpcProxy(String serverAddress) {
    this.serverAddress = serverAddress;
    }

    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass) {
    return (T) Proxy.newProxyInstance(
    interfaceClass.getClassLoader(),
    new Class<?>[]{interfaceClass},
    new InvocationHandler() {

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // TODO Auto-generated method stub
    //封装请求
    RpcRequest request = new RpcRequest();
    request.setRequestId(UUID.randomUUID().toString());
    request.setClassName(method.getDeclaringClass().getName());
    request.setMethodName(method.getName());
    request.setParameterTypes(method.getParameterTypes());
    request.setParameters(args);

    if (serviceDiscover != null) {
    serverAddress = serviceDiscover.discover();
    }

    String[] array = serverAddress.split(":");
    String host = array[0];
    int port = Integer.parseInt(array[1]);

    RpcClient client = new RpcClient(host, port);
    RpcResponse response = client.send(request);//阻塞直到方法返回
    Object obj = response.getResult();

    System.out.println(Arrays.toString(obj.getClass().getTypeParameters()));

    if (response == null)
    throw new Exception("response is null");
    if (response.getError() != null) {
    throw response.getError();
    } else {
    return response.getResult();
    }
    }
    });
    }
    }

  • 编写服务发现ServiceDiscover类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    public class ServiceDiscover {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscover.class);
    private CountDownLatch latch = new CountDownLatch(1);
    private volatile List<String> dataList = new ArrayList<>();
    private String registryAddress;

    public ServiceDiscover(String registryAddress) {
    this.registryAddress = registryAddress;
    ZooKeeper zk = connectServer();
    if (zk != null) {
    watchNode(zk);
    }
    }

    public String discover() {
    String data = null;
    int size = dataList.size();
    if (size > 0) {
    if (size == 1) {
    data = dataList.get(0);
    LOGGER.debug("using only data: {}", data);
    } else {
    data = dataList.get(ThreadLocalRandom.current().nextInt(size));
    LOGGER.debug("using random data: {}", data);
    }
    }
    return data;
    }

    //查看所有结点
    private void watchNode(final ZooKeeper zk) {
    // TODO Auto-generated method stub
    try {
    List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {

    @Override
    public void process(WatchedEvent event) {
    // TODO Auto-generated method stub
    if (event.getType() == Event.EventType.NodeChildrenChanged) {
    watchNode(zk);
    }
    }
    });
    List<String> dataList = new ArrayList<>();
    for (String node : nodeList ) {
    byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
    dataList.add(new String(bytes));
    }
    this.dataList = dataList;
    LOGGER.debug("node data: {}", dataList);
    } catch (KeeperException | InterruptedException e) {
    LOGGER.error("", e);
    }
    }

    //连接到zookeeper
    private ZooKeeper connectServer() {
    // TODO Auto-generated method stub
    ZooKeeper zk = null;
    try {
    zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    if (event.getState() == Event.KeeperState.SyncConnected) {
    latch.countDown();
    }
    }
    });
    latch.await();
    } catch (IOException | InterruptedException e) {
    LOGGER.error("", e);
    }
    return zk;
    }
    }

    第八步:发送RPC请求

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class App {
    @SuppressWarnings("resource")
    public static void main( String[] args ) {
    ApplicationContext context = new ClassPathXmlApplicationContext("spring-client.xml");

    RpcProxy rpcProxy = context.getBean(RpcProxy.class);

    IHelloService helloService = rpcProxy.create(IHelloService.class);

    String result = helloService.hello("world");

    System.out.println(result);

    System.exit(0);
    }
    }

3. 总结

参照网络上的一篇博客,自己照猫画虎搭建了一个轻量级RPC框架,了解学习了一些常听说的工具(例如Netty、Zookeeper)等在RPC框架中的应用场景。

搭建该框架的时候,使用Spring作为依赖注入框架,Netty实现NIO方式的数据传输、使用了高效的Protostuff对象序列化工具以及使用了Zookeeper作为分布式环境下的服务发现和服务注册。

具体来梳理下整个框架的运行逻辑。

  1. 启动RpcServer

    1.1 通过运行RpcBootstap类,加载了服务器端的spring.xml并启动了IoC容器,随后将xml文件中声明的Bean都实例化;

    1.2 扫描带有RpcService注解的类,建立服务接口—服务实现类的映射

    1.3 执行RpcServer的初始化操作,包括连接到Zookeeper服务进行服务注册,启动Netty NIO的事件轮询线程池和工作线程池等

  2. 客户端App类的运行

    2.1 加载客户端配置的spring.xml并启动IoC容器将声明的Bean实例化

    2.2 调用RpcProxy的create方法获取服务接口IHelloService的代理类

    2.3 执行代理类的hello方法,代理类将请求封装成RpcRequest类,然后连接到Zookeeper服务进行服务发现获取主机地址和端口号,然后调用RpcClient发送请求,获取结果。

    2.4 RpcClient配置并启动一个Netty EventLoopGroup,将封装的RpcRuest请求发送出去;

    2.5 RpcRequest经由RpcEncoder序列化为二进制数据,然后经由TCP协议发送出去;

    2.6 客户端收到来自服务端的二进制数据响应,然后通过RpcDecoder反序列化为RpcResponse类

  3. 服务器端接受到该RpcRequest的二进制数据,经由RpcDecoder反序列为RpcRequest类,然后通过服务接口—服务实现类映射表查找到对应的实现类Bean,再通过反射调用对应的方法,获取到结果;然后封装成RpcResponse类,随即被RpcEncoder序列化为二进制数据,并交由netty通过TCP协议发送到客户端;

详细代码参见:https://github.com/Spground/RpcDemo

4. Reference

https://my.oschina.net/huangyong/blog/361751?p=2&temp=1519544796844#blog-comments-list

1. 并发控制和锁

在多处理器的时代,程序设计中经常采用多线程以充分利用处理器的性能。在多线程环境下,由于存在共享变量、共享资源等情况,因此有时候需要对多线程的并发访问进行控制。

同很多并发控制的问题类似(例如数据库的并发控制),程序中的并发控制也会使用到例如加悲观锁、乐观锁、多版本视图等技术来完成并发控制(或者称为多线程同步)。因此谈到并发控制,基本上会涉及到锁的概念,而涉及到锁的问题也基本是属于并发控制问题的范畴。

2. Java中的锁

Java中涉及到很多锁的概念,而涉及到的使用层次也不同,因此这里做一个简单的总结。

  1. 内置锁/隐式锁

    Java的每一个对象都有一个monitor,且这个monitor每一次仅能被一个线程所拥有,这就是内置锁或者叫隐式锁。内置锁的获取、释放通常是如下的范式写的:

    1
    2
    3
    synchronized(obj) {
    //当线程获取到obj的内置锁--monitor时,线程会进入到此代码块
    }

    释放内置锁:

    1
    obj.wait();//当前线程放弃obj对象上的内置锁

    或者退出synchronized代码块,也会自动释放获取的内置锁。

  2. 显式锁

    顾名思义,显式锁是显式定义的锁。例如并发工具包的Lock接口下的一些实现类。

    内置锁在Javasynchronized关键字的配合下使用起来十分的简单,但是简答的预定义的东西往往缺乏灵活性,因此为了补充内置锁,显示锁提供了一些额外的特性例如:可轮询可超时可中断锁等。这些特性在实际的编程中提供着很大的灵活性。

    Lock类的实现类常见的主要是ReentrantLock类。

    该类提供了几个重要的方法:

    • lock() 语义同synchronized
    • tryLock() 提供了可超时的特性,在某些情况下可以通过该特性避免死锁的发生
    • lockInterruptibly() throws InterruptedException 在获取锁失败被阻塞的时候可被中断,而采用synchronized获取内置锁的时候,无法被中断

  3. 可重入锁(Reentrant Lock)

    可重入锁指的是已经获取了某个锁的线程去尝试再一次该锁的时候,是可以直接获取到的,而不会阻塞。

    可重入锁避免了如下的死锁情况的产生:

    1
    2
    3
    4
    5
    6
    synchronized void get() {
    set();
    }
    synchronized void set() {

    }

    如果锁不可重入,那么当线程A获取到了“保护”get方法的锁时,那么再进入set方法的时候,会无限期阻塞。而此时,除了线程A,没有任何线程拥有该锁,因此线程A相相等于握着锁去等锁,首尾相连形成死锁了。

  4. 读写锁(Read Write Lock)

    通常的锁都为互斥锁,大多数被共享的变量都是由这种互斥锁保护。一个时刻只能有一个线程在访问该变量。这个在该变量读多写少的情况下显然效率不高。因为读读不需要并发控制,而读写、 写写才需要并发控制。那么显然应该同数据库的并发控制加锁的策略一样,应该提供两种锁,一个是共享锁(读锁)、另一个是互斥锁(写锁),当读取变量的时候,主需要获取共享锁,而写变量的时候才去获取互斥锁。

    Java并发包中提供了常用的ReentrantReadWriteLock锁,该锁提供了读锁、写锁、以及锁降级等特性。

    • readLock() 返回该读写锁对应的读锁
    • writeLock() 返回该读写锁对应的写锁

    当线程获取写锁的时候,如果该读写锁的读锁、写锁被其他线程占有,则该线程获取锁失败;

    当线程获取读锁的时候,如果没有线程持有写锁,则获取读锁成功;否则,获取读锁失败;

    读写所允许锁降级:当一个线程持有写锁的时候,可以直接降级为读锁,而不支持锁升级,因为锁升级会可能会引发死锁(当两个持有读锁的线程,同时进行锁升级,那么这两个线程都不会释放自己的读锁,从而发生死锁)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    private ReadWriteLock lock = new ReentrantReadWriteLock(true);
    private Lock r = lock.readLock(), w = lock.writeLock();
    ...
    w.lock();
    try {
    sb.append(append); //降级为read lock
    r.lock();
    } finally {
    w.unlock();//still hold read lock
    }
    try {
    ...
    } finally {
    r.unlock();
    }
    ...

  5. 偏向锁(Biased Lock)

    偏向锁是JDK1.6引入的一项锁优化,指的是偏向锁会偏向第一个获得它的线程,如果在接下来的执行过程中,该锁没有被其他的线程获取,则持有偏向锁的线程将永远不需要同步。在某些情况下,锁不存在多线程竞争的情况,而总是由同一线程在获取、释放、获取、释放。因此,引入了偏向锁,让此种情况下的锁获取的代价变小,偏向锁可以提高带有同步但无竞争的程序性能。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class BiasLockDemo {

    public static void main(String[] args) {
    // TODO Auto-generated method stub
    long t = System.currentTimeMillis();
    List<Integer> list = new Vector<>();//选择Vector是由于其add方法是synchronized修饰的;
    for (int i = 0; i < 1000_0000; i++) {
    list.add(i);
    }
    System.out.println("cost: " + (System.currentTimeMillis() - t) + "ms");
    }
    }

    -XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0开启偏向锁后,运行时间:

    1
    cost: 340ms

    -XX:-UseBiasedLocking禁用偏向锁后,运行时间:

    1
    cost: 519ms
  6. 公平锁/非公平锁

    公平锁是指多个线程在等待同一个锁时,必须按照申请锁的先后顺序来获得锁。

    非公平锁是指多个线程在等待同一个锁时,是按按照不确定的顺序来选择某一个线程获取锁。

    通常来讲,公平锁的性能低于非公平锁,但是公平锁可以解决线程饥饿的问题

    Java中可以使用new ReentrantLock(true)构造得到公平锁,而synchronized则提供的内置锁是非公平的。

    ps: Java中提供的显式锁一般都提供Fair和Non-Fair模式,但是即便是公平模式也会提供一些允许插队(barging)的方法允许线程先于等待在前面的线程得到锁。

  7. 悲观锁/乐观锁

    悲观锁:主要的并发控制策略之一,假设冲突总是发生,如果不采取同步措施,例如对共享的变量或者资源加锁,那么肯定会出现问题,类似于事前预防。因此无论共享的数据无论是是否出现竞争冲突,都会对它进行正确的同步。

    乐观锁:和悲观锁不一样,乐观并发控制策略先进行操作,如果操作的数据没出现竞争,那么操作成功;如果操作的数据出现竞争,那么再进行一些后续的弥补操作(常见的就是不断的重试、或者重试数次返回失败信息),类似事后弥补,实现乐观并发控制策略有多种常见的方式:

    • CAS
    • 时间戳
    • 版本号

    存在即合理,悲观锁和乐观锁都有其应用的场景,当数据争用、冲突发生频繁的场景,悲观锁较适合;而数据争用、冲突不频繁的场景,乐观锁则更适合。

  8. 自旋锁(Spinning Lock)

    互斥同步的时候,当线程获取锁失败的时候,通常会进入阻塞状态,java线程和操作系统线程是一一对应的,挂起和恢复线程操作需要由用户态转入核心态完成,这些操作耗时、耗资源。但是某些情况下,某一个线程只会将锁独占很短时间,或者是说很快 便完成了同步代码块的执行,因此其它线程为了这点时间选择将自己挂起、恢复十分没有必要。因此,特别是在多处理环境下,可以让后面请求独占锁失败的线程,进行自旋(忙循环)一会儿,而不是阻塞挂起线程。

    自旋锁的引入是为了解决锁被独占的时间很短的情况下,避免线程被挂起-恢复带来的overhead,因此当锁独占的时间本来就很长的,这种锁便没有存在的意义了。

    JVM中可以通过参数:

    -XX:+UseSpinning开启自旋锁功能;JDK1.6默认是开启的。

    -XX:PreBlockSpin配置每次自旋的次数,默认是10次;

3. 死锁和活锁

3.1 死锁

并发中问题中的死锁最经典莫过于哲学家就餐问题,死锁常常发生在系统高负载环境下,多线程竞争某一共享数据的情况下。当线程A持有锁L的时候同时,线程B持有锁M并尝试获得L,那么这两个线程将永远等待下去。这种情况就是最简单的死锁形式,多个线程由于存在环路的依赖关系而永远的等待下去。

死锁发生最常见的的根本原因就是:多个线程存在环路的依赖关系

比如A等待B,B等待C,C等待D, …, Z等待A,则A间接的等待A,形成环路,发生死锁。

环路的产生具体有如下几种情况:

  • 锁顺序死锁:加锁的顺序不一致导致的死锁;
  • 动态的锁顺序死锁:方法内部加锁顺序是一致的,但是由于锁被参数化了,因此调用该方法时,锁的顺序取决于方法调用者传来的参数,因此也会动态的产生锁顺序死锁。
  • 协作对象之间发生的死锁
  • 资源死锁 例如:线程A持有数据库连接D1并等待D2,而线程B持有数据库连接D2,等待D1则A、B之间出现死锁

解决死锁问题通常有两个角度来解决,死锁避免和死锁解除,一个属于事前预防,另一个是事后弥补;

数据库系统中,为避免死锁,有一个著名的两阶段加锁协议,同时,事务管理器可以通过环路判断死锁的存在,并取消一个代价小的事务以达到死锁的解除。

Java没有数据库事务管理器那么强大,Java中也有一些方法可以避免死锁,但是当死锁发生的时候,除了重启应用别无他法。

Java中的死锁避免:

  • 加锁顺序保持相同(synchronized提供的内置锁只能通过此种方式来避免死锁的发生)
  • 采用可轮询的、可超时的锁(显式锁Lock提供tryLock(long timeout)轮询和超时的特性,因此不会无限的等待下去,当超时的时候,程序可以简单的重试,或者放弃获取该锁,释放已有的锁。同时这种方式通过引入随机因素也可以有限的解决活锁的问题)

3.2 活锁

死锁是形成死锁的线程全部处于无限等待状态,而活锁则是线程不断的重复执行相同的操作,而且总是失败。就相当于线程在执行一个循环的操作序列,周而复始,无穷无尽,导致系统的状态整体停滞不前。

最形象的例子便是:

两个过于礼貌的人甲乙,相向走在一个狭窄的巷子里面,甲和乙同时让对方先走,然后甲乙同时准备接受对方的谦让自己先走,然后两人又同时让对方先走…,如此循环往复,两人没有等待,始终处于活动状态,但是两人始终都无法通过巷子。

同样的类似活锁的例子就是,以太网的共享介质传输信息时,也会出现活锁的问题,以太网技术采用了一种叫做载波多路访问-冲突检测(CSMA-CD)的技术,该技术引入了一些随机因素来避免活锁。

同样的,解决活锁的问题,可以在重试机制中以引入随机性,这样可以有效的避免活锁问题。

4.reference

[1]. Java并发编程实践

[2]. 深入理解JVM虚拟机

[3]. http://www.importnew.com/19472.html

[4]. https://www.cnblogs.com/qifengshi/p/6831055.html

1. 二叉树遍历

树是最重要的数据结构之一,而树的遍历是树最基本的操作。

二叉树的遍历一般来说有三种遍历次序:

  • 前序遍历
  • 中序遍历
  • 后序遍历

而这三种遍历次序都可以采用递归非递归的方式来完成。

就时间、空间的复杂度来讲,因为非递归需要借助额外的Stack来完成操作,所以递归和非递归的时间复杂度都是O(n)O(logn)

那么有没有另外的不同的二叉树遍历方法,在时间或空间能做到更优的呢?答案是:Morris 遍历

由于在遍历的时候,我们需要记住某种遍历次序的的后驱或者前驱结点,常见的递归和非递归都是采用的方式完成这个过程,有没有内部空间来记录这些后驱或者前驱结点呢?有,那就是叶结点的左,右孩子结点,因为叶结点的两个孩子结点都是空指针,如果利用好这些空间,我们就可以在O(1) 的空间完成遍历。

利用叶结点的左、右孩子指向遍历的前驱或者后驱结点,这些指针叫做线索,对应的二叉树叫做线索二叉树

Morris遍历是使用线索二叉树进行中序遍历的一种实现,其可以在O(n)的时间,O(1)的空间完成遍历, 对其稍加修改可以推广到先序、后序遍历,其遍历过程包含三个部分:

  1. 创建指向中序后驱结点的线索;
  2. 遍历输出结点;
  3. 删除线索,恢复树的结构;

2. Morris 中序遍历

Morris 中序遍历过程如下:

  1. 当前结点的左孩子是否为空,若是则输出当前结点,更当前结点为当前结点的右孩子;否则进入2;

  2. 在当前结点的左子树中寻找中序遍历下的前驱结点(左子树中最右结点)

    a. 若前驱结点的右孩子为空,则将前驱结点的右孩子指向当前结点,当前结点更新为当前结点的左孩子;进入3;

    b. 若前驱结点的右孩子为当前结点(不为空),将前驱结点的右孩子置NULL,输出当前结点,当前结点更新为当前结点的右孩子,进入3;

  3. 若当前结点不为空,进入1;否则程序结束;

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cur = root
repeat until cur != NULL:
if cur.left != NULL:
pre = cur.left;
while pre.right == NULL && pre.right != cur://找到前驱结点pre
pre = pre.right
if pre.right == NULL:
pre.right = cur
cur = cur.left
else:
print(cur)
pre.right = NULL
cur = cur.right
else:
print(cur)
cur = cur.right

下图为每一步迭代的结果(从左至右,从上到下),cur代表当前节点,深色节点表示该节点已输出。

以下为Java的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void inOrder(TreeNode root) {
TreeNode cur = root, pre = null;
for (; cur != null;) {
if (cur.left != null) {
pre = cur.left;
// find predecessor
while (pre.right != null && pre.right != cur)
pre = pre.right;
if (pre.right == null) {// create thread
pre.right = cur;
cur = cur.left;
} else {
print(cur);
pre.right = null;
cur = cur.right;
}
} else {
print(cur);
cur = cur.right;
}
}
}

3. Morris 前序遍历

对于前序遍历,只需要在中序遍历的基础上稍加修改便可以完成。

Morris 前序遍历的流程如下:

  1. 当前结点的左孩子是否为空,若是则输出当前结点,并更新当前结点为当前结点的右孩子;否则进入2;

  2. 在当前结点的左子树中寻找中序遍历下的前驱结点(左子树中最右结点)

    a. 若前驱结点的右孩子为空,则将前驱结点的右孩子指向当前结点,输出当前结点(在这里输出,和中序遍历不同的地方),当前结点更新为当前结点的左孩子;进入3;

    b. 若前驱结点的右孩子为当前结点(不为空),将前驱结点的右孩子置NULL,当前结点更新为当前结点的右孩子,进入3;

  3. 若当前结点不为空,进入1;否则程序结束;

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cur = root;
repeat until cur != NULL:
if cur.left != NULL:
pre = cur.left;
while pre.right == NULL && pre.right != cur://找到前驱结点pre
pre = pre.right
if pre.right == NULL:
pre.right = cur
print(cur)//此处和中序遍历不同
cur = cur.left
else:
pre.right = NULL
cur = cur.right
else:
print(cur)
cur = cur.right

下图为每一步迭代的结果(从左至右,从上到下),cur代表当前节点,深色节点表示该节点已输出。

以下为Java的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void preOrder(TreeNode root) {
TreeNode cur = root, pre = null;
for (;cur != null;) {
if (cur.left != null) {
pre = cur.left;
// find predecessor
while (pre.right != null && pre.right != cur)
pre = pre.right;
if (pre.right == null) {// create thread
print(cur);// print node here
pre.right = cur;
cur = cur.left;
} else {
pre.right = null;//delete thread
cur = cur.right;
}
} else {
print(cur);
cur = cur.right;
}
}
}

4. Morris 后序遍历

后序遍历的流程如下:

  1. 新建一个Dummy结点,该结点的左孩子指向树根root,将Dummy作为当前结点;

  2. 当前结点的左孩子是否为空,更新当前结点为当前结点的右孩子;否则进入2;

  3. 在当前结点的左子树中寻找中序遍历下的前驱结点(左子树中最右结点):

    a. 若前驱结点的右孩子为空,则将前驱结点的右孩子指向当前结点,当前结点更新为当前结点的左孩子,进入3;

    b. 若前驱结点的右孩子为当前结点(不为空),反转当前结点到前驱结点之间的路径,输出该路径所有结点;反转当前结点到前驱结点之间的路径,恢复原状。将前驱结点的右孩子置NULL,当前结点更新为当前结点的右孩子,进入3;

  4. 若当前结点不为空,进入1;否则程序结束;

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
dummy = Node(-1)
dummy.left = root
cur = dummy
repeat until cur != NULL:
if cur.left != NULL:
pre = cur.left;
while pre.right == NULL && pre.right != cur://找到前驱结点pre
pre = pre.right
if pre.right == NULL:
pre.right = cur
cur = cur.left
else:
reverse(cur.left, pre)
print(pre, cur.left)
reverse(pre, cur.left)//再次反转,恢复原状
pre.right = NULL
cur = cur.right
else:
cur = cur.right

下图为每一步迭代的结果(从左至右,从上到下),cur代表当前节点,深色节点表示该节点已输出。

以下为Java的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public static void postOrder(TreeNode root) {
// TODO Auto-generated method stub
TreeNode dummy = new TreeNode(-1);
dummy.left = root;
TreeNode cur = dummy, pre = null;
for (;cur != null;) {
if (cur.left != null) {
pre = cur.left;
// find predecessor
while (pre.right != null && pre.right != cur)
pre = pre.right;
if (pre.right == null) {// create thread
pre.right = cur;
cur = cur.left;
} else {//print here
reverse(cur.left, pre);
print(pre, cur.left);
reverse(pre, cur.left);
pre.right = null;
cur = cur.right;
}

} else {
cur = cur.right;
}
}
}

private static void print(TreeNode from, TreeNode to) {
// TODO Auto-generated method stub
for (;;from = from.right) {
print(from);
if (from == to) break;
}
}

private static void reverse(TreeNode from, TreeNode to) {
// TODO Auto-generated method stub
if (from == to) return;
TreeNode x = from, y = from.right, z= null;
x.right = null;
for (;;) {
z = y.right;
y.right = x;
x = y;
if (y == to) break;
y = z;
}
}

完整的代码:详见

https://github.com/Spground/archive/blob/master/misc/code/MorrisTraversalDemo.java

5. References

  1. http://www.cnblogs.com/AnnieKim/archive/2013/06/15/MorrisTraversal.html
  2. https://en.wikipedia.org/wiki/Tree_traversal#Morris_in-order_traversal_using_threading
  3. https://en.wikipedia.org/wiki/Threaded_binary_tree#The_array_of_Inorder_traversal

1. Java线程与线程池

1.1 线程

相比于“繁重”的进程,线程可以算是一种轻量级的进程,大多数操作系统都是支持以线程作为调度执行单元以提高系统的并发性。毫不例外,Java也支持多线程。

多个线程交替甚至并行的执行,特别在多处理器时代,可以极大的提高资源的利用率。通常我们使用多线程来并发的执行多个具有明显边界的任务,例如Web服务器使用多线程来同时处理来自多个用户的请求,每一个请求便是一个独立的、有边界的任务。

Java中新建一个线程通常有两种方式:

  • new Thread() 方式
1
2
3
4
5
6
Thread t = new Thread() {
@Override
public void run() {
//这里是将要被执行的代码
};
};
  • new Thread(new Runnable())方式
1
2
3
4
5
6
7
Runnable task = new Runnable() {	
@Override
public void run() {
//这里是将要被执行的代码
}
};
Thread t = new Thread(task);

第一种方式存在一个很大的缺点:被执行的代码(任务)和执行机制(如何执行这些任务)耦合了

如果将任务执行机制分别抽象出来,那么久会获得更好的灵活性。第二种通过Runnable方式创建线程的方式,正好克服了这个缺点,Runnable抽象了任务,而Thread抽象了执行机制,一个线程可以通过维护一个工作队列,采取不同的执行策略来执行多个不同的的任务。

因此,第二种创建线程的方式是使用得最多、也更推荐的方式,后面讨论的Executor等内容都会看见这种创建线程的方式。

线程创建完毕后,调用start()方法便可以启动该线程。

1
t.start()

Java线程大致分为两类:

  1. 守护线程 通过 方法setDaemon(true)将某个线程设置为守护线程
  2. 非守护线程 默认创建的是非守护线程

JVM启动时候,会创建一个非守护线程执行main方法,这个线程有时候也被称为主线程,然后程序可以自主的创建守护线程和非守护线程。JVM一直执行所有被创建的线程,当调用System.exit()或者JVM中所有的非守护线程都死了(正常执行run方法完毕或者未处理异常层层上抛最后抛到了JVM层),此时JVM停止。

到这里我们都在使用一个名为Thread的类,现在有这么几个问题?

  • Thread类都有哪些常用的方法?
  • Java中的线程和OS线程是一一对应的吗?亦或是多对一?甚至是多对多?

常用的方法用:

  1. start() :启动一个该线程
  2. sleep(long time): 让该线程让出CPU进入睡眠,直到指定的time后,恢复然后进入可运行状态。
  3. yield(): 主动让出CPU
  4. interrupt() : 向该线程发出一个中断请求
  5. isInterrupt(): 返回该线程是否存在中断请求
  6. interrupted(): 返回该线程是否存在中断请求,并清空线程的中断状态。

对于Java的线程和OS线程的对应关系,通过查看源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public synchronized void start() {
....
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}

调用了start0()方法,是一个native方法:查看OpenJDK\src\share\native\java\lang\Thread.c源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

#undef THD
#undef OBJ
#undef STE

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

start0对应于JVM_StartThread函数,该函数是JVM提供的一个API函数,查找HotSpot下的\src\share\vm\prims\jvm.cpps,找到对应的方法:发现该出实际上new JavaThreadJavaThreadHotSpot定义的C++类。

1
2
3
4
5
6
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
....
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);
Thread::start(native_thread);
JVM_END

\src\share\vm\runtime\thread.cpp找到JavaThread的定义:其调用了os::create_thread()函数

1
2
3
4
5
6
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
Thread()
....
os::create_thread(this, thr_type, stack_sz);
...
}

其中OS::create_thread函数的实现是依赖于不同操作系统,这里选择linux

1
2
3
4
5
6
bool os::create_thread(Thread* thread, ThreadType thr_type, size_t stack_size) {
....
int ret = pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);
....
return true;
}

到此看出了实际上是pthread_create为我们创建了一个线程,而Java线程OS线程是一对一的关系。

到这里,对于线程简单的总结算是完成了,我们知道当我们需要并发处理多个任务时,我们可以简单的创建一个新的线程去处理这些任务,但是这样的方式在高负载的情况下确是不明智的。

1.2 线程池

针对每一个需要被执行的任务都创建一个线程来执行的方式虽然很直观,但是存在如下缺点:

  • 线程创建与销毁代价非常高,当任务到达的速度特别快时,为每个任务创建一个线程会快速消耗掉系统资源。
  • 当线程的数量大于CPU的数量的时候,当CPU正在100%工作的时候,这时为每一个到来的任务创建一个线程只会耗费额外的内存,而不会提高资源利用率

由于不能无限制的创建线程,因此将一定数量的线程放在一个池中形成一个线程池,同时维护一个任务队列,让这些线程不停的去执行任务,是一个有效解决上述缺点的好办法。

线程池主要用来解决线程生命周期开销问题资源不足问题。通过对多个任务重复使用线程,线程创建的开销就被分摊到了多个任务上了,而且由于在新任务到达时线程已经存在,所以消除了线程创建所带来的延迟。这样,就可以立即执行新的任务,使响应更快。另外,通过适当的动态的调整线程中的线程数目可以防止出现资源不足的情况。

使用线程池提交任务这种模型和生产者-消费者模式是何其的相似。

任务提交者便是生产者

线程池维护的工作队列便是缓冲区

线程池维护的那些线程便是消费者

对于一个线程池来讲,它应该需要关注的主要是:

  • 创建多少个线程?
  • 工作队列中的任务数量是否有上边界?
  • 当工作队列的任务数量少于线程数量的时候,需要回收线程吗?
  • 这么多线程并发的访问工作队列等共享资源时,如何正确的进行同步而又不降低效率?
  • 如何让任务提交者对已经提交的任务进行操作(获取结果、取消任务)?
  • 如何优雅的关闭线程池?
  • ……

上述每一个关注点无疑都是需要精心设计、反复推敲的,如果from scratch构建一个线程池,对实现者的要求是极高的。因此Java为广大用户提高了一个线程池框架,通过该框架我们可以较容易的完成一些常见的多线程任务。

2. Executor框架

Java提供的线程池框架主要涉及到如下的类和接口:

  • Executors: 通过很多静态方法,提供不同的预配置的线程池;
  • Executor: Executor最上层的接口,只包含execute(Runnable command)方法;
  • ExecutorService: Executor的子接口;包含很多有用的方法,例如submmit()、shutDown()、shutDownNow()、awaitTermination()、invokeAll();
  • ThreadPoolExecutor: 具体的线程池的实现类;

2.1 Executors

工具类Executors主要提供以下几种预配置的线程池:

  1. newFixedThreadPool(int nThreads) 创建一个包含nThreads个线程的线程池,共享一个无边界的工作队列,在任何时刻,线程池最多有nThreads个存活线程;当某个线程由于执行过程中出现错误而死亡,则新建一个线程以补充。

    例如:创建一个包含5个线程的线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    Executor exe = Executors.newFixedThreadPool(5);
    exe.execute(new Runnable() {

    @Override
    public void run() {
    // TODO Auto-generated method stub

    }
    });

  2. newSingleThreadExecutor()创建仅仅包含一个线程的线程池,维护着一个无边界的工作队列,在任何时刻,线程池只能有一个任务被执行,任务被保证顺序的执行。如果线程由于执行过程中出现错误而死亡,则新建一个线程代替继续执行任务;和newFixedThreadPool(1)不一样,一旦线程池创建,该线程池不能再进行配置。这是通过将ThreadPoolExecutor包装成包装类实现的,因此该方法返回的Executor不能强制转化为ThreadPoolExecutor;

    1
    2
    3
    4
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService //包装类
    (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    }
    1
    ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newSingleThreadExecutor(); //强转将失败

  3. newCachedThreadPool()创建一个线程池,当新任务被提交,如果池中没有空余的存活线程,则该线程池会创建新的线程;如果有存活的多余的线程则会复用该线程;当某个线程空闲超过60秒的时候,该线程会被终止然后被移除线程池。这种线程池适合被用于处理大量耗时短的任务,因为设置了一个空闲超时时间,这样当整个线程池都闲下来时,基本不会暂用额外的资源。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    Executor exe = Executors.newCachedThreadPool();
    exe.execute(new Runnable() {

    @Override
    public void run() {
    // TODO Auto-generated method stub

    }
    });

  4. newSingleThreadScheduledExecutornewThreadScheduledExecutor(int coreThreadSize)分别是创建包含一个线程和指定数量的线程,该线程池的线程定时的执行一些任务;

    以上是Executors包含的一些常用的静态方法,它为我们预配置一些常用的线程池,但是在某些时候,这些预配置的满足不了需求,JDK也通过ThreadPoolExecutor重载的构造方法让使用者根据自己的需求进行线程池配置。

2.2 ThreadPoolExecutor

上述提到的Executors中的静态方法返回的预配置线程池,也是通过调用不同ThreadPoolExecutor的构造方法完成的。

具体来讲,ThreadPoolExecutor构造方法有几个:

  • 1
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)
  • 1
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
  • 1
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
  • 1
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

综合来看就是如下几个配置参数:

  • corePoolSize

    指定常驻线程池中的线程的数量;当由于某些原因,线程池中的线程数量小于corePoolSize时,线程池负责创建新的线程来补充,直到数量达到corePoolSize。

  • maximumPoolSize

    指定线程池中最多的线程数量,maximumPoolSize >= corePoolSize恒成立。对于newFixedThreadPoolnewSingleThreadPool的配置为maximumPoolSize = corePoolSize;而对于newCachedThreadPool中的配置为maximumPoolSize = Integer.MAX_VALUE > corePoolSize = 0

  • keepAliveTime/unit

    当线程池中线程数量大于corePoolSize的时候,对于的线程的空闲时间超过keepAliveTime后,会被终止,从线程池中移除。在newCachedThreadPool中,这个值默认是60秒。unit为时间单位。

  • workQueue

    线程池中用于维护那些已经提交但还未被执行的任务;

  • threadFactory

    负责创建线程的线程工厂;

  • handler

    当工作队列里面待处理的任务已经达到工作队列的最大容量时或者线程池已经(或正在)关闭,其他任务提交者提交的任务将会被拒绝执行;线程池会回调这个handler来通知这次拒绝。

示例:

创建一个常驻线程数量为2的,最大线程数量为5的,多余线程空闲超时为30秒的, 无界工作队列的线程池:

1
Executor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

3. Future、Callable

现在有了配置好的线程池,接下来便是向线程池提交我们的任务。

通常来讲,任务按有无返回值可以分为两类:

  • 无返回值:这类任务不关心返回值,例如复制一个文件;
  • 有返回值:这类任务关系返回值,例如请求一个网络资源;

JDK提供两种相似的接口来达到上述目的:

  • Runnable : 包含void run()方法,无返回值;也不会抛出受检查的异常;
  • Callable : 包含V call(), 有返回值,会抛出受检查的异常;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}

无论是Runnable还是Callable,任务提交者希望可以在向线程池提交任务后,获得一个关于该任务的Handle,以便任务提交者对任务进行控制(取消任务、查看任务是否完成、获取任务执行结果)这种类似于Handle的便是Future

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

示例:

当提交一个获取网页的任务后,主线程通过Future获取任务执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
ExecutorService e = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), rEH);
Callable<String> call = new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
//模拟网络请求
Thread.sleep(500);
return "TEST";
}
};
Future<String> f = e.submit(call);
String res = f.get();//阻塞直到结果返回
System.out.println(res);

4. References

  1. http://www.baeldung.com/thread-pool-java-and-guava
  2. Java Conccurency in Practice
  3. http://www.blogjava.net/stevenjohn/archive/2011/12/12/366161.html

1.概述

大量的结构化数据通常被存储在数据库中,通过简单的SQL语句,我们可以方便的查询、修改、插入、删除这些数据。例如当我们执行:

1
SELECT * FROM USER WHERE userName = 'A';

DBMS会将表中所有用户名等于A的用户的记录返回给我们,但随着表中的记录越来越多,比如说从10000增加到1000万,这时候我们往往会发现查询的速度会变慢很多。这时候,通常的做法就是在userName列上面创建索引,以加快数据的查询。

那么什么是数据库索引为什么数据库索引可以加快查询以及如何正确的使用数据库索引呢?以下将从这个几个方面做一下简单的总结。

2. 数据库索引

2.1 改善数据库查询

2.1.1 用树形数据结构进行查找

通常来讲,我们在对数据库中数据最频繁的操作便是查询,比如我们给定一定的查询条件将某个表的数据记录查询出来,因此这实际就是一个查找问题

查找问题最简单直接的就是顺序查找,其时间复杂度为O(N)

例如第1节中的:

1
SELECT * FROM USER WHERE userName = 'A';
  • 假设userName列中不存在重复值,很显然我们在查找这条记录的时候需要对数据表中数据记录一行一行的扫描,然后直到找到我们需要的那条记录,这种方式的平均时间复杂度是O(N/2)
  • 假设userName列中存在重复值,这时情况更加糟糕,我们需要进行所谓的全表扫描找到所有的记录,这种方式的时间复杂度是O(N)

即便是在内存中对大量数据进行O(N)的查询我们也很难接受,更何况是这些数据是存在外部存储设备上(常见便是磁盘)。磁盘的一次访问时间大约是8ms,而内存的一次访问约50ns

因此当数据量特别大的时候,数据库用顺序查找的方式进行数据查找几乎是不可接受的。

在众多的用于查找的数据结构中,平衡的树形结构是十分合适被用来进行查找,其查找的时间复杂度可以降到O(logN),当然某些数据库索引便是采用树形结构来组织的。

到现在,我们总结了为什么要使用数据库索引的原因,并且我们总结了数据库索引可以使用树形数据结构来加快数据的查询。下面给出维基百科的定义:

数据库索引是一种花费额外存储空间以加快数据库数据检索的数据结构。

我们知道我们可以采用平衡二叉排序树来组织数据库索引,每个节点存储关键字和其对应的数据(或者数据所在的磁盘地址)。这样在进行查询的时候,我们可以将时间复杂度降低到我们需要的O(logN)。但是实际上数据库索引并不是采用这种二叉树的数据结构,而是采用B-Tree来实现数据库索引以加快数据查询。关于为什么不使用二叉搜索树的原因以及B-Tree是什么,2.1.2会进行总结。

2.1.2 B-Tree

上图为磁盘的某个盘面的解剖示意图,由于一次寻道操作很慢,为了数据读取的有效性,磁盘是按扇区为单位存储和访问数据的。关于磁盘的工作原理,参见这篇博文

  1. 如果我们用二叉搜索树组织10000个关键字,那么我们的树的高度约为:14。因此如果我们要查询叶节点上的关键字则需要14次磁盘访问,大约耗时:8ms*14=112ms。虽然这个结果对于顺序查找好了不少,但实际上为了降低磁盘访问次数,我们可以进一步将树变得矮一点便可以降低磁盘访问次数,从而提高查询速度。
  2. 为了降低树的高度,我们将二叉树中的每个节点最多俩孩子变为多个孩子,便可以将树的高度大大降低。

而这种平衡的、每个节点多与2个孩子节点的树形数据结构便是B-Tree。上图是一个简单的对比图,从图中我们可以看出B Tree相比二叉树,其每一个结点里面含有多个关键字,从而大大降低了树的高度。当每个结点允许存放100个关键字的时候,10000个关键字的B Tree树高大约为2,因此仅需要2次磁盘访问便可以访问到数据,大约耗时8ms*2=16ms,相比于前面的二叉树来讲,性能又提升了不少。B Tree的结点的大小通常被设置为一个扇区的大小(假如说是4KB),这样一次磁盘访问便可以读取到该节点所有关键字。

简单总结一下:由于数据存储在磁盘上,考虑到磁盘的特性,因此通过将结点的大小增加到一个扇区的大小,从而降低查找树的高度以减少耗时的磁盘访问次数,这就是B Tree适合用来查找基于外部存储数据的主要原因。

到这里我们总结了数据库索引为什么不采用二叉搜索树实现的原因。接下来我们稍微简单的总结下B Tree相关的内容。

来自维基的定义:

在计算机科学中,B树是一棵自平衡树,它允许在logN的时间内完成查找、顺序访问、插入和删除。B树是一种泛化的二叉搜索树,其结点可以拥有超过2个孩子结点。

一棵5阶的B树大概长这样(5阶指的是孩子节点数量最多为5):

B树中的结点通常分为三类

  • 根结点:包含关键字,包含关键字的个数有上界无下界,有孩子结点,包含指向孩子结点的指针
  • 内部结点:包含关键字,且包含关键字的个数有上下界,有孩子结点,包含指向孩子结点的指针
  • 叶子结点:包含关键字,且包含关键字的个数有上下界,没有孩子结点,不包含指向孩子结点的指针

关于B树的性质的描写有很多种,以下是从《算法导论》总结出来的:

B树应该满足的性质:

  1. 每个叶结点具有相同的深度,即树的高度h
  2. 每个结点所包含的关键字个数都有个下界(根节点没有下界)和上界,上下界通过一个被称为B树的最小度数t>=2来表示
    • 除了根节点以外,每个结点至少含有t-1个关键字;因此除了根节点以外的每个内部结点至少含有t个孩子。如果树非空,根节点至少能含有1个关键字以保证树的最小度t>=2
    • 每个结点最多包含2t-1个关键字。因此一个内部结点最多有2t个孩子,当一个结点恰好有2t-1个关键字的时候,称该结点是满的

另外,有的地方存在的概念,指的是B树允许非叶子结点的最大孩子结点个数,关于B Tree的基本操作SearchInsertionDelete等将在下一篇博文介绍,这里不做介绍。

数据库索引中的B树中,每个结点包含如下内容:

  • 存储代表被索引数据库列的关键字
  • 指向孩子结点的指针
  • 指向数据库记录的指针

B Tree虽然能很好的帮助我们改善诸如WHERE userName = 'A'之类的查询,但是B树还存在以下两个缺点:

  1. 但是对于一些常见的范围查询,例如WHERE age > 10等,我们就需要中序遍历B树,这会增加缓存miss率,从而增加我们的磁盘访问次数。
  2. 与此同时,由于我们在结点中不仅仅存储了指向孩子结点的指针,而且还存储了指向数据库记录的指针,因此每个结点的大小就会较大,又由于扇区的大小通常是固定的,因此每个结点包含的关键字的个数就会减少,也就是说B的阶会降低(树会变高)。

因此为避免上述两个缺点,B树的变体B+树便应运而生。

2.1.3 B+树

同B树不同,B+树主要有如下不同:

  • B+树中关键字和指向数据记录的指针仅仅存放在叶子结点
  • B+树中同一关键字可能出现在多个结点中,但是仅仅叶子结点还有指向数据记录的指正;而B树同一关键字仅仅可能出现在一个结点中
  • 由于B+树的关键字和数据记录的信息仅仅保存在叶子结点中,因此无论查找是否成功,都会深入到B+树的叶子结点,因此查询时间基本上是稳定的;而B树的查询时间和关键字在树中的位置有关,因此查询时间不是稳定的

除此之外,B+树将叶结点通过链表的形式连接起来,这样避免了范围查询或者全表扫描的时候,对B树的中序遍历。

下图是B树和B+树的对比图:

2.2 数据库索引的类型

由于不同数据库提供的索引分类不同,因此很多时候索引分类是混乱的。但是常见的数据库索引主要有如下几种:

  1. 聚集索引(Clustered Index):数据表中的记录的物理顺序和索引关键字的逻辑顺序是一致的。由于这个特性,聚集索引能够大大加快类似顺序检索、反序检索和范围查询。由于聚集索引的逻辑顺序和记录的物理顺序是一致的,显然一张数据表聚集索引只能有一个通常建立在主键上
  2. 非聚集索引(Non-Clustered Index):和聚集索引相反,记录的物理顺序和索引关键字的逻辑顺序不是一致的。因此一个数据表可以有0个或者多个非聚集索引。
  3. 唯一索引(Unique Index):唯一索引是不允许其中任何两行具有相同索引值的索引。
  4. 非唯一索引(Non-Unique Index):允许任何两行具有相同索引值的索引,仅仅是用来加快查询。
  5. 主键索引(Primary Index):建立在主键上的索引,由于主键具有唯一性,因此从某种意义上来讲,主键索引也是唯一索引,又因为主键列具有不经常修改等特性,因此主键索引通常也设置为聚集索引。由于主键索引的特殊性,大多数数据库都会默认在主键上自动创建主键索引。

3. 创建数据库索引

虽然索引会加快数据的查询,但是维护索引需要额外的空间和时间。因此需要在合适的列上创建合适的索引的,否则只会盲目的创建索引只会适得其反。

适合创建索引的列具有如下特点:

  • 对于经常处于WHERE、JOIN、OrderBy字句中的列应该创建索引,以加快查询
  • 对于需要保持唯一性的列,需要在创建唯一索引
  • 主键列默认是创建主键索引
  • 聚集索引应该创建在很少修改的列

不适合创建索引的列具有如下特点:

  • 查询中很少使用或者参考的列不应该创建索引;

  • 对于取值很少的列,例如性别列没必要创建索引;

  • 定义为text、image、bit的列不适和创建索引;

  • 频繁修改的列不适合创建索引,维护索引的代价会很大;

因为物理顺序和逻辑顺序是一致的,如果被索引的列频繁更新,为了维护逻辑顺序很物理顺序的一致性,会付出很大的代价,因此聚集索引应该建立在很少和不更新的列上面,

4. References

  1. https://stackoverflow.com/questions/1108/how-does-database-indexing-work
  2. https://stackoverflow.com/questions/107132/what-columns-generally-make-good-indexes/8937872#8937872
  3. https://stackoverflow.com/questions/870218/differences-between-b-trees-and-b-trees
  4. http://blog.csdn.net/yongheng_1999/article/details/53678814
  5. http://blog.csdn.net/kennyrose/article/details/7532032

1.数据库事务

事务是用户定义的一个数据库操作系列,这些操作要么全做要么全不做,是一个不可分割的工作单位。

事务是数据库中重要的概念,通常以BEGIN TRANSACTION开始,以COMMIT或者ROLLBACK结束。数据库事务具有四个著名的特性:ACID

  • Automatic

    原子性:即事务中的操作要么都做,要么都不做。

  • Consistency

    一致性:事务的执行的结果必须是使数据库从一个一致性状态到另一个一致性状态。这里的一致性状态指的是物理和逻辑上的数据库一致性状态。

    比如一个银行系统内部各个用户直接互相转账,无论如何转账,系统内部的金额的总数总是恒定不变的。

  • Isolation

    隔离性:一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对其他并发事务是隔离的,并发执行的各个事务之间不能相互干扰。

  • Duration

    持久性:一个事务一旦提交,它对数据库中数据的改变就该是永久性的。

    事务的ACID特性需要数据库管理系统保证,同时事务也是数据库管理系统恢复并发控制的基本单位。

2. 数据库并发控制

2.1 数据不一致性

第1节提到了事务是并发控制的基本单位,由于并发控制可能会导致事务的ACID特性遭到破坏,从而带来了数据库的数据不一致性。具体来讲,由于并发操作,多个事务的并发执行会带来如下四个典型的数据不一致性问题,而这几种问题主要是由于并发操作破坏了事务的隔离性Isolation)导致的,第3节会介绍MySQL数据库中并发控制机制下不同的事务隔离级别。

  1. 丢失修改

    这种问题是最直观,最不应该发生的,例如两个事务T1T2都读入同一数据进行修改,这样T1提交修改结果后,随即T2也提交了结果。这样,T2便将T1对数据的修改给覆盖了,这样就出现了丢失修改的问题了。

    这种问题发生的最主要原因是:两个或多个事务都在对同一数据项进行写操作

  2. 不可重复读

    不可重复读指的是:事务T1读取了某个数据项A,在事务T1运行期间又有其他事务例如T2对数据项A进行了修改并提交了,这样事务T1再次读取数据项A的时候,发现结果不一致了。

    这种问题发生的最主要原因是:一个事务在对某一个数据项进行读操作,而另外的其他事务对该数据项进行了写操作。

  3. 读脏数据

    读脏数据指的是:事务T1正在修改某个数据项A,事务还未提交,而事务T2读取了数据项A,但是随后事务T1由于某种原因回滚了事务(并未提交事务),此时事务T2读取到的数据项A的值和数据库中的值是不一致的,称事务T2读了脏数据(Read Dirty)。

    这种问题发生的最主要原因是:一个事务在对某一个数据项进行写操作,而另外的其他事务对该数据项进行了读操作。

  4. 幻读

    幻读指的是:事务T1按照某些条件从数据库中查询了一些数据记录,但是在事务T1执行过程中,事务T2插入或者删除了某些数据记录,这样当事务T1再一次按照同样的条件从数据库查询的时候,发现多了或者少了某些记录。幻读本质上属于不可重复读,这里为了和第3节的隔离级别作对照,所以将其单独划出来作为幻读。

    产生的原因同不可重复读类似:由于事务T1按条件对数据表中的很多数据项查询的时候,这时候其他事务T2对数据表进行了其他数据项的写操作(插入、修改或者删除)。

    为解决并发带来的一系列问题,主要有封锁、时间戳和乐观控制法来进行并发机制的设计。而封锁是最常用的数据库管理系统并发机制采用的方法。

2.2 封锁和并发度

封锁是实现并发控制的一个非常重要的技术。所谓封锁就是在事务对某个数据对象例如表、记录等进行操作的时候(读或写),先向系统请求对其加锁,成功加锁后,该事务才可以继续对其进行操作;否则,只能等待直到加锁成功。

具体来说:基本的锁分为两种:

  • 排他锁(Exclusive Locks)又称写锁,X锁;
  • 共享锁(Share Locks)又称读锁,S锁;

采用封锁确实能来保证并发下的数据库事务的特性,但是封锁带来了额外的系统开销,进而影响到系统的并发度。

例如1:当事务T1对某个数据项A进行写操作的时候,为避免读脏数据, 那么所有的其他向对数据项A进行读取的事务都得进行等待,那么系统此刻的并发度就大大降低了。

例如2:为避免幻读的产生,当事务T1对某一表Table中的某几行记录进项查询的时候,由于封锁了整个表Table,此时所有其他想对该表Table进项增加删除记录的事务都得进行等待,由于封锁的数据对象粒度太大,导致并发度大大降低。

综上可以看出:封锁和并发度本来就是一种矛盾,一方面我们想要尽可能的保证事务的隔离性进而避免2.1节中的问题;另一方面我们有希望系统的并发度不要下降。

既然是矛盾,那就只能妥协,为了充分满足不同的应用场景,MySQL数据库InnoDB支持4种不同的事务隔离级别。

第3节将结合两种不同的封锁类型来理解这四种不同的事务隔离级别。

3. 事务隔离级别

考虑到封锁和并发度的权衡,MySQL为用户提供了四种不同的事务隔离性级别。并发度从低到高(数据一致性强度从高到低)分别是:

  • SERIALIZABLE(可串行化)可避免脏读、不可重复读、幻读情况的发生。
  • REPEATABLE READ(可重复读)可避免脏读、不可重复读情况的发生。
  • READ COMMITTED(读已提交)可避免脏读情况发生。
  • READ UNCOMMITTED(读未提交)最低级别,以上情况均无法保证。

通过2.1节我们知道了几种数据不一致产生的主要原因,即写写和读写同时发生在多个事务对同一数据对象的情况下,另外还介绍了两种锁类型:X锁和S锁。

两个事务的X锁和S锁可以构成一个相容矩阵:如下:

T1 / T2 X锁 S锁
X锁 No No
S锁 No Yes

含义就是:T1对数据项加了X锁,则T2不能对其加SX锁,T2对数据项加了S锁,则T1可以对其加S锁而不能加X锁。(相容矩阵的加锁顺序都是是T1先加锁,T2后加锁

  • **READ UNCOMMITTED(读未提交)**最低级别,以上情况均无法保证。

官网给出建议:该隔离级别仅仅能做查询

The isolation level that provides the least amount of protection between transactions. Queries employ a locking strategy that allows them to proceed in situations where they would normally wait for another transaction. However, this extra performance comes at the cost of less reliable results, including data that has been changed by other transactions and not committed yet (known as dirty read). Use this isolation level with great caution, and be aware that the results might not be consistent or reproducible, depending on what other transactions are doing at the same time. Typically, transactions with this isolation level only do queries, not insert, update, or delete operations.

由于其不能保证读脏数据,因此S锁和X是相容的;因此可能如下:

T1 / T2 X锁 S锁
X锁 No(?待考证) Yes
S锁 Yes Yes
  • **READ COMMITTED(读已提交)**可避免脏读情况发生。

读已提交,避免脏读,但是不能保证可重复读;说明T1X锁和T2S锁不相容;而S锁和X锁相容的。也就是保证了某个事务在写数据项的时候,其他事务是不能读写该数据项的。

T1 / T2 X锁 S锁
X锁 No No
S锁 Yes Yes
  • **REPEATABLE READ(可重复读)**可避免脏读、不可重复读情况的发生。

可重复读,避免了脏读和不可重复读;说明T1X锁和T2S锁、T1S锁和T2X锁都不相容。也就是保证了某个事务在写某个数据项的时候,其他事务不能读写该数据项;且在读某个数据项时,其他事务只能读该数据项。

T1 / T2 X锁 S锁
X锁 No No
S锁 No Yes
  • **SERIALIZABLE(可串行化)**可避免脏读、不可重复读、幻读情况的发生。

为了避免幻读、不可重复读、脏读、丢失修改等产生,将并发事务的事务调度策略设置为为可串行化调度,这能保证该事务调度策略的运行结果同某一种串行调度结果一致,但是这会极大的降低并发度;同时,这也是MySQLInnoDB提供的最高的事务隔离级别,这里的相容矩阵同3。

The isolation level that uses the most conservative locking strategy, to prevent any other transactions from inserting or changing data that was read by this transaction, until it is finished. This way, the same query can be run over and over within a transaction, and be certain to retrieve the same set of results each time. Any attempt to change data that was committed by another transaction since the start of the current transaction, cause the current transaction to wait.

This is the default isolation level specified by the SQL standard. In practice, this degree of strictness is rarely needed, so the default isolation level for InnoDB is the next most strict, REPEATABLE READ.

4. References

https://dev.mysql.com/doc/refman/5.7/en/glossary.html#glos_isolation_level

http://www.cnblogs.com/xdp-gacl/p/3984001.html

《数据库系统概论》王珊,萨师煊

1. HashMap工作原理

HashMap作为优秀的Java集合框架中的一个重要的成员,在很多编程场景下为我们所用。HashMap作为数据结构散列表的一种实现,就其工作原理来讲单独列出一篇博客来讲都是不过分的。由于本文主要是简单总结其扩容机制,因此对于HashMap的实现原理仅做简单的概述。

HashMap内部实现是一个桶数组,每个桶中存放着一个单链表的头结点。其中每个结点存储的是一个键值对整体(Entry),HashMap采用拉链法解决哈希冲突(关于哈希冲突后面会介绍)。

由于Java8HashMap的某些地方进行了优化,以下的总结和源码分析都是基于Java7

示意图如下:

HashMap提供两个重要的基本操作,put(K, V)get(K)

  • 当调用put操作时,HashMap计算键值K的哈希值,然后将其对应到HashMap的某一个桶(bucket)上;此时找到以这个桶为头结点的一个单链表,然后顺序遍历该单链表找到某个节点的Entry中的Key等于给定的参数K;若找到,则将其的old V替换为参数指定的V;否则直接在链表尾部插入一个新的Entry节点。
  • 对于get(K)操作类似于put操作,HashMap通过计算键的哈希值,先找到对应的桶,然后遍历桶存放的单链表通过比照Entry的键来找到对应的值。

以上就是HashMap的基本工作原理,但是问题总是比我们看到的要复杂。由于哈希是一种压缩映射,换句话说就是每一个Entry节点无法对应到一个只属于自己的桶,那么必然会存在多个Entry共用一个桶,拉成一条链表的情况,这种情况叫做哈希冲突。当哈希冲突产生严重的情况,某一个桶后面挂着的链表就会特别长,我们知道查找最怕看见的就是顺序查找,那几乎就是无脑查找。

哈希冲突无法完全避免,因此为了提高HashMap的性能,HashMap不得尽量缓解哈希冲突以缩短每个桶的外挂链表长度。

频繁产生哈希冲突最重要的原因就像是要存储的Entry太多,而桶不够,这和供不应求的矛盾类似。因此,当HashMap中的存储的Entry较多的时候,我们就要考虑增加桶的数量,这样对于后续要存储的Entry来讲,就会大大缓解哈希冲突。

因此就涉及到HashMap的扩容,上面算是回答了为什么扩容,那么什么时候扩容?扩容多少?怎么扩容?便是第二部分要总结的了。

2. HashMap扩容

2.1 HashMap的扩容时机

在使用HashMap的过程中,我们经常会遇到这样一个带参数的构造方法。

1
public HashMap(int initialCapacity, float loadFactor) ;
  • 第一个参数:初始容量,指明初始的桶的个数;相当于桶数组的大小。
  • 第二个参数:装载因子,是一个0-1之间的系数,根据它来确定需要扩容的阈值,默认值是0.75。

现在开始通过源码来寻找扩容的时机:

put(K, V)操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V put(K key, V value) {
if (key == null)
return putForNullKey(value);
int hash = hash(key);//计算键的hash值
int i = indexFor(hash, table.length);//通过hash值对应到桶位置
for (Entry<K,V> e = table[i]; e != null; e = e.next) {//顺序遍历桶外挂的单链表
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {//注意这里的键的比较方式== 或者 equals()
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}

modCount++;
addEntry(hash, key, value, i);//遍历单链表完毕,没有找到与键相对的Entry,需要新建一个Entry换句话说就是桶i是一个空桶;
return null;
}

既然找到一个空桶,那么新建的Entry必然会是这个桶外挂单链表的第一个结点。通过addEntry,找到了扩容的时机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

/**
* Adds a new entry with the specified key, value and hash code to
* the specified bucket. It is the responsibility of this
* method to resize the table if appropriate.
*
* Subclass overrides this to alter the behavior of put method.
*/
void addEntry(int hash, K key, V value, int bucketIndex) {
if ((size >= threshold) && (null != table[bucketIndex])) {//当size大于等于某一个阈值thresholdde时候且该桶并不是一个空桶;
/*这个这样说明比较好理解:因为size 已经大于等于阈值了,说明Entry数量较多,哈希冲突严重,那么若该Entry对应的桶不是一个空桶,这个Entry的加入必然会把原来的链表拉得更长,因此需要扩容;若对应的桶是一个空桶,那么此时没有必要扩容。*/
resize(2 * table.length);//将容量扩容为原来的2倍
hash = (null != key) ? hash(key) : 0;
bucketIndex = indexFor(hash, table.length);//扩容后的,该hash值对应的新的桶位置
}

createEntry(hash, key, value, bucketIndex);//在指定的桶位置上,创建一个新的Entry
}

/**
* Like addEntry except that this version is used when creating entries
* as part of Map construction or "pseudo-construction" (cloning,
* deserialization). This version needn't worry about resizing the table.
*
* Subclass overrides this to alter the behavior of HashMap(Map),
* clone, and readObject.
*/
void createEntry(int hash, K key, V value, int bucketIndex) {
Entry<K,V> e = table[bucketIndex];
table[bucketIndex] = new Entry<>(hash, key, value, e);//链表的头插法插入新建的Entry
size++;//更新size
}

上面有几个重要成员变量:

  • size
  • threshold
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* The number of key-value mappings contained in this map.
*/
transient int size;

/**
* The next size value at which to resize (capacity * load factor).
* @serial
*/
int threshold;

/**
* The load factor for the hash table.
*
* @serial
*/
final float loadFactor;

由注释可以知道:

  • size记录的是map中包含的Entry的数量

  • 而threshold记录的是需要resize的阈值 且 threshold = loadFactor * capacity

  • capacity 其实就是桶的长度

    1
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);

因此现在总结出扩容的时机:

map中包含的Entry的数量大于等于threshold = loadFactor * capacity的时候,且新建的Entry刚好落在一个非空的桶上,此刻触发扩容机制,将其容量扩大为2倍。(为什么2倍,而不是1.5倍,3倍,10倍;解释见最后的补充)

size大于等于threshold的时候,并不一定会触发扩容机制,但是会很可能就触发扩容机制,只要有一个新建的Entry出现哈希冲突,则立刻resize

直到这里我们回答了什么时候扩容和扩容多少的问题,那么下面回答如何扩容的问题。

2.2 HashMap的扩容过程

上面有一个很重要的方法,包含了几乎属于的扩容过程,这就是

1
resize()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 /**
* Rehashes the contents of this map into a new array with a
* larger capacity. This method is called automatically when the
* number of keys in this map reaches its threshold.
*
* If current capacity is MAXIMUM_CAPACITY, this method does not
* resize the map, but sets threshold to Integer.MAX_VALUE.
* This has the effect of preventing future calls.
*
* @param newCapacity the new capacity, MUST be a power of two;
* must be greater than current capacity unless current
* capacity is MAXIMUM_CAPACITY (in which case value
* is irrelevant).
*/
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {//最大容量为 1 << 30
threshold = Integer.MAX_VALUE;
return;
}

Entry[] newTable = new Entry[newCapacity];//新建一个新表
boolean oldAltHashing = useAltHashing;
useAltHashing |= sun.misc.VM.isBooted() &&
(newCapacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
boolean rehash = oldAltHashing ^ useAltHashing;//是否再hash
transfer(newTable, rehash);//完成旧表到新表的转移
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

/**
* Transfers all entries from current table to newTable.
*/
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {//遍历同桶数组中的每一个桶
while(null != e) {//顺序遍历某个桶的外挂链表
Entry<K,V> next = e.next;//引用next
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);//找到新表的桶位置;原桶数组中的某个桶上的同一链表中的Entry此刻可能被分散到不同的桶中去了,有效的缓解了哈希冲突。
e.next = newTable[i];//头插法插入新表中
newTable[i] = e;
e = next;
}
}
}

对于resize的过程,相对来讲是比较简单清晰易于理解的。旧桶数组中的某个桶的外挂单链表是通过头插法插入新桶数组中的,并且原链表中的Entry结点并不一定仍然在新桶数组的同一链表

示意图如下:

这里很容易就想到多线程情况下,隐约感觉这个transfer方法在多线程环境下会乱套。事实上也是这样的,由于缺乏同步机制,当多个线程同时resize的时候,某个线程t所持有的引用next(参考上面代码next指向原桶数组中某个桶外挂单链表的下一个需要转移的Entry),可能已经被转移到了新桶数组中,那么最后该线程t实际上在对新的桶数组进行transfer操作。

如果有更多的线程出现这种情况,那很可能出现大量线程都在对新桶数组进行transfer,那么就会出现多个线程对同一链表无限进行链表反转的操作,极易造成死循环,数据丢失等等,因此HashMap不是线程安全的,考虑在多线程环境下使用并发工具包下的ConcurrentHashMap

3. 补充

3.1 容量必须是2的幂

resize(),为什么容量需要时2倍这样扩张,而不是1.5倍,3倍,10倍,另外在HashMap中有如下的代码:

1
2
3
4
 /**
* The default initial capacity - MUST be a power of two.
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);

// Find a power of 2 >= initialCapacity 找到一个大于等于初始容量的且是2的幂的数作为实际容量
int capacity = 1;
while (capacity < initialCapacity)
capacity <<= 1;

this.loadFactor = loadFactor;
threshold = (int)Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
table = new Entry[capacity];
useAltHashing = sun.misc.VM.isBooted() &&
(capacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
init();
}

通过以上我们知道HashMap的容量必须是2的幂,那么为什么要这么设计呢?答案当然是为了性能。在HashMap通过键的哈希值进行定位桶位置的时候,调用了一个indexFor(hash, table.length);方法。

1
2
3
4
5
6
/**
* Returns index for hash code h.
*/
static int indexFor(int h, int length) {
return h & (length-1);
}

可以看到这里是将哈希值h与桶数组的length-1(实际上也是map的容量-1)进行了一个与操作得出了对应的桶的位置,h & (length-1)

但是为什么不采用h % length这种计算方式呢?

https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4631373中提出Java%/操作比&慢10倍左右,因此采用&运算会提高性能。

**通过限制length是一个2的幂数,h & (length-1)h % length结果是一致的。**这就是为什么要限制容量必须是一个2的幂的原因。

举个简单的例子说明这两个操作的结果一致性:

假设有个hashcode是311,对应的二进制是(1 0011 0111)

length为16,对应的二进制位(1 0000)

  • %操作:311 = 16*19 + 7;所以结果为7,二进制位(0111);

  • &操作:(1 0011 0111) & (0111) = 0111 = 7, 二进制位(0111)

    1 0011 0111 = (1 0011 0000) + (0111) = (1*2^4 + 1* 2^5 + 0*2^6 + 0*2^7 + 1*2^8 ) + 7 = 2^4*(1 + 2 + 0 + 0 + 16) + 7 = 16 * 19 + 7; 和%操作一致。

    如果length是一个2的幂的数,那么length-1就会变成一个mask, 它会将hashcode低位取出来,hashcode低位实际就是余数,和取余操作相比,与操作会将性能提升很多。

3.2 rehash

通过上面的分析可以看出,不同的键的的hashcode仅仅只能通过低位来区分。高位的信息没有被充分利用,举个例子:

假设容量为为16, 二进制位(10000)。

key1hashcode11111 10101,另一个key2hashcode00000 10101,很明显这两个hashcode不是一样的,甚至连相似性(例如海明距离)也是很远的。但是直接进行&操作得出的桶位置是同一个桶,这直接就产生了哈希冲突

由于键的hashCodeHashMap的使用者来设计的,主要也就是我们这群程序员,由于设计一个良好的hashcode分布,是比较困难的,因此会容易出现分布质量差的hashcode分布,极端情况就是:所有的hashCode低位全相等,而高位不相等,这大大加大了哈希冲突,降低了HashMap的性能。

为了防止这种情况的出现,HashMap它使用一个supplemental hash function对键的hashCode再进行了一个supplemental hash ,将最终的hash值作为键的hash值来进行桶的位置映射(也就是说JDK团队在为我们这群程序员加性能保险Orz)。这个过程叫做再哈希(rehash)。

经过一个supplemental hash过程后,能保证海明距离为常数的不同的hashcode有一个哈希冲突次数上界(装载因子为0.75的时候,大约是8次)。

参见下段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Retrieve object hash code and applies a supplemental hash function to the
* result hash, which defends against poor quality hash functions. This is
* critical because HashMap uses power-of-two length hash tables, that
* otherwise encounter collisions for hashCodes that do not differ
* in lower bits. Note: Null keys always map to hash 0, thus index 0.
*/
final int hash(Object k) {
int h = 0;
if (useAltHashing) {
if (k instanceof String) {
return sun.misc.Hashing.stringHash32((String) k);
}
h = hashSeed;
}

h ^= k.hashCode();

// This function ensures that hashCodes that differ only by
// constant multiples at each bit position have a bounded
// number of collisions (approximately 8 at default load factor).
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}

4 References

  1. http://www.javarticles.com/2012/11/hashmap-faq.html
  2. http://blog.csdn.net/u014532901/article/details/78573740
  3. https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4631373

Java Array、List、Set互相转化

1. Array、List、Set互转实例

1.1 Array、List互转

  • ArrayList

    1
    2
    String[] s = new String[]{"A", "B", "C", "D","E"};
    List<String> list = Arrays.asList(s);

    注意这里list里面的元素直接是s里面的元素( list backed by the specified array),换句话就是说:s的修改,直接影响list

    1
    2
    s[0] ="AA";
    System.out.println("list: " + list);

    输出结果

    1
    list: [AA, B, C, D, E]
  • ListArray

    1
    2
    String[] dest = list.toArray(new String[0]);//new String[0]是指定返回数组的类型
    System.out.println("dest: " + Arrays.toString(dest));

    输出结果

    1
    dest: [AA, B, C, D, E]

    注意这里的dest里面的元素不是list里面的元素,换句话就是说:list中关于元素的修改,不会影响dest

    1
    2
    3
    list.set(0, "Z");
    System.out.println("modified list: " + list);
    System.out.println("dest: " + Arrays.toString(dest));

    输出结果

    1
    2
    modified list: [Z, B, C, D, E]
    dest: [AA, B, C, D, E]

    可以看到list虽然被修改了,但是dest数组没有没修改。

1.2 List、Set互转

因为ListSet都实现了Collection接口,且addAll(Collection<? extends E> c);方法,因此可以采用addAll()方法将ListSet互相转换;另外,ListSet也提供了Collection<? extends E> c作为参数的构造函数,因此通常采用构造函数的形式完成互相转化。

1
2
3
4
5
6
//List转Set
Set<String> set = new HashSet<>(list);
System.out.println("set: " + set);
//Set转List
List<String> list_1 = new ArrayList<>(set);
System.out.println("list_1: " + list_1);

toArray()一样,被转换的List(Set)的修改不会对被转化后的SetList)造成影响。

1.3 Array、Set互转

1.1 1.2可完成Array和Set的互转

1
2
3
4
5
6
7
//array转set
s = new String[]{"A", "B", "C", "D","E"};
set = new HashSet<>(Arrays.asList(s));
System.out.println("set: " + set);
//set转array
dest = set.toArray(new String[0]);
System.out.println("dest: " + Arrays.toString(dest));

2. Arrays.asList()Collection.toArray()

上述列出的互相转换离不开Arrays.asList()Collection.toArray()两个重要的方法;

This method acts as bridge between array-based and collection-based APIs, in combination with Collection.toArray. The returned list is serializable and implements RandomAccess.

  • Arrays.asList()

    1
    2
    3
    4
    5
    @SafeVarargs
    @SuppressWarnings("varargs")
    public static <T> List<T> asList(T... a) {
    return new ArrayList<>(a);
    }

    这里出现的ArrayList<>并不是我们通常使用的java.util.ArrayList,因为java.util.ArrayList没有数组作为参数的构造函数。查看对应的源码发现,其实Arrays类的静态内部类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96

    /**
    * @serial include
    */
    private static class ArrayList<E> extends AbstractList<E>
    implements RandomAccess, java.io.Serializable
    {
    private static final long serialVersionUID = -2764017481108945198L;
    private final E[] a;

    ArrayList(E[] array) {
    a = Objects.requireNonNull(array);
    }

    @Override
    public int size() {
    return a.length;
    }

    @Override
    public Object[] toArray() {
    return a.clone();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
    int size = size();
    if (a.length < size)
    return Arrays.copyOf(this.a, size,
    (Class<? extends T[]>) a.getClass());
    System.arraycopy(this.a, 0, a, 0, size);
    if (a.length > size)
    a[size] = null;
    return a;
    }

    @Override
    public E get(int index) {
    return a[index];
    }

    @Override
    public E set(int index, E element) {
    E oldValue = a[index];
    a[index] = element;
    return oldValue;
    }

    @Override
    public int indexOf(Object o) {
    E[] a = this.a;
    if (o == null) {
    for (int i = 0; i < a.length; i++)
    if (a[i] == null)
    return i;
    } else {
    for (int i = 0; i < a.length; i++)
    if (o.equals(a[i]))
    return i;
    }
    return -1;
    }

    @Override
    public boolean contains(Object o) {
    return indexOf(o) != -1;
    }

    @Override
    public Spliterator<E> spliterator() {
    return Spliterators.spliterator(a, Spliterator.ORDERED);
    }

    @Override
    public void forEach(Consumer<? super E> action) {
    Objects.requireNonNull(action);
    for (E e : a) {
    action.accept(e);
    }
    }

    @Override
    public void replaceAll(UnaryOperator<E> operator) {
    Objects.requireNonNull(operator);
    E[] a = this.a;
    for (int i = 0; i < a.length; i++) {
    a[i] = operator.apply(a[i]);
    }
    }

    @Override
    public void sort(Comparator<? super E> c) {
    Arrays.sort(a, c);
    }
    }

    可以看到,这个由Arrays类实现的另一个Arrays$ArrayList,对于java.util.ArrayList类来讲,是比较简单粗糙的类。

    • 没有扩容机制;

    • 无法在指定位置add(int index, E element),调用该方法会抛异常;

      这些不同让这个ArrayList看起来实际上就是一个**List-View的数组**。

  • Collection.toArray()

虽然ListSet的具体实现类都对Collection.toArray()方法进行了不同程度的重写,但是大致都差不多。

这里选AbstractCollection.toArray()的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

public <T> T[] toArray(T[] a) {
// Estimate size of array; be prepared to see more or fewer elements
int size = size();
T[] r = a.length >= size ? a :
(T[])java.lang.reflect.Array
.newInstance(a.getClass().getComponentType(), size);//如果给定的参数T[] a的长度足够存放当前collection(list or set)的元素,则采用该参数来存放元素;否则则根据参数给定的类型反射生成一个数组;
//因此这里的参数T[] a有俩作用;第一:可能用作存放元素;第二:为返回数组提供类型
Iterator<E> it = iterator();
for (int i = 0; i < r.length; i++) {
if (! it.hasNext()) { // fewer elements than expected 集合的size少于给定的参数数组的长度
if (a == r) {
r[i] = null; // null-terminate 最后一个元素被设置为null,表明collection元素结束;
} else if (a.length < i) {
return Arrays.copyOf(r, i);
} else {
System.arraycopy(r, 0, a, 0, i);
if (a.length > i) {
a[i] = null;
}
}
return a;
}
r[i] = (T)it.next();
}
// more elements than expected
return it.hasNext() ? finishToArray(r, it) : r;
}
0%