上周做关于Java并发的技术分享,我用了《Effective Java》中 避免过度同步 这一节的代码,之前看的时候没有彻底理解,现在重新分析一遍。这次就直接放书中代码吧:
ForwardingSet.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class ForwardingSet<E> implements Set<E>{
private final Set<E> s;
public ForwardingSet(Set<E> s){ this.s = s; }
@Override public int size() {return this.s.size();}
@Override public void clear() {this.s.clear();}
@Override public boolean isEmpty() {return this.s.isEmpty();}
@Override public boolean contains(Object o) {return this.s.contains(o);}
@Override public Iterator<E> iterator() {return this.s.iterator();}
@Override public Object[] toArray() {return this.s.toArray();}
@Override public <T> T[] toArray(T[] a) {return this.s.toArray(a);}
@Override public boolean add(E e) {return this.s.add(e);}
@Override public boolean remove(Object o) {return this.s.remove(o);}
@Override public boolean containsAll(Collection<?> c) {return this.s.containsAll(c);}
@Override public boolean addAll(Collection<? extends E> c) {return this.s.addAll(c);}
@Override public boolean retainAll(Collection<?> c) {return this.s.removeAll(c);}
@Override public boolean removeAll(Collection<?> c) {return this.s.retainAll(c);} }
|
ObservableSet.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class ObservableSet<E> extends ForwardingSet<E>{
public ObservableSet(Set<E> s) { super(s); }
private final List<SetObserver<E>> observers = new ArrayList<SetObserver<E>>();
public void addObserver(SetObserver<E> observer){ synchronized (observers) { observers.add(observer); } }
public boolean removeObserver(SetObserver<E> observer){ synchronized (observers) { return observers.remove(observer); } }
private void notifyElementAdded(E element){ synchronized (observers) { for(SetObserver<E> observer : observers){ observer.added(this, element); } } }
@Override public boolean add(E element) { boolean added = super.add(element); if (added) { notifyElementAdded(element); } return added; }
@Override public boolean addAll(Collection<? extends E> c) { boolean result = false; for(E element : c){ result |= add(element); } return result; }
}
|
ObservableSet中通过addObserver订阅通知,removeObserver取消订阅。
SetObserver.java
1 2 3
| public interface SetObserver<E> { void added(ObservableSet<E> set, E element); }
|
Main.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public static void main(String[] args) { ObservableSet<Integer> set = new ObservableSet<Integer>(new HashSet<Integer>()); set.addObserver(new SetObserver<Integer>() {
@Override public void added(ObservableSet<Integer> set, Integer element) {
System.out.print(element+" "); if (element == 23) { ExecutorService executorService = Executors.newSingleThreadExecutor();
final SetObserver<Integer> observer = this;
try { executorService.submit(new Runnable() { @Override public void run() { set.removeObserver(observer); } }).get(); } catch (ExecutionException ex) { throw new AssertionError(ex.getCause()); }catch (InterruptedException ex) { throw new AssertionError(ex.getCause()); }finally{ executorService.shutdown(); }
} } });
for(int i = 0; i < 100; i++){ set.add(i); } }
|
在main方法里给ObservableSet的一个实例循环添加0~99,不对这个实例进行其他操作时正常打印0~99。然后在循环中间remove掉观察者的话有两种情况:
- 在当前线程中直接调用removeObserver(),这时候会报ConcurrentModificationException的异常,书中原文:
问题在于,当notifyElementAdded调用观察者的added方法时,他正处于遍历observers列表的过程中。added方法调用可观察集合的removeObserver方法,从而调用observers.remove方法。现在我们有麻烦了。我们正企图在遍历列表的过程中,将一个元素从列表中删除,这是非法的,notifyElementAdded方法中的迭代式在一个同步块中,可以防止并发修改,但是无法防止迭代线程本身回调到可观察的集合中,也无法防止修改它的observers列表。
- 用Java提供的ExecutorService在子线程调用removeObserver。书中使用的是submit(Runnable).get()方法,在这种情况下程序出现死锁,子线程无法删除观察者,主线程也无法继续执行循环。书中原文:
后台线程调用set.removeObserver,他企图锁定observers,但他无法获得该锁,因为主线程已经没有锁了。在这期间,主线程一直在等待后台程序来完成对观察者的删除,这正是造成死锁的原因。
截止目前都用的书中的代码,当时我偷懒用了一个平时常用的execute方法,然后就不发生死锁了,循环执行到目标数字后的某个数字都会成功remove掉观察者,循环完毕程序正常退出。而如果使用submit方法,只有在submit(Runnable).get()的情况下才会死锁,其他情形下都跟execute一样。
看Java的文档可以知道一般使用execute和submit的区别是submit会返回一个Future,相当于当前任务的返回值,然后《Think in Java》中关于ExecutorService有一段:
submit()方法会产生Future对象,它用Callable返回结果的特定类型进行了参数化,你可以用isDone()方法来查询Future是否已经完成。当任务完成时,它具有一个结果,你可以调用get()方法来获取该结果。你也可以不用isDone()进行检查就直接调用get(),在这种情况下get()将阻塞,直至结果准备就绪。你还可以在试图调用get()来获取结果之前,先调用具有超时的get(),或者调用isDone来查看任务是否完成。
如果这里使用的是get()的超时方法,那么在超时之前可以看到isDone为false,超时之后catch一个TimeoutException,然后跟其余情况一样观察者正常remove。
所以这里使用submit(Runnable).get()的目的是模拟一个后台线程获取主线程上的锁,获取不到就直接阻塞,最终造成死锁的情况。(而不只是由于在其他线程调用外部方法导致死锁)
关于周二的错误只先弄清楚了使用ExecutorService相关的。由于基本没接触过Java并发,关于锁和阻塞这些还是没搞清楚,还得继续看-_-!。