基础构建模块:
- 委托是创建线程安全类的一个最有效的策略:只需让现有的线程安全类管理所有的状态即可。
同步容器类:
- 同步容器类包括:Vector, Hashtable及Collections.synchronizedXxx()方法产生的实例。
- 同步容器类是线程安全的,但在某些情况下需要客户端加锁保护来实现一些复合操作。
- 常见复合操作:迭代,跳转,条件运算,如"若没有则添加"。
如下面的复合操作就有可能不安全:
/** * getLast, rmLast没有同步,可能导致lastIndex错乱 */@NotThreadSafepublic class UnsafeVector{ private final Vector v = new Vector<>(); public E getLast(){ int lastIndex = v.size()-1; return v.get(lastIndex); } public E rmLast(){ int lastIndex = v.size()-1; return v.remove(lastIndex); }}
- 由于同步容器类要遵守同步策略,即支持客户端加锁,上面代码可以通过客户端加锁实现线程安全:
/** * 通过客户端加锁实现线程安全 */@ThreadSafepublic class SafeVector{ private final Vector v = new Vector<>(); public E getLast(){ synchronized (v) { int lastIndex = v.size()-1; return v.get(lastIndex); } } public E rmLast(){ synchronized(v){ int lastIndex = v.size()-1; return v.remove(lastIndex); } }}
迭代器与ConcurrentModificationException:
- 容器在迭代过程中被修改时 ,就会抛出一个ConcurrentModificationException异常。
/** * 下面将会抛出:ConcurrentModificationException * 可通过在迭代前锁住vector, 但这样会损失并发性能 */@NotThreadSafepublic class ModificationExceptionVector { public static void main(String[] args) { Vectorvector = new Vector<>(); for (int i=0; i<10; i++){ vector.add(new Person(i, "person" + i)); } new Thread(new IterateThread(vector)).start(); new Thread(new RemoveThread(vector)).start(); } private static class RemoveThread implements Runnable{ private Vector v; private Random ran = new Random(); public RemoveThread(Vector v) { this.v = v; } @Override public void run() { try { // do 100 times' remove for (int i=0 ;i<5; i++){ v.remove(ran.nextInt(v.size())); Thread.sleep(500); } } catch (InterruptedException e) { } } } private static class IterateThread implements Runnable{ private Vector v; public IterateThread(Vector v) { this.v = v; } @Override public void run() { try { Iterator it = v.iterator(); while (it.hasNext()){ System.out.println(it.next()); Thread.sleep(500); } } catch (InterruptedException e) { } } }}
隐藏迭代器:
- 正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。
- 一些隐藏的迭代操作:hashCode, equals, containsAll, removeAll, retainAll等。
并发容器:
- 通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。
ConrrentHashMap:
之前有一篇文章介绍过ConcurrentHashMap:
- ConcurrentHashMap使用一种粒度更细的加锁机制来实现大程度的共享,这种机制称为分段锁(Lock Striping);
- ConcurrentHashMap的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中加锁,因为其返回的迭代器具有弱一致性,而非"及时失败"。
- ConcurrentHashMap对一些操作进行了弱化,如size(计算的是近似值,而不是精确值), isEmpty等。
额外的原子Map操作:
- ConcurrentMap声明了一些原子操作接口:
public interface ConcurrentMapextends Map { V putIfAbsent(K key, V value); boolean remove(Object key, Object value); boolean replace(K key, V oldValue, V newValue); V replace(K key, V value);}
CopyOnWriteArrayList:
- CopyOnWriteArrayList比同步List具有更高的并发性能,而且在迭代时不需要加锁或复制。
- 其安全性在于:只要发布一个事实不可变的对象,那么在访问该对象时就不需要进一步同步;在每次修改都会创建一个新的容器副本,从而实现可变性。
- 仅当迭代操作远远多于修改操作时,才应该使用"写入时复制"容器。比如事件通知系统,对监听器列表中的每个监听器进行通知。
阻塞队列和生产者--消费者模式:
- 在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具;它们能够意志或防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。
- BlockingQueue实现:LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue;
串行线程封闭:
- 对于可变对象,生产者--消费者这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者。
双端队列与工作密取:
- java6提供了双端队列:ArrayDeque, LinkedBlockingDeque;
- 双端队列适用于另一种模式:工作密取,每个消费者有各自的双端队列,这种模式非常适合既是消费者又是生产者问题。
- 当消费者自己的双端队列为空时,它会从其他消费者队列末尾中密取任务。
阻塞方法与中断方法:
- 阻塞的原因:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果等。
- 传递InterreuptedException: 抛出异常给方法调用者,或捕获异常,做一些清理工作再抛出抛出异常。
- 恢复中断:有时不能抛出InterruptedException, 比如在Runnable中,则可以恢复中断。
/** * 恢复中断状态以避免屏蔽中断 */public class TaskRunnable implements Runnable { private final BlockingQueuequeue; public TaskRunnable(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { doTask(queue.take()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } ...}
同步工具类:
- 任何一个对象都可以是同步工具类,java平台提供的一些同步工具类有:Semaphore(信号量), Barrier(栅栏), Latch(闭锁);
闭锁:
- 闭锁可以用来确保某些活动直到其他活动都完成后才继续执行;
一个计算多个线程启动到结束耗时的例子:
/** * 在计时测试中使用CountDownLatch来启动和停止线程 */public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{ final CountDownLatch startGate = new CountDownLatch(1); //所有线程同时开始执行task的阀门 final CountDownLatch endGate = new CountDownLatch(nThreads); //所有线程结束的阀门 for (int i=0; i
FutureTask:
- FutureTask也可用做闭锁,表示一种抽象的可生成结果的计算。
/** * 使用FutureTask来提前加载稍后需要的数据 */public class Preloader { private final FutureTaskfuture = new FutureTask<>( new Callable () { @Override public ProductInfo call() throws Exception { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } private ProductInfo loadProductInfo() { // TODO Auto-generated method stub return null; } public ProductInfo get() throws InterruptedException { try { return future.get(); } catch (ExecutionException e) { // exception handle return null; } }}
信号量:
- 计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个制定操作的数量,也可以用来实现某种资源池,或者对容器施加边界。
/** * 使用Semaphore为容器设置边界 */public class BoundedHashSet{ private final Set set; private final Semaphore sem; public BoundedHashSet(int bound){ this.set = Collections.synchronizedSet(new HashSet ()); sem = new Semaphore(bound); //非公平 } public boolean add(T t) throws InterruptedException{ sem.acquire(); //请求semaphore, permits-1或阻塞到permits > 0 boolean wasAdded = false; try { wasAdded = set.add(t); return wasAdded; } finally{ if (!wasAdded) //未添加成功则释放semaphore sem.release(); } } public boolean remove(T t){ boolean wasRemoved = set.remove(t); if (wasRemoved) //删除成功permits+1; sem.release(); return wasRemoved; }}
栅栏:
- 栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件(CutDownLatch值减为0),栅栏用于等待其他线程。
/** * CyclicBarrier测试 */public class CyclicBarrierTest { public static void main(String[] args) { int threadCount = 3; CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() { @Override public void run() { //最后一个线程到达栅栏时触发 System.out.println("all have finished."); } }); for (int i=0 ;i
- 除Barrier栅栏外,还有Exchanger栅栏,它是一种两方栅栏, 可以实现两个线程之间交换数据。
/** * 通过Exchanger交换2个线程数据 */public class ExchangerTest { public static void main(String[] args) { Exchangerexchanger = new Exchanger<>(); ExchangerRunnable exchangerRunnable1 = new ExchangerRunnable(exchanger, "A"); ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger, "B"); new Thread(exchangerRunnable1).start(); new Thread(exchangerRunnable2).start(); } private static class ExchangerRunnable implements Runnable{ private Exchanger exchanger; private String data; public ExchangerRunnable(Exchanger exchanger, String data){ this.exchanger = exchanger; this.data = data; } @Override public void run() { try { String beforeData = this.data; //阻塞直到另一个线程调用exchanger.exchange(), 交换数据 this.data = this.exchanger.exchange(this.data); System.out.println( Thread.currentThread().getName() + " exchanged " + beforeData + " for " + this.data ); } catch (InterruptedException e) { e.printStackTrace(); } } }}
构建高效且可伸缩的结果缓存:
- 一个简单安全,性能低下的缓存设计:
/** * 计算缓存器 * 内部使用HashMap实现计算结果的缓存 * 通过外部接口同步操作实现线程安全 * 但有可能由于计算时间过长导致性能低下 */public class Memoizer1 implements Computable { private final Map cache = new HashMap (); private final Computable c; public Memoizer1(Computable c) { this.c = c; } @Override public synchronized V compute(A key) throws InterruptedException { V result = cache.get(key); if (result == null){ result = c.compute(key); //计算 cache.put(key, result); } return result; }}
- 通过并发容器ConcurrentHashMap代替HashMap,提升并发性能:
/** * 计算缓存器 * 通过ConcurrentHashMap代替HashMap, 提升并发性能 * 但这样有可能多个线程同时调用compute方法, * 由于计算过程中还没有结果,有可能导致多个线程计算同样的值 */public class Memoizer2 implements Computable { private final Map cache = new ConcurrentHashMap (); private final Computable c; public Memoizer2(Computable c) { this.c = c; } @Override public V compute(A key) throws InterruptedException { V result = cache.get(key); if (result == null){ result = c.compute(key); //计算 cache.put(key, result); } return result; }}
- 通过FutureTask来弥补重复结果计算问题:
/** * 计算缓存器 * 通过FutureTask代替map中的Value * 这样可以在计算结果计算完成,就立即返回, * 但仍然有可能重复计算,因为存在非原子的复合操作"若没有则添加": if (f == null){...} */public class Memoizer3 implements Computable { private final Map > cache = new ConcurrentHashMap >(); private final Computable c; public Memoizer3(Computable c) { this.c = c; } @Override public V compute(final A key) throws InterruptedException { Futuref = cache.get(key); if (f == null){ Callable computeTask = new Callable () { @Override public V call() throws Exception { return c.compute(key); } }; FutureTask ft = new FutureTask<>(computeTask); f = ft; cache.put(key, ft); ft.run(); //执行计算 } try { return f.get(); //获取计算结果 } catch (ExecutionException e) { //do exception handle } return null; }}
- 通过对CocurrentHashMap.putIfAbsent()对上面的问题进行修复:
/** * 计算缓存器 * 通过ConcurrentHashMap.putIfAbsent避免重复任务 */public class Memoizer implements Computable { private final ConcurrentHashMap > cache = new ConcurrentHashMap >(); private final Computable c; public Memoizer(Computable c) { this.c = c; } @Override public V compute(final A key) throws InterruptedException { while(true){ Future一,二,三,四就讲述了java并发编程的基础知识。f = cache.get(key); if (f == null){ Callable computeTask = new Callable () { @Override public V call() throws Exception { return c.compute(key); } }; FutureTask ft = new FutureTask<>(computeTask); f = cache.putIfAbsent(key, ft); //该方法不会对相同key的值进行覆盖,这样避免了相同key的任务被计算 if (f == null) ft.run(); //执行计算 } try { return f.get(); //获取计算结果 } catch (CancellationException e){ cache.remove(key); //计算取消则移除对应的计算任务key } catch (ExecutionException e) { //do exception handle } } }}
并发技巧清单:
- 可变状态是至关重要的。
- 尽量将域声明为final类型,除非需要它们是可变的。
- 不可变对象一定是线程安全的。
不可变对象能极大地降低并发编程的复杂性。它们更为简单且安全,可以任意共享而无须使用加锁或保护性复制等机制。
- 封装有助于管理复杂性。
- 用锁保护每个可变变量。
- 当保护同一个不变性条件中的所有变量时,要使用同一个锁。
- 在执行复合操作期间,要持有锁。
- 如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会可能出问题。
- 不要自行推断不需要使用同步。
- 在设计过程中考虑线程安全,不要在上线出问题后再做。
- 将同步策略文档化。
不吝指正。