EffectiveJava-避免过度同步

上周做关于Java并发的技术分享,我用了《Effective Java》中 避免过度同步 这一节的代码,之前看的时候没有彻底理解,现在重新分析一遍。这次就直接放书中代码吧:

ForwardingSet.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ForwardingSet<E> implements Set<E>{  

private final Set<E> s;

public ForwardingSet(Set<E> s){
this.s = s;
}

@Override
public int size() {return this.s.size();}

@Override
public void clear() {this.s.clear();}

@Override
public boolean isEmpty() {return this.s.isEmpty();}

@Override
public boolean contains(Object o) {return this.s.contains(o);}

@Override
public Iterator<E> iterator() {return this.s.iterator();}

@Override
public Object[] toArray() {return this.s.toArray();}

@Override
public <T> T[] toArray(T[] a) {return this.s.toArray(a);}

@Override
public boolean add(E e) {return this.s.add(e);}

@Override
public boolean remove(Object o) {return this.s.remove(o);}

@Override
public boolean containsAll(Collection<?> c) {return this.s.containsAll(c);}

@Override
public boolean addAll(Collection<? extends E> c) {return this.s.addAll(c);}

@Override
public boolean retainAll(Collection<?> c) {return this.s.removeAll(c);}


@Override
public boolean removeAll(Collection<?> c) {return this.s.retainAll(c);}
}
ObservableSet.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ObservableSet<E> extends ForwardingSet<E>{  

public ObservableSet(Set<E> s) {
super(s);
}


private final List<SetObserver<E>> observers =
new ArrayList<SetObserver<E>>();

public void addObserver(SetObserver<E> observer){
synchronized (observers) {
observers.add(observer);
}
}

public boolean removeObserver(SetObserver<E> observer){
synchronized (observers) {
return observers.remove(observer);
}
}

private void notifyElementAdded(E element){
synchronized (observers) {
for(SetObserver<E> observer : observers){
observer.added(this, element);
}
}
}

@Override
public boolean add(E element) {
boolean added = super.add(element);
if (added) {
notifyElementAdded(element);
}
return added;
}

@Override
public boolean addAll(Collection<? extends E> c) {
boolean result = false;
for(E element : c){
result |= add(element);
}
return result;
}

}

ObservableSet中通过addObserver订阅通知,removeObserver取消订阅。

SetObserver.java
1
2
3
public interface SetObserver<E> {  
void added(ObservableSet<E> set, E element);
}
Main.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public static void main(String[] args) {  
ObservableSet<Integer> set = new ObservableSet<Integer>(new HashSet<Integer>());
set.addObserver(new SetObserver<Integer>() {

@Override
public void added(ObservableSet<Integer> set, Integer element) {

System.out.print(element+" ");
if (element == 23) {
ExecutorService executorService = Executors.newSingleThreadExecutor();


final SetObserver<Integer> observer = this;

// 使用submit(Runnable).get()方法
try {
executorService.submit(new Runnable() {
@Override
public void run() {
set.removeObserver(observer);
}
}).get();
} catch (ExecutionException ex) {
throw new AssertionError(ex.getCause());
}catch (InterruptedException ex) {
throw new AssertionError(ex.getCause());
}finally{
executorService.shutdown();
}

// 使用execute方法
// try {
// executorService.execute(new Runnable() {
// @Override
// public void run() {
// set.removeObserver(observer);
// }
// });
// } catch (AssertionError ex) {
// throw new AssertionError(ex.getCause());
// } finally {
// executorService.shutdown();
// }
}
}
});

for(int i = 0; i < 100; i++){
set.add(i);
}
}

在main方法里给ObservableSet的一个实例循环添加0~99,不对这个实例进行其他操作时正常打印0~99。然后在循环中间remove掉观察者的话有两种情况:

  • 在当前线程中直接调用removeObserver(),这时候会报ConcurrentModificationException的异常,书中原文:

问题在于,当notifyElementAdded调用观察者的added方法时,他正处于遍历observers列表的过程中。added方法调用可观察集合的removeObserver方法,从而调用observers.remove方法。现在我们有麻烦了。我们正企图在遍历列表的过程中,将一个元素从列表中删除,这是非法的,notifyElementAdded方法中的迭代式在一个同步块中,可以防止并发修改,但是无法防止迭代线程本身回调到可观察的集合中,也无法防止修改它的observers列表。

  • 用Java提供的ExecutorService在子线程调用removeObserver。书中使用的是submit(Runnable).get()方法,在这种情况下程序出现死锁,子线程无法删除观察者,主线程也无法继续执行循环。书中原文:

后台线程调用set.removeObserver,他企图锁定observers,但他无法获得该锁,因为主线程已经没有锁了。在这期间,主线程一直在等待后台程序来完成对观察者的删除,这正是造成死锁的原因。

截止目前都用的书中的代码,当时我偷懒用了一个平时常用的execute方法,然后就不发生死锁了,循环执行到目标数字后的某个数字都会成功remove掉观察者,循环完毕程序正常退出。而如果使用submit方法,只有在submit(Runnable).get()的情况下才会死锁,其他情形下都跟execute一样。

看Java的文档可以知道一般使用execute和submit的区别是submit会返回一个Future,相当于当前任务的返回值,然后《Think in Java》中关于ExecutorService有一段:

submit()方法会产生Future对象,它用Callable返回结果的特定类型进行了参数化,你可以用isDone()方法来查询Future是否已经完成。当任务完成时,它具有一个结果,你可以调用get()方法来获取该结果。你也可以不用isDone()进行检查就直接调用get(),在这种情况下get()将阻塞,直至结果准备就绪。你还可以在试图调用get()来获取结果之前,先调用具有超时的get(),或者调用isDone来查看任务是否完成。

如果这里使用的是get()的超时方法,那么在超时之前可以看到isDone为false,超时之后catch一个TimeoutException,然后跟其余情况一样观察者正常remove。

所以这里使用submit(Runnable).get()的目的是模拟一个后台线程获取主线程上的锁,获取不到就直接阻塞,最终造成死锁的情况。(而不只是由于在其他线程调用外部方法导致死锁)

关于周二的错误只先弄清楚了使用ExecutorService相关的。由于基本没接触过Java并发,关于锁和阻塞这些还是没搞清楚,还得继续看-_-!。