博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java并发编程(四): 基础构建模块
阅读量:6437 次
发布时间:2019-06-23

本文共 10959 字,大约阅读时间需要 36 分钟。

hot3.png

基础构建模块:

  • 委托是创建线程安全类的一个最有效的策略:只需让现有的线程安全类管理所有的状态即可。

同步容器类:

  • 同步容器类包括:Vector, HashtableCollections.synchronizedXxx()方法产生的实例。
  • 同步容器类是线程安全的,但在某些情况下需要客户端加锁保护来实现一些复合操作
  • 常见复合操作:迭代跳转条件运算,如"若没有则添加"。

如下面的复合操作就有可能不安全:

/** * getLast, rmLast没有同步,可能导致lastIndex错乱 */@NotThreadSafepublic class UnsafeVector
{ private final Vector
v = new Vector<>(); public E getLast(){ int lastIndex = v.size()-1; return v.get(lastIndex); } public E rmLast(){ int lastIndex = v.size()-1; return v.remove(lastIndex); }}
  • 由于同步容器类要遵守同步策略,即支持客户端加锁,上面代码可以通过客户端加锁实现线程安全:
/** * 通过客户端加锁实现线程安全 */@ThreadSafepublic class SafeVector
{ private final Vector
v = new Vector<>(); public E getLast(){ synchronized (v) { int lastIndex = v.size()-1; return v.get(lastIndex); } } public E rmLast(){ synchronized(v){ int lastIndex = v.size()-1; return v.remove(lastIndex); } }}

迭代器与ConcurrentModificationException:

  • 容器在迭代过程中被修改时 ,就会抛出一个ConcurrentModificationException异常。
/** * 下面将会抛出:ConcurrentModificationException * 可通过在迭代前锁住vector, 但这样会损失并发性能 */@NotThreadSafepublic class ModificationExceptionVector {	public static void main(String[] args) {		Vector
vector = new Vector<>(); for (int i=0; i<10; i++){ vector.add(new Person(i, "person" + i)); } new Thread(new IterateThread(vector)).start(); new Thread(new RemoveThread(vector)).start(); } private static class RemoveThread implements Runnable{ private Vector
v; private Random ran = new Random(); public RemoveThread(Vector
v) { this.v = v; } @Override public void run() { try { // do 100 times' remove for (int i=0 ;i<5; i++){ v.remove(ran.nextInt(v.size())); Thread.sleep(500); } } catch (InterruptedException e) { } } } private static class IterateThread implements Runnable{ private Vector
v; public IterateThread(Vector
v) { this.v = v; } @Override public void run() { try { Iterator
it = v.iterator(); while (it.hasNext()){ System.out.println(it.next()); Thread.sleep(500); } } catch (InterruptedException e) { } } }}

隐藏迭代器:

  • 正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。
  • 一些隐藏的迭代操作:hashCode, equals, containsAll, removeAll, retainAll等。

并发容器:

  • 通过并发容器来代替同步容器,可以极大地提高伸缩性降低风险

ConrrentHashMap:

之前有一篇文章介绍过ConcurrentHashMap: 

  • ConcurrentHashMap使用一种粒度更细的加锁机制来实现大程度的共享,这种机制称为分段锁(Lock Striping);
  • ConcurrentHashMap的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中加锁,因为其返回的迭代器具有弱一致性,而非"及时失败"
  • ConcurrentHashMap对一些操作进行了弱化,如size(计算的是近似值,而不是精确值), isEmpty等。

额外的原子Map操作:

  • ConcurrentMap声明了一些原子操作接口:
public interface ConcurrentMap
extends Map
{ V putIfAbsent(K key, V value); boolean remove(Object key, Object value); boolean replace(K key, V oldValue, V newValue); V replace(K key, V value);}

CopyOnWriteArrayList:

  • CopyOnWriteArrayList比同步List具有更高的并发性能,而且在迭代时不需要加锁或复制
  • 其安全性在于:只要发布一个事实不可变的对象,那么在访问该对象时就不需要进一步同步;在每次修改都会创建一个新的容器副本,从而实现可变性
  • 仅当迭代操作远远多于修改操作时,才应该使用"写入时复制"容器。比如事件通知系统,对监听器列表中的每个监听器进行通知。

阻塞队列和生产者--消费者模式:

  • 在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具;它们能够意志或防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。
  • BlockingQueue实现:LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue;

串行线程封闭:

  • 对于可变对象生产者--消费者这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者。

双端队列与工作密取:

  • java6提供了双端队列:ArrayDeque, LinkedBlockingDeque;
  • 双端队列适用于另一种模式:工作密取,每个消费者有各自的双端队列,这种模式非常适合既是消费者又是生产者问题。
  • 当消费者自己的双端队列为空时,它会从其他消费者队列末尾中密取任务。

阻塞方法与中断方法:

  • 阻塞的原因:等待I/O操作结束等待获得一个锁等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果等。
  • 传递InterreuptedException: 抛出异常给方法调用者,或捕获异常,做一些清理工作再抛出抛出异常。
  • 恢复中断:有时不能抛出InterruptedException, 比如在Runnable中,则可以恢复中断
/** * 恢复中断状态以避免屏蔽中断 */public class TaskRunnable implements Runnable {	private final BlockingQueue
queue; public TaskRunnable(BlockingQueue
queue) { this.queue = queue; } @Override public void run() { try { doTask(queue.take()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }       ...}

同步工具类:

  • 任何一个对象都可以是同步工具类,java平台提供的一些同步工具类有:Semaphore(信号量), Barrier(栅栏), Latch(闭锁);

闭锁:

  • 闭锁可以用来确保某些活动直到其他活动都完成后才继续执行;

一个计算多个线程启动到结束耗时的例子:

/** * 在计时测试中使用CountDownLatch来启动和停止线程 */public class TestHarness {	public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{		final CountDownLatch startGate = new CountDownLatch(1); //所有线程同时开始执行task的阀门		final CountDownLatch endGate = new CountDownLatch(nThreads); //所有线程结束的阀门				for (int i=0; i

FutureTask:

  • FutureTask也可用做闭锁,表示一种抽象的可生成结果的计算。
/** * 使用FutureTask来提前加载稍后需要的数据 */public class Preloader {	private final FutureTask
future = new FutureTask<>( new Callable
() { @Override public ProductInfo call() throws Exception { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } private ProductInfo loadProductInfo() { // TODO Auto-generated method stub return null; } public ProductInfo get() throws InterruptedException { try { return future.get(); } catch (ExecutionException e) { // exception handle return null; } }}

信号量:

  • 计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个制定操作的数量,也可以用来实现某种资源池,或者对容器施加边界
/** * 使用Semaphore为容器设置边界 */public class BoundedHashSet
{ private final Set
set; private final Semaphore sem; public BoundedHashSet(int bound){ this.set = Collections.synchronizedSet(new HashSet
()); sem = new Semaphore(bound); //非公平 } public boolean add(T t) throws InterruptedException{ sem.acquire(); //请求semaphore, permits-1或阻塞到permits > 0 boolean wasAdded = false; try { wasAdded = set.add(t); return wasAdded; } finally{ if (!wasAdded) //未添加成功则释放semaphore sem.release(); } } public boolean remove(T t){ boolean wasRemoved = set.remove(t); if (wasRemoved) //删除成功permits+1; sem.release(); return wasRemoved; }}

栅栏:

  • 栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件(CutDownLatch值减为0)栅栏用于等待其他线程
/** * CyclicBarrier测试 */public class CyclicBarrierTest {		public static void main(String[] args) {		int threadCount = 3;		CyclicBarrier barrier =				 new CyclicBarrier(threadCount, new Runnable() {					@Override					public void run() { //最后一个线程到达栅栏时触发						System.out.println("all have finished.");					}		});				for (int i=0 ;i
  • 除Barrier栅栏外,还有Exchanger栅栏,它是一种两方栅栏, 可以实现两个线程之间交换数据
/** * 通过Exchanger交换2个线程数据 */public class ExchangerTest {	public static void main(String[] args) {		Exchanger
exchanger = new Exchanger<>(); ExchangerRunnable exchangerRunnable1 = new ExchangerRunnable(exchanger, "A"); ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger, "B"); new Thread(exchangerRunnable1).start(); new Thread(exchangerRunnable2).start(); } private static class ExchangerRunnable implements Runnable{ private Exchanger
exchanger; private String data; public ExchangerRunnable(Exchanger
exchanger, String data){ this.exchanger = exchanger; this.data = data; } @Override public void run() { try { String beforeData = this.data; //阻塞直到另一个线程调用exchanger.exchange(), 交换数据 this.data = this.exchanger.exchange(this.data); System.out.println( Thread.currentThread().getName() + " exchanged " + beforeData + " for " + this.data ); } catch (InterruptedException e) { e.printStackTrace(); } } }}

构建高效且可伸缩的结果缓存:

  • 一个简单安全,性能低下的缓存设计:
/** * 计算缓存器 * 内部使用HashMap实现计算结果的缓存 * 通过外部接口同步操作实现线程安全 * 但有可能由于计算时间过长导致性能低下 */public class Memoizer1
implements Computable
{ private final Map
cache = new HashMap
(); private final Computable
c; public Memoizer1(Computable
c) { this.c = c; } @Override public synchronized V compute(A key) throws InterruptedException { V result = cache.get(key); if (result == null){ result = c.compute(key); //计算 cache.put(key, result); } return result; }}
  • 通过并发容器ConcurrentHashMap代替HashMap,提升并发性能:
/** * 计算缓存器 * 通过ConcurrentHashMap代替HashMap, 提升并发性能 * 但这样有可能多个线程同时调用compute方法, * 由于计算过程中还没有结果,有可能导致多个线程计算同样的值 */public class Memoizer2
implements Computable
{ private final Map
cache = new ConcurrentHashMap
(); private final Computable
c; public Memoizer2(Computable
c) { this.c = c; } @Override public V compute(A key) throws InterruptedException { V result = cache.get(key); if (result == null){ result = c.compute(key); //计算 cache.put(key, result); } return result; }}
  • 通过FutureTask来弥补重复结果计算问题:
/** * 计算缓存器 * 通过FutureTask代替map中的Value * 这样可以在计算结果计算完成,就立即返回, * 但仍然有可能重复计算,因为存在非原子的复合操作"若没有则添加": if (f == null){...} */public class Memoizer3
implements Computable
{ private final Map
> cache = new ConcurrentHashMap
>(); private final Computable
c; public Memoizer3(Computable
c) { this.c = c; } @Override public V compute(final A key) throws InterruptedException { Future
f = cache.get(key); if (f == null){ Callable
computeTask = new Callable
() { @Override public V call() throws Exception { return c.compute(key); } }; FutureTask
ft = new FutureTask<>(computeTask); f = ft; cache.put(key, ft); ft.run(); //执行计算 } try { return f.get(); //获取计算结果 } catch (ExecutionException e) { //do exception handle } return null; }}
  • 通过对CocurrentHashMap.putIfAbsent()对上面的问题进行修复:
/** * 计算缓存器 * 通过ConcurrentHashMap.putIfAbsent避免重复任务 */public class Memoizer
implements Computable
{ private final ConcurrentHashMap
> cache = new ConcurrentHashMap
>(); private final Computable
c; public Memoizer(Computable
c) { this.c = c; } @Override public V compute(final A key) throws InterruptedException { while(true){ Future
f = cache.get(key); if (f == null){ Callable
computeTask = new Callable
() { @Override public V call() throws Exception { return c.compute(key); } }; FutureTask
ft = new FutureTask<>(computeTask); f = cache.putIfAbsent(key, ft); //该方法不会对相同key的值进行覆盖,这样避免了相同key的任务被计算 if (f == null) ft.run(); //执行计算 } try { return f.get(); //获取计算结果 } catch (CancellationException e){ cache.remove(key); //计算取消则移除对应的计算任务key } catch (ExecutionException e) { //do exception handle } } }}
一,二,三,四就讲述了java并发编程的基础知识。

并发技巧清单:

  • 可变状态至关重要的。
       所有并发访问都可以归结为如何协调对并发状态的访问,可变状态越少,越容易确保线程安全性。
  • 尽量将域声明为final类型,除非需要它们是可变的。
  • 不可变对象一定是线程安全的。

       不可变对象能极大地降低并发编程的复杂性。它们更为简单且安全,可以任意共享而无须使用加锁或保护性复制等机制。

  • 封装有助于管理复杂性。
      将数据封装在对象中,更易于维护不变性;将同步机制封装在对象中,更易于遵循同步策略。
  • 用锁保护每个可变变量
  • 当保护同一个不变性条件中的所有变量时,要使用同一个锁。
  • 在执行复合操作期间,要持有锁。
  • 如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会可能出问题。
  • 不要自行推断不需要使用同步。
  • 设计过程中考虑线程安全,不要在上线出问题后再做。
  • 同步策略文档化

不吝指正。

转载于:https://my.oschina.net/indestiny/blog/212982

你可能感兴趣的文章
购物车练习
查看>>
js实现在表格中删除和添加一行
查看>>
SOCKET简单爬虫实现代码和使用方法
查看>>
跨域解决方案汇总
查看>>
In App Purchase
查看>>
js判断对象的类型的四种方式
查看>>
RPC框架的可靠性设计
查看>>
使用自选择创建团队
查看>>
基准测试(Benchmarks)不必消亡
查看>>
ceph 常用命令记录(完善中...)
查看>>
C# 7.3新特性一览
查看>>
用Chrome开发者工具调试一切
查看>>
简易mvvm库的设计实现
查看>>
AppDynamics把业务交易跟踪扩展到SAP环境
查看>>
[Three.js]Three.js中文文档-自定义混合方程常数
查看>>
Kafka 处理器客户端介绍
查看>>
通过分析这段代码的进化历程,或许能够加深您对JavaScript的作用域的理解
查看>>
创建对象(一):创建与继承
查看>>
深入浅出vue1.0:Vue 实例
查看>>
XML 实体扩展攻击
查看>>