使用jmh进行性能基准测试

介绍

我们在选择不同框架、算法时,不同场景下的性能是很重要考虑因素。JMH这个Java的微基准测试框架提供简单的方式来实现性能测试的需求。本文将以一个对比序列化器性能的例子简单介绍JMH的使用。

创建项目

不同于 JUnit 这种测试框架,JMH推荐创建独立的项目来做测试。

使用maven创建

1
2
3
4
5
6
7
mvn archetype:generate \
-DinteractiveMode=false \
-DarchetypeGroupId=org.openjdk.jmh \
-DarchetypeArtifactId=jmh-java-benchmark-archetype \
-DgroupId=org.sample \
-DartifactId=test \
-Dversion=1.0

执行命令后生成项目

IDEA中创建项目

除了maven命令直接创建之外,也可以选择在IDE中创建maven,以IDEA为例。

在创建项目时,选择Maven项目,勾选 Create from archetype 并选择 Add Archetype...

在弹出的窗口中填入对应信息(当前最新版本为1.33)

{:height 660, :width 764}

之后就可以选择JMH的archetype在IDEA中创建项目了。

编写测试代码

项目自动生成的 pom.xml 文件中已经包含JMH运行最小依赖了,只需要加上待测试相关的依赖包。这里我要测试的是 spring-data-redis 中序列化对象相关的内容,因此需要添加以下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>

之后编写测试代码,这里我使用了对比了 ObjectHashMapperJackson2HashMapper 两个类的 toHash 方法平均调用时间。预热5轮,实际测试5轮并fork 5 个进程来进行测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@BenchmarkMode(Mode.AverageTime)  
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(5)
@State(Scope.Benchmark)
public class MyBenchmark {

private HashMapper objectHashMapper;
private HashMapper jacksonHashMapper;

@Setup
public void setup() {
objectHashMapper = new ObjectHashMapper();
jacksonHashMapper = new Jackson2HashMapper(false);
}

@Benchmark
public void testObjectHashMapper() {
SesAnswerRate answerRatePredictor = new SesAnswerRate(0.3F, 0.5F);
objectHashMapper.toHash(answerRatePredictor);
}

@Benchmark
public void testJacksonHashMapper() {
SesAnswerRate answerRatePredictor = new SesAnswerRate(0.3F, 0.5F);
jacksonHashMapper.toHash(answerRatePredictor);
}

public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(MyBenchmark.class.getSimpleName())
.build();
new Runner(options).run();
}
}

建议IDEA用户安装idea-jmh-plugin插件,便于运行测试。

执行测试

如果没有安装IDE插件,可以执行 mvn clean package 打包,之后在项目下的target文件夹中执行 java -jar benchmarks.jar 运行。

最终运行结果如下:

1
2
3
Benchmark                          Mode  Cnt     Score     Error  Units
MyBenchmark.testJacksonHashMapper avgt 25 536.386 ± 25.589 ns/op
MyBenchmark.testObjectHashMapper avgt 25 1601.561 ± 139.910 ns/op

可以看到使用 Jackson2HashMapper 序列化对象的速度要比 ObjectHashMapper 快上3倍。

总结

可以看到利用JMH能够快速编写,运行测试代码,对于method级别的性能测试非常有用,篇幅所限在此不展开讲述更加具体的用法。

建议有需要的同学们阅读官方示例: jmh-samples

数据去哪了?:从一次生产事故聊聊并发编程原子性问题

1. 引言

最近公司小伙伴的服务遇到一个奇怪的丢数据问题:每天总是莫名其妙的丢几条数据,经过分析排查之后发现是没有处理好并发而导致的。

问题复盘之后我认为这是并发编程中典型的原子性问题。对于并发编程不是很熟悉的小伙伴来说是一个很好的例子。

2. 问题复盘

整个业务的逻辑其实是比较简单:不断的接收消息,定时的把收集的消息发送到一个目标地址。

2.1 关键代码

talk is cheap, show me the code!

我仿写了引起并发问题的类,只保留了核心逻辑,除了lomboklogback之外没有引入其他第三方包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class ConcurrentPostResult {

private List<String> cache = new ArrayList<>(1000);

/**
* 模拟接收数据
*
* @param data
*/
public void receive(String data) {
cache.add(data);
log.info("增加数据 {}, 当前数据容量 {}", data, cache.size());
}

/**
* 模拟发送数据
*
* @throws InterruptedException
*/
public void postResult() throws InterruptedException {
log.info("当前缓存数据数量 {}", cache.size());
// 等待10ms,模拟发送数据耗时,这里实际会拷贝一份数据进行发送
TimeUnit.MILLISECONDS.sleep(10);
log.info("发送数据后的缓存数据数量 {}", cache.size());
cache.clear();
log.info("清除缓存数据 {}", cache.size());
}

public static void main(String[] args) throws InterruptedException {
ConcurrentPostResult postResult = new ConcurrentPostResult();

int threadCount = 8;
ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);

// 模拟并发接收数据
forkJoinPool.execute(() -> IntStream.range(0, 1000)
.mapToObj(String::valueOf)
.parallel().forEach(postResult::receive));

postResult.postResult();

}
}

2.2 输出结果

1
2
3
4
5
6
7
8
9
00:29:46.671 [main] INFO org.example.ConcurrentPostResult - 当前缓存数据数量 0
00:29:46.678 [ForkJoinPool-1-worker-0] INFO org.example.ConcurrentPostResult - 增加数据 85, 当前数据容量 169
...省略日志
00:29:46.686 [ForkJoinPool-1-worker-5] INFO org.example.ConcurrentPostResult - 增加数据 547, 当前数据容量 616
00:29:46.686 [main] INFO org.example.ConcurrentPostResult - 发送数据后的缓存数据数量 616
00:29:46.686 [ForkJoinPool-1-worker-6] INFO org.example.ConcurrentPostResult - 增加数据 797, 当前数据容量 617
...省略日志
00:29:46.686 [ForkJoinPool-1-worker-0] INFO org.example.ConcurrentPostResult - 增加数据 57, 当前数据容量 12
00:29:46.686 [main] INFO org.example.ConcurrentPostResult - 清除缓存数据 2

3. 问题分析

从上一节的日志输出可以看到,在执行postResult()方法发送数据,实际上会经过一个比较长的网络I/O操作^注1。并且执行该操作时,上游系统还在不断推送数据加入到缓存中,如下图所示:

一次并发问题

我们进入到postResult()方法时只发送缓存中的10条数据,但实际上在这个过程中可能不断有新数据加入到缓存中,这部分数据并没有发送给下游服务。

最后在发送完成之后执行了cache.clear()操作导致数据丢失。

4. 解决

我们没有意识到这是个并发的操作,因此下意识的认为接收与发送数据都是原子性的:执行接收数据的时候不会发送数据,执行发送数据的时候不会接收数据。

比较直接方法是对缓存的对象加锁,这里有两个注意点:

  1. 所有涉及到共享对象(这里是cache)的操作都需要加速
  2. 保证加的是同一把锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slf4j
public class ConcurrentPostResult {

private List<String> cache = new ArrayList<>(1000);

/**
* 模拟接收数据
*
* @param data
*/
public void receive(String data) {
// 对缓存加锁
synchronized (cache) {
cache.add(data);
}
log.info("增加数据 {}, 当前数据容量 {}", data, cache.size());
}

/**
* 模拟发送数据
*
* @throws InterruptedException
*/
public void postResult() throws InterruptedException, IOException, ClassNotFoundException {
List data2Send;
// 执行发送操作前加锁
synchronized (cache) {
// 深拷贝数据,避免cache对象被阻塞太久,对性能造成影响
data2Send = deepCopy(cache);
log.info("当前缓存数据数量 {}, 待发送的数据数量 {}", cache.size(), data2Send.size());
cache.clear();
}
// 等待20ms,模拟发送数据耗时,这个时间基本上能保证把1000条数据消耗完
TimeUnit.MILLISECONDS.sleep(20);
log.info("发送数据后的缓存数据数量 {}, 发送的数据数量 {}", cache.size(), data2Send.size());
}

public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
ConcurrentPostResult postResult = new ConcurrentPostResult();

int threadCount = 8;
ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);

// 模拟并发接收数据
forkJoinPool.execute(() -> IntStream.range(0, 1000)
.mapToObj(String::valueOf)
.parallel().forEach(postResult::receive));

log.info("执行前");
TimeUnit.MILLISECONDS.sleep(5);
log.info("执行后");

postResult.postResult();
}

public static <T> List<T> deepCopy(List<T> src) throws IOException, ClassNotFoundException {
ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(byteOutput);
out.writeObject(src);

ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
ObjectInputStream input = new ObjectInputStream(byteInput);
List<T> dest = (List<T>) input.readObject();
return dest;
}
}

这里我特意增加了等待时间,可以看到已发送的数据与未发送数据之和为1000,确保数据未丢失。

1
2
3
4
5
6
7
8
9
10
11
12
21:13:17.268 [main] INFO org.example.ConcurrentPostResult - 执行前
21:13:17.274 [ForkJoinPool-1-worker-4] INFO org.example.ConcurrentPostResult - 增加数据 159, 当前数据容量 26
...省略日志
21:13:17.276 [ForkJoinPool-1-worker-3] INFO org.example.ConcurrentPostResult - 增加数据 932, 当前数据容量 157
21:13:17.277 [main] INFO org.example.ConcurrentPostResult - 执行后
21:13:17.277 [ForkJoinPool-1-worker-3] INFO org.example.ConcurrentPostResult - 增加数据 933, 当前数据容量 178
21:13:17.277 [ForkJoinPool-1-worker-7] INFO org.example.ConcurrentPostResult - 增加数据 204, 当前数据容量 176
21:13:17.288 [main] INFO org.example.ConcurrentPostResult - 当前缓存数据数量 178, 待发送的数据数量 178
21:13:17.288 [ForkJoinPool-1-worker-7] INFO org.example.ConcurrentPostResult - 增加数据 205, 当前数据容量 1
...省略日志
21:13:17.301 [ForkJoinPool-1-worker-4] INFO org.example.ConcurrentPostResult - 增加数据 874, 当前数据容量 822
21:13:17.311 [main] INFO org.example.ConcurrentPostResult - 发送数据后的缓存数据数量 822, 发送的数据数量 178

结论

本文从一个真实案例说起,分析了代码中隐藏的并发问题以及解决方案。

在这个例子中,以下几点内容是我们需要关注的:

  1. 分析你的服务是否存在并发场景
  2. 是否有对共享对象的操作(上文的例子中是cache对象)
  3. 加锁^注2可以解决并发问题中的原子性、一致性、顺序性问题

在这里我们只讨论了最直观的解决方案,有机会在后续的文章中将深入讨论Java内存模型来对并发问题追根溯源。

用上ConcurrentHashMap,就没有并发问题了?

主题

  • 并发问题的三个来源:原子性、可见性、有序性
  • ConcurrentHashMap只能保证提供的原子性读写操作是线程安全的

用户注册模拟并发问题

我们从一个用户注册的例子来了解并发问题。

在这个例子中模拟了用户注册行为,定义了相同用户名不能重复注册的规则,我们使用ConcurrentHashMap保存用户信息,通过模拟同时注册的动作体现并发问题。

定义用户类

1
2
3
4
5
6
class User {
// 用户名,也是Map的key
private String username;
private int age;
// 省略getter, setter方法
}

定义用户注册逻辑

用户注册的规则是用户名不能重复,假如重复就返回注册失败,我们也考虑到了线程安全,所以用ConcurrentHashMap来存储用户信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class UserService {

private Map<String, User> userMap = new ConcurrentHashMap();

boolean register(User user) {
if (userMap.containsKey(user.getUsername)) {
log.info("用户已存在");
return false;
} else {
userMap.put(user.getUsername, user);
log.info("用户注册成功, {}, {}", user.getUsername(), user.getAge());
return true;
}
}
}

模拟重复注册

接下来模拟用户重复注册的场景:

1
2
3
4
5
6
7
8
9
10
11
int threadCount = 8;

ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);

forkJoinPool.execute(() -> IntStream.range(0, threadCount)
.mapToObj(i -> new Person("张三", i))
.parallel().forEach(UserService::register));

// 等待1s,否则看不到日志输出程序就结束了
TimeUnit.SECONDS.sleep(1);

输出结果:

1
2
3
4
5
6
7
8
00:18:32.622 [ForkJoinPool-1-worker-1] INFO org.example.UserService - 用户注册成功, 张三, 5
00:18:32.622 [ForkJoinPool-1-worker-0] INFO org.example.UserService - 用户已存在
00:18:32.622 [ForkJoinPool-1-worker-4] INFO org.example.UserService - 用户注册成功, 张三, 1
00:18:32.622 [ForkJoinPool-1-worker-6] INFO org.example.UserService - 用户已存在
00:18:32.622 [ForkJoinPool-1-worker-5] INFO org.example.UserService - 用户注册成功, 张三, 4
00:18:32.622 [ForkJoinPool-1-worker-3] INFO org.example.UserService - 用户已存在
00:18:32.622 [ForkJoinPool-1-worker-2] INFO org.example.UserService - 用户注册成功, 张三, 2
00:18:32.622 [ForkJoinPool-1-worker-7] INFO org.example.UserService - 用户已存在

可以看到,在注册中存在判断用户是否已注册的逻辑,但在实际测试中有4个用户同时注册成功。^1

并发问题的三大根源

可见性、原子性、有序性

为什么用上了线程安全的ConcurrentHashMap还是出现了并发问题呢?

可见性问题

用户注册代码中使用containsKey()方法判断用户是否存在,直观上我们认为操作的是同一个Map,如果另一个线程写入了张三这个key,当前线程访问userMap时一定会看到,而实际情况要更加复杂一些。

在学习计算机原理的时候讲过CPU缓存、内存、硬盘三者的速度天差地别,因此CPU在计算时优先从离自己最近、速度最快的CPU缓存中获取数据去计算,其次再从内存中获取数据。

另外,CPU经历了多年的发展之后,单核的性能提升越来越困难,为了提高单机性能,如今的计算机都是采用多个CPU核心的方式。

下图所展现的就是CPU与其缓存以及内存之间的关系。每个CPU核心都有独享的Cache的缓存

多线程可见性

此处简化了CPU缓存架构,一般我们的CPU有3级缓存,就是一般我们听到的L1 Cache、L2 Cache和L3 Cache。其中L1 Cache和L2 Cache是CPU独享的,L3 Cache在逻辑上是共享模式。

而我们的线程可能会跑在不同的CPU核心上,此时Thread1将用户注册信息写入到内存中,但Thread2还是从自己的CPU缓存中获取的数据,因此对于Thread2来说看到的注册信息里没有张三,这就是可见性问题

多线程可见性2

原子性问题

即使两个线程跑在了同一个CPU核心上,避免了可见性问题干扰,另外一个原子性问题依然会让你的并发代码不可控。

下图展示了在时间轴上注册用户的流程,boolean register(User user)这个方法在CPU计算的时间尺度上并不是做一个操作,而是包含了:

  1. 访问userMap判断当前用户是否注册
  2. 注册用户

这两步操作,在Thread1访问userMap后返回当前用户未注册但还未将用户信息putuserMap前,Thread2也去访问了userMap那么它也会获取到当前用户未注册的结果,因此也会执行后面的注册操作。

CPU在执行任务时

用户注册并发1

而实际上我们希望判断用户是否注册注册用户这两步操作同时进行,如下图所示,Thread1在执行register(User user)方法时会将两个操作放在一起执行完,这与数据库事务的原子性理解差不多。

用户注册并发2

有序性问题

有序性问题是第三个引起并发编程Bug的源头。

编译器为了提高性能有时候会改变代码执行的顺序,对于单线程代码指令重排序对于执行没有什么影响,但是会对多线程并发代码执行产生不可预知的结果。原理可以参考上节的原子性问题

ConcurrentHashMap应该怎么用

说回到ConcurrentHashMap,它所说的线程安全到底指的是什么呢?

它所保证的是put()get()操作是线程安全的,上一节所说的可见性问题可以被解决。

在我们上文的例子中之所以出现线程安全问题,原因在于register(User user)这个方法中有复合操作,所以会有原子性问题

了解并发问题的根源之后,才能真正用好并发工具类,发挥它的真正威力。我们改造一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class UserService {

private Map<String, User> userMap = new ConcurrentHashMap();

boolean register(User user) {
User hasMapped = userMap.putIfAbsent(user.getUsername, user);
if (hasMapped != null) {
log.info("用户已存在");
return false;
} else {
log.info("用户注册成功, {}, {}", user.getUsername(), user.getAge());
return true;
}
}
}

这里我们使用了Map提供的putIfAbsent接口,其含义是如果key已经存在则返回存储的对象,否则返回null

putIfAbsent接口定义的时候不是线程安全的,但ConcurrentHashMap在实现的时候将这个方法实现为线程安全。在这个场景中如果不使用putIfAbsent就要对register(User user)方法加锁,对于性能的影响更大。

总结

ConcurrentHashMap因为一直以来都号称是线程安全的,因此对于其使用常常会陷入误区。要发挥出并发工具类的真正威力,一定要了解并发问题的本质,而并发问题的本质又与硬件知识息息相关。

受水平所限,本文只是从一个角度解释了ConcurrentHashMap引起的并发问题,并未深入分析ConcurrentHashMap的实现以及局限性。

参考资料