线程安全与线程安全的实现

讲解线程安全问题以及如何解决线程安全问题。

Java 中的线程安全

线程安全问题出现的原因是多个线程访问共享数据造成的。

按照线程安全的“安全程度”由强到弱,可以将 Java 中的线程安全分为五类:不可变、绝对线程安全、相对线程安全、线程兼容和线程对立。

不可变

在 Java 中,不可变对象一定是线程安全的,无论是对象的方法还是方法的调用者,都不需要再采取任何保障线程安全的措施。如果共享数据是基本数据类型,那么只要在定义时使用 final 关键字修饰就可以保证它是不可变的。如果共享数据是一个对象,那就需要保证对象的行为(方法)不会改变它的状态(属性)。

典型的不可变类型,比如 java.lang.String

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];

...

public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}
}

String 类使用 final 修饰,意味着该类不可继承,这也就防止了子类在继承后重写方法改变其状态。String 的值是存放在一个名为 value 的数组中,数组用 final 修饰,但是这只能保证 value 的引用地址值不可变,引用指向的数组却可以发生变化,如:

1
2
3
4
final int value[] = {1, 2, 3};
int another[] = {4, 5, 6};
value = another; // 编译报错,final 修饰,引用地址值不可变
value[1] = 2333; // 直接修改数组的元素值

所以真正保证 String 不可变靠的是它的 replace()substring()concat()trim() 等方法都不会修改它原来的值,而是返回一个新构造的字符串对象。在 Java 中不可变的类型,除了 String 外,还有基本类型对应的包装类型,以及枚举类型、BigIntegerBigDecimal 类型等。

绝对线程安全

虽然在文中将线程安全按“安全程度”划分了五类,而实际上不可变可以看作绝对线程安全的一种。除了不可变类型,其他类要达到“不管运行环境如何,调用者都不需要任何额外的保证线程安全的措施”通常是不太可能的。Java 中通常所说的线程安全的类,大多不是绝对线程安全的,比如某些集合类:java.util.Vectorjava.util.HashTable 等等。

相对线程安全

相对线程安全就是我们通常意义上讲的线程安全,它保证对象的方法都是线程安全的,但是在调用者调用时,某些特殊的调用操作需要在调用端使用额外的保证线程安全的措施。比如:java.util.Vectorjava.util.HashTableCollections.synchronizedCollection() 方法包装的集合等。

线程兼容

线程兼容是指对象本身不是线程安全的,但是可以通过在调用端使用同步手段来保证对象在并发环境中安全使用。我们通常所说的一个类不是线程安全的,绝大多数是属于这种情况,比如:java.util.ArrayListjava.util.HashMap 等。

线程对立

线程对立是指无论调用端是否使用了同步手段,都无法保证在多线程环境下正确运行。这种情况是很少的,并且通常是有害的,应当尽量避免。常见的线程对立的操作有:System.setIn()System.setOut() 等。

线程安全的实现方法

重点讨论除了不可变外的其他方法。

互斥同步

互斥同步是一种常见的并发正确性保障手段。并发的核心矛盾是“竞态条件”,即多个线程同时访问共享变量,这个共享变量也可以叫做“竞态资源”,而涉及访问竞态资源的代码片段称为“临界区”。保障竞态资源安全的一个思路就是让临界区的代码“互斥”,即同一时刻最多只能有一个线程进入临界区,而保证同一时刻只有一个线程进入临界区的锁被称为互斥量 (Mutex)。有的时候,临界区可以允许 N 个线程进入,这个时候可以把互斥量推广,引入新的锁概念:信号量 (Semaphore)

synchronized

在 Java 中,最基本的互斥同步手段就是 synchronized 关键字。synchronized 被编译后,会在同步代码块的前后分别形成 monitorentermonitorexit 两个字节码指令,这两个字节码都需要一个引用类型的参数来指明要锁定和解锁的对象。如果代码中没有明确指定锁对象,那么将会根据 synchronized 修饰的是实例方法还是类方法,取对应的实例对象或 Class 对象作为锁对象。

注:同步方法编译后会在常量池中存储 ACC_SYNCHRONIZED 标记,方法调用时,调用指令检查方法的 ACC_SYNCHRONIZED 是否被设置来实现同步。

使用 javap 分析查看同步方法的字节码:

1
2
3
4
5
6
public class Demo {

public synchronized void doSomething() {
System.out.println("doSomething");
}
}

同步方法

根据虚拟机规范的要求,在执行 monitorenter 指令时,首先尝试获取对象的锁,如果对象的锁没被占用,或者当前线程已经拥有了对象的锁,则把锁的计数器加 1,此时其他线程试图进入临界区时,操作系统会将程序的运行由用户态切换到内核态来执行内核指令(阻塞其他线程的指令,由于 Java 的线程是映射到系统原生线程上的,所以阻塞和唤醒一个线程都需要内核帮助)。在执行 monitorexit 指令时锁计数器减一,当计数器为 0 时,锁就被释放。

注:synchronized 同步块对于同一个线程来说是可重入的,即同一个线程,在拿到了对象的锁权限后,可以多次进入临界区,每进入一次锁计数器加 1。与之相对的,在线程离开临界区时,需要释放对应次数的锁权限。

ReentrantLock

除了 synchronized 之外,还可以使用 java.util.concurrent.ReentrantLock 来实现同步。ReentrantLock 是独占互斥可重入锁,与 synchronized 类似,但两者的实现不同。synchronized 依赖的是监视器 Monitor,而 ReentrantLock 依赖的是 AQS。并且,ReentrantLock 还增加了一些高级功能,如支持响应中断、可实现公平锁、锁可以绑定多个条件等。在后续的 JDK 版本中,加入了很多针对锁的优化措施,在 JDK 1.6 发布之后,synchronizedReentrantLock 的性能基本持平了。

非阻塞同步

互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能问题,因此这种同步也被称为阻塞同步,互斥同步属于一种悲观的并发策略,随着硬件指令集的发展,我们有了另外一个选择:基于冲突检测的乐观并发策略。通俗的讲,就是先进行操作,如果没有其他线程争用共享数据,则操作成功;如果共享数据有争用,产生了冲突,再采用其他的补偿措施(常见的就是不断的重试,直到成功)。由于这种乐观的并发策略不需要将线程挂起,所以这种同步操作被称为非阻塞同步。

乐观并发策略需要硬件指令集的发展来支持的原因是,我们需要操作和冲突检测这两个步骤具备原子性,这里不能再使用互斥同步,只能靠硬件来完成,由硬件保证一个从语义上看起来需要多次操作的行为只通过一条处理器指令就能完成,常用的有:

  • 测试并设置(Test and Set)
  • 获取并增加(Fetch and Increment)
  • 交换(Swap)
  • 比较并交换(Compare and Swap, CAS)
  • 加载链接/条件存储(Load Linked/Store Conditional, LL/SC)

其中,前三个在很早之前就已经存在于大多数指令集中,后面的两条是现代处理器新增的,这两条指令的目的和功能类似。CAS 指令需要 3 个操作数,分别是内存位置(在 Java 中可以简单理解为变量的内存地址,用 V 表示)、旧的预期值(用 A 表示)和新值(用 B 表示)。CAS 指令执行时,当且仅当 V 符合预期值 A 时,处理器才会用新值 B 去更新 V 的值,否则不更新,无论更新与否,返回 V 的旧值。这整个操作是一个原子操作。

演示非阻塞同步

我们知道 volatile 关键字解决了变量的可见性问题,但是无法保证变量的原子性,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class VolatileTest {

private static volatile int race = 0;

private static final int THREADS_COUNT = 20;

public static void increase() {
race++;
}

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[THREADS_COUNT];

for (int i = 0; i < THREADS_COUNT; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
increase();
}
});
threads[i].start();
}

for (int j = 0; j < THREADS_COUNT; j++)
threads[j].join();

System.out.println(race);
}
}

这段代码,理想的结果应该是打印出 20000,然而实际结果经常小于 20000。反汇编代码:

反汇编代码

increase() 方法中的 race++ 并不是原子操作。

  1. getstatic 指令获取类的静态变量,并将其压入栈顶。
  2. iconst_1 指令将一个值为 1 的 int 型推至栈顶。
  3. iadd 指令将栈顶的两个 int 型数值相加并将结果压入栈顶。
  4. putstatic 指令将为指定的类的静态变量赋值。
  5. return 指令从当前方法返回 void。

在某个线程中,当 getstatic 指令将 race 的值压入栈顶时,volatile 关键字保证了此时的 race 值是正确的,但是在执行 iconst_1iadd 指令时,其他线程可能已经把 race 的值增加了,此时当前线程栈顶的值就变成了过期的值,所以 putstatic 指令执行后就有可能将较小的 race 值同步回了主内存中。

那么怎么修改这段代码呢?当然用 synchronized 修饰 increase() 方法是可行的,但是这里演示如何通过 CAS 来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class VolatileTest {

private static AtomicInteger race = new AtomicInteger(0);

private static final int THREADS_COUNT = 20;

public static void increase() {
race.incrementAndGet();
}

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[THREADS_COUNT];

for (int i = 0; i < THREADS_COUNT; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
increase();
}
});
threads[i].start();
}

for (int j = 0; j < THREADS_COUNT; j++)
threads[j].join();

System.out.println(race);
}
}

使用 AtomicIntegerincrementAndGet() 方法能够实现原子操作的自增。查看该方法的实现:

1
2
3
4
5
6
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

public final int incrementAndGet() {
return U.getAndAddInt(this, VALUE, 1) + 1;
}
1
2
3
4
5
6
7
8
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}

这里分析的是 JDK 10 的实现,JDK 8 的实现与之类似,唯一的差别就是在 JDK 8 中使用的是 compareAndSetInt 进行 CAS 操作。

通过 U.objectFieldOffset(AtomicInteger.class, "value") 方法获取的是 AtomicInteger 对象中的 value 字段相对于对象的起始内存地址的字节偏移量。

注:一个 Java 对象可以看成一段内存,每个字段按照一定的顺序放入这段内存中——对象的布局。考虑到内存对齐的要求,可能这些字段不是连续放置的,使用 objectFieldOffset() 方法能够计算出某个字段相对于对象的起始内存地址的字节偏移量,同时 Unsafe 类还提供了 getInt()getLong()getObject() 等方法使用偏移量来访问某个字段。

getIntVolatile() 方法通过偏移量获取 AtomicInteger 对象的 value 值,与 getInt() 不同,它支持 volatile load 语义。然后循环执行 weakCompareAndSetInt() 方法试图将新值赋给 value,直到更新成功为止,最终返回旧值 + 1。

无同步

可重入代码

线程同步保证了共享数据争用时的正确性,如果一个方法本来就不涉及共享数据,也就无需同步去保证线程安全,因此有一些天生就是线程安全的代码,这种代码有一个很重要的特征:方法返回的结果可以预测,即只要输入参数相同,返回结果必然相同。

线程本地存储

在 Java 中,能够进行线程本地存储的为 java.lang.ThreadLocal,通常称它为线程局部变量,使用它能够将本来由线程共享的变量在每个线程中分别存放一份副本,这样每个线程操作的就是自己的那个副本,从而达到线程隔离的目的。

参考

《深入理解 Java 虚拟机:JVM 高级特性与最佳实践》

阮一峰:进程与线程的一个简单解释