构建自定义的同步工具
现有类库不能提供足够的功能时,可以使用内置的条件队列丶显示的Condition对象或者AbstractQueuedSynchronizer来构建自己的同步器
内置的条件队列
条件队列是指一组在等待某个条件变成真的线程,队列中元素是线程
条件队列是和锁相关联的,一个内置锁只能对应一个条件队列(有缺陷,显式锁可以对应多个),当需要获得内置锁时,并发的线程就会进入条件队列,Object的wait(),notify(),notifyAll()方法可以操作条件队列
- wait(): 让当前线程进入条件队列中等待
- notify(): 从条件队列的线程中随机唤醒一个线程,并让它去参与锁的竞争
- notifyAll(): 唤醒条件队列中所有等待的线程,让它们参与锁竞争
//内置的条件队列实现可阻塞的put与take方法
//阻塞并直到:not-full //可额外增加可中断策略
public synchronized void put(V v){
while(isFull()) //使用while当唤醒重新获取锁后再进行一次判断
wait();
boolean wasEmpty = isEmpty();
doPut(v); //插入操作
if(wasEmpty) notifyAll(); //唤醒
}
//阻塞并直到: not-empty
public synchronized V take(){
while(isEmpty)
wait();
boolean wasFull = isFull();
V v = doTake(); //获取操作
if(wasFull) notifyAll(); //唤醒
return v;
}
显式的Condition对象
内置条件队列存在一些缺陷,每个内置锁都只能有一个相关联的条件队列
一个显式锁可以对应多个Condition,一个Condition维护一个条件队列,这是一种更灵活的选择,对于公平的锁,线程会依照FIFO顺序从Condition.await中释放
//显式的条件队列实现可阻塞的put与take方法
class ConditionBoundedBuffer<T>{
protected final Lock lock = new ReentrantLock();
//条件
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
//阻塞并直到: notFull
public void put(T t) throws InterruptedException {
lock.lock();
try{
while(isFull())
notFull.await();
doPut(t);
notEmpty.signal();
} finally {
lock.unlock();
}
}
//阻塞并直到: notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (isEmpty())
notEmpty.await();
T t = doTake();
notFull.signal();
return t;
} finally {
lock.unlock();
}
}
}
- 使用Lock实现信号量(Semaphore)
//使用Lock实现Semaphore(信号量) class SemaphoreOnLock{ private final Lock lock = new ReentrantLock(); private final Condition permitsAvailable = lock.newCondition(); private int permits; public SemaphoreOnLock(int permits) { lock.lock(); try{ this.permits = permits; } finally { lock.unlock(); } } //阻塞直到: permitsAvailable public void acquire() throws InterruptedException { lock.lock(); try{ while(permits <= 0) permitsAvailable.await(); --permits;//获取后许可-1 } finally { lock.unlock(); } } public void release(){ lock.lock(); try{ ++permits; //释放后许可+1 permitsAvailable.signal();//唤醒 } finally { lock.unlock(); } } }
AbstractQueuedSynchronizer(AQS)
AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效的构造出来,如: ReentrantLock丶Semaphore丶CountDownLatch丶ReentrantReadWriteLock丶SynchronousQueue和FutureTask
- AQS的域
private transient volatile Node head;//同步队列头结点 private transient volative Node tail; //同步队列尾节点 private volatile int state; //同步状态
- 域: 同步状态+同步队列
- 同步状态: 可以通过getState丶setState以及compareAndSetState等方法进行操作,可以用于表示任意状态,如ReentrantLock中表示重入获取锁的次数;Semaphore用它表示剩余的许可数量;FutureTask用它表示任务的状态
- 同步队列: 队列的节点(Node)中保存了获取同步状态失败的线程引用丶等待状态丶以及前驱和后继节点,是一个FIFO的双向队列
- AQS的方法
- 修改同步状态的方法
protected final int getState();//获取同步状态 protected final void setState(int newState);//设置同步状态 protected final boolean compareAndSetState(int expect,int update);//CAS设置同步状态
- 独占式获取和释放同步状态
public final void acquire(int arg) //独占式获取同步状态,如果不成功会进入同步队列等待 public final void acquireInterruptibly(int arg) //与acquire不同的是,能响应中断 public final boolean tryAcquireNanos(int arg, long nanosTimeout) //增加超时机制 public final boolean release(int arg) //独占式释放同步状态,该方法会调用重写的tryRelease(int arg)
- 共享式获取和释放同步状态
public final void acquireShared(int arg) //共享式获取同步状态,如果不成功会进入同步队列等待.与独占式不同的是,同一时刻可以有多个线程获取到同步状态 public final void acquireSharedInterruptibly(int arg) //可响应中断 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) //超时机制 public final boolean releaseShared(int arg) //共享式释放同步状态,该方法会调用重写的tryReleaseShared(int arg)
- 查询同步队列中的等待线程情况
public final Collection<Thread> getQueuedThreads(); //返回包含可能正在等待获取的线程列表,是无序估计值 public final boolean hasQueuedThreads();
- AQS提供的可重写方法(非final)
protected boolean tryAcquire(int arg) //独占式获取同步状态,此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它,true表示成功,false失败 protected boolean tryRelease(int arg) //独占式释放同步状态 protected int tryAcquireShared(int arg) //共享式获取同步状态,返回值语义:负数代表获取失败,0代表获取成功但没有剩余资源丶正数代表获取成功,还有剩余资源 protected boolean tryReleaseShared(int arg) //共享式释放同步状态 protected boolean isHeldExclusively() //AQS是否被当前线程所独占
- 修改同步状态的方法